ZooKeeper分布式配置,看這篇就夠了
本文轉載自微信公眾號「牧小農」,作者牧小農。轉載本文請聯系牧小農公眾號。
ZooKeeper 的由來
PS:這個不重要,不感興趣的,可以直接看下面
來源:《從 Paxos 到 ZooKeeper 》
ZooKeeper 最早起源于雅虎研究院的一個研究小組,在當時,研究人員發現,在雅虎內部有很多的大型系統基本上都需要依賴一個類似的系統來進行分布式協調,但是這些系統往往都存在分布式單點的問題,所有雅虎的開發人員就嘗試開發了一個通用的無單點問題的分布式協調框架,以便讓開發人員將精力集中在處理業務邏輯上。關于"ZooKeeper"這個項目的名字。也有一個故事,在項目開始初期,因為考慮到內部的很多項目都是用動物的名字來命名的(例如:Pig項目),所以雅虎的工程師也希望給這個項目也取一個動物的名字,這個時候擔任研究院的首席科學家 Raghu Ramakrishnan 開玩笑地說:“在這樣下去,我們這兒就變成動物園”,此話一出,大家紛紛表示就叫動物園管理員吧,因為各個以動物命名的分布式組件放在一起,這個分布式系統看上去就像一個大型的動物園了,而ZooKeeper 正好要用來進行分布式環境的協調,于是ZooKeeper 的名字也就由此誕生了。
分布式配置中心
在上一期中我們講解了 ZooKeeper集群的配置和安裝,ZooKeeper集群 主要是幫我們做分布式協調的,今天我們用ZK實現分布式配置。關于ZooKeeper 集群的配置大家可以參考上一篇文章《Zookeeper 集群部署的那些事兒》。
為什么需要分布式配置中心
對于剛開始的時候,很多公司的服務器可能是由單個組成,但是隨著業務的發展,單一節點的服務無法滿足業務的飛速發展,后面就出現了分布式、集群的概念,到了現在形成的微服務,技術的改進能夠更好的滿足業務的需要。
假設我們線上有很多個微服務分布在不同的服務器上,其中一個微服務,我們就叫它 goods-service,當 goods-service的IP地址需要變更的時候,但是 goods-service又對很多其他的程序提供了服務,這個時候如果沒有一個統一配置的東西,每一個應用到 goods-service的應用程序都要做相應的IP地址修改,這是一個很麻煩的事情!
如果使用ZooKeeper來做分布式配置的話,是可以解決這個問題的。
注冊中心對比
如果我們只考慮服務治理的話,Eureka是比較合適的,Eureka是比較純粹的注冊中心了,和Eureka不同Apache ZooKeeper 在設計的時候就遵循 CP原則,任何時候對 ZooKeeper 的訪問請求都能得到一致的數據結果,同事系統對網絡分割具有容錯性,今天我們講解的就是關于ZooKeeper 的注冊發現。
配置中心的核心
低延遲: 配置改變( create/update/delete)后能夠最快的把最新的配置同步到其他節點中
高可用: 配置中心可以穩定的對外提供服務
其中 低延遲 我們可以通過 ZooKeeper 的 Watcher 機制來實現(等下會講到Watcher機制)。約定一個節點用來存放配置信息,每個客戶端都來監聽這個節點的NodeDataChanged事件,當配置發生改變時將最新的配置更新到這個節點上,誰更新無所謂,任何節點都可以更新,當這個節點觸發 NodeDataChanged 事件后,在去通知所有監聽這個節點的客戶端去獲取這個節點的最新信息,因為watcher 是一次性的,所以當我們在獲取最新信息的時候需要設置監聽事件,大部分查詢信息都是具有原子性的,所以ZooKeeper中的 getData 也是具有原子性操作,能夠保證我們取得的信息是最新的。
對于 高可用 我們首先需要保證的多集群操作來進行ZooKeeper進行部署,在代碼層不太需要做過多的工作。
Watch 機制
Watch 是 ZooKeeper 針對節點的一次性觀察者機制,就如同我們上面 * 低延遲* 中講到的,一次觸發后就失效,需要手工重新創建Watch。
當Watch監視的數據發生變化的時候,就會通知設置了 Watch 的客戶端,就是我們API中的Watcher,Watcher機制就是為了監聽Znode節點發生了哪些變化,所以會有對應的事件類型和狀態類型,用過代碼中switch進行監聽,一個客戶端可以鏈接多個節點,只要Znode節點發生變化就會執行 process(WatchedEventevent)。
如下圖所示:
從上圖中我們可以看到,在ZooKeeper中,Watch采用的是推送機制,而不是客戶端輪詢,有些中間件采用的是拉取的模式,例如:KafKa。
Watch有兩種監聽模式,分別為 事件類型和狀態類型 :
事件類型:Znode 節點關聯,主要是針對節點的操作
- 創建節點:EventType.NodeCreated
- 節點數據發生變化:EventType.NodeDataChanged
- 當前節點的子節點發生變化:EventType.NodeChildrenChanged
- 刪除節點:EventType.NodeDeleted
狀態類型:客戶端關聯,主要是針對于ZooKeeper集群和應用服務之間的狀態的變更
- 未連接:KeeperState.Disconnected
- 已連接:KeeperState.SyncConnected
- 認證失敗:KeeperState.AuthFailed
- 過期:KeeperState.Expired
- 客戶端連接到只讀服務器:KeeperState.ConnectedReadOnly
watch的特性
一次性觸發: 對于ZooKeeper的Watcher事件,是一次性觸發的,當 Watch 監視的數據發生變化的時候,通知設置當前Watch 的 Client,就是我們對應的 Watcher,因為ZooKeeper 的監控都是一次性的,所以我們需要在每次觸發后設置監控。
客戶端串行執行: 客戶端Watcher回調的過程是一個串行同步的過程,可以為我們保證順序的執行。
輕量級: WatchedEvent是ZooKeeper整個Watcher通知機制的最小通知單元,總共包含三個部分(通知狀態、事件類型和節點路徑),Watcher通知,只會告訴客戶端發生事件而不會告知具體內容,需要客戶端主動去進行獲取,比如 當監聽到 WatchedEvent.NodeDataChanged 信息變化的時候,只會告訴我們這個節點的數據發生了變更,你快來獲取最新的值吧。
客戶端設置的每個監視點與會話關聯,如果會話過期,等待中的監視點將會被刪除。不過監視點可以跨越不同服務端的連接而保持,例如,當一個ZooKeeper客戶端與一個ZooKeeper服務端的連接斷開后連接到集合中的另一個服務端,客戶端會發送未觸發的監視點列表,在注冊監視點時,服務端將要檢查已監視的znode節點在之前注冊監視點之后是否已經變化,如果znode節點已經發生變化,一個監視點的事件就會被發送給客戶端,否則在新的服務端上注冊監視點。這一機制使得我們可以關心邏輯層會話,而非底層連接本身。
客戶端注冊
ZooKeeper 注冊的時候會向ZooKeeper 服務端請求注冊,服務端會返回請求響應,不管成功失敗,都會返回響應結果,當響應成功的時候,ZooKeeper服務端會把Watcher對象放到客戶端的WatchManager管理并返回響應給客戶端
服務端注冊
FinalRequestProcessor.ProcessRequest()會判斷當前請求是否需要注冊Watcher
如果ZooKeeper判斷當前客戶端需要進行Watcher注冊,會將當前的ServerCnxn 對象和數據路徑傳入 getData 方法中去。ServerCnxn 是ZooKeeper 客戶端和服務端之間的連接接口,代表了一個客戶端和服務端的連接,可以將 ServerCnxn 當做一個 Watcher 對象,因為它實現了 Watcher 的 process 接口。
WatcherManager
WatcherManager是 ZK服務端 Watcher 的管理器,分為 WatchTable 和 Watch2Paths 兩個存儲結構,這兩個是不同的存儲結構 1)WatchTable:從數據節點路徑的顆粒度管理 Watcher 2)Watch2Paths:從Watcher的顆粒度來控制時間出發的數據節點
在服務端,DataTree 中會托管兩個 WatchManager, 分別是 dataWatches (數據變更Watch) 和 childWatches(子節點變更Watch)。
Watcher 觸發邏輯
1)封裝WatchedEven:將(KeeperState(通知狀態),EventType(事件類型),Path(節點路徑))封裝成一個 WatchedEvent 對象 2)查詢Watcher:根據路徑取出對應的Watcher,如果存在,取出數據同時從 WatcherManager(WatchTable/Watch2Paths) 中刪除 3)調用Process方法觸發Watcher
4.客戶端回調 Watcher
1)反序列化:字節流轉換成 WatcherEvent 對象 2)處理 chrootPath:如果客戶端設置了 chrootPath 屬性,那么需要對服務器傳過來的完整節點路徑進行 chrootPath 處理,生成客戶端的一個相對節點路徑。比如(/mxn/app/love,經過chrootPath處理,會變成 /love) 3)還原 WatchedEvent:WatcherEvent 轉換成 WatchedEvent 4)回調 Watcher:將 WatcherEvent 對象交給 EventThread 線程,在下一個輪詢周期中進行 Watcher 回調
EventThread 處理時間通知
1) SendThread 接收到服務端的通知事件后,會通過調用 EventThread.queueEvent 方法將事件傳給 EventThread 線程 2)queueEvent 方法首先會根據該通知事件,從 ZKWatchManager 中取出所有相關的 Watcher 客戶端識別出 事件類型 EventType 后,會從相應的 Watcher 存儲 (即3個注冊方法( dataWatches、existWatcher 或 childWatcher)中去除對應的 Watcher 3) 獲取到相關的所有 Watcher 后,會將其放入 waitingEvents 這個隊列去
代碼實現
下面我們就來演示如何使用代碼來實現ZooKeeper的配置
首先我們需要引入ZK的jar
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.6.3</version>
- </dependency>
配置類
既然我們要做的是分布式配置,首先我們需要模擬一個配置,這個配置用來同步服務的地址
- /**
- * @program: mxnzookeeper
- * @ClassName MyConf
- * @description: 配置類
- * @author: muxiaonong
- * @create: 2021-10-19 22:18
- * @Version 1.0
- **/
- public class MyConfig {
- private String conf ;
- public String getConf() {
- return conf;
- }
- public void setConf(String conf) {
- this.conf = conf;
- }
- }
Watcher
創建ZooKeeper的時候,我們需要一個Watcher進行監聽,后續對Znode節點操作的時候,我們也需要使用到Watcher,但是這兩類的功能不一樣,所以我們需要定義一個自己的watcher類,如下所示:
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import java.util.concurrent.CountDownLatch;
- /**
- * @program: mxnzookeeper
- * @ClassName DefaultWatch
- * @description:
- * @author: muxiaonong
- * @create: 2021-10-19 22:02
- * @Version 1.0
- **/
- public class DefaultWatch implements Watcher {
- CountDownLatch cc;
- public void setCc(CountDownLatch cc) {
- this.cc = cc;
- }
- @Override
- public void process(WatchedEvent event) {
- System.out.println(event.toString());
- switch (event.getState()) {
- case Unknown:
- break;
- case Disconnected:
- break;
- case NoSyncConnected:
- break;
- case SyncConnected:
- System.out.println("連接成功。。。。。");
- //連接成功后,執行countDown,此時便可以拿zk對象使用了
- cc.countDown();
- break;
- case AuthFailed:
- break;
- case ConnectedReadOnly:
- break;
- case SaslAuthenticated:
- break;
- case Expired:
- break;
- case Closed:
- break;
- }
- }
- }
由于是異步進行操作的,我們創建一個ZooKeeper對象之后,如果不進行阻塞操作的話,有可能還沒有連接完成就執行后續的操作,所以這里我們用 CountDownLatch進行阻塞操作,當監測連接成功后,進行 countDown放行,執行后續的ZK的動作。
當我們連接成功 ZooKeeper 之后,我們需要通過 exists判斷是否存在節點,存在就進行 getData操作。這里我們創建一個 WatchCallBack因為 exists和getData都需要一個callback,所以除了實現Watcher以外還需要實現 節點狀態:AsyncCallback.StatCallback數據監聽:AsyncCallback.DataCallback
- import org.apache.zookeeper.AsyncCallback;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
- import org.apache.zookeeper.data.Stat;
- import java.util.concurrent.CountDownLatch;
- /**
- * @program: mxnzookeeper
- * @ClassName WatchCallBack
- * @description:
- * @author: muxiaonong
- * @create: 2021-10-19 22:13
- * @Version 1.0
- **/
- public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
- ZooKeeper zk ;
- MyConfig conf ;
- CountDownLatch cc = new CountDownLatch(1);
- public MyConfig getConf() {
- return conf;
- }
- public void setConf(MyConfig conf) {
- this.conf = conf;
- }
- public ZooKeeper getZk() {
- return zk;
- }
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
- }
- public void aWait(){
- //exists的異步實現版本
- zk.exists(ZKConstants.ZK_NODE,this,this ,"exists watch");
- try {
- cc.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /** @Author mxn
- * @Description //TODO 此回調用于檢索節點的stat
- * @Date 21:24 2021/10/20
- * @param rc 調用返回的code或結果
- * @param path 傳遞給異步調用的路徑
- * @param ctx 傳遞給異步調用的上下文對象
- * @param stat 指定路徑上節點的Stat對象
- * @return
- **/
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- if(stat != null){
- //getData的異步實現版本
- zk.getData(ZKConstants.ZK_NODE,this,this,"status");
- }
- }
- /** @Author mxn
- * @Description //TODO 此回調用于檢索節點的數據和stat
- * @Date 21:23 2021/10/20
- * @param rc 調用返回的code或結果
- * @param path 傳遞給異步調用的路徑
- * @param ctx 傳遞給異步調用的上下文對象
- * @param data 節點的數據
- * @param stat 指定節點的Stat對象
- * @return
- **/
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- if(data != null ){
- String s = new String(data);
- conf.setConf(s);
- cc.countDown();
- }
- }
- /** @Author mxn
- * @Description //TODO Watcher接口的實現。
- * Watcher接口指定事件處理程序類必須實現的公共接口。
- * ZooKeeper客戶機將從它連接到的ZooKeeper服務器獲取各種事件。
- * 使用這種客戶機的應用程序通過向客戶機注冊回調對象來處理這些事件。
- * 回調對象應該是實現監視器接口的類的實例。
- * @Date 21:24 2021/10/20
- * @Param watchedEvent WatchedEvent表示監視者能夠響應的ZooKeeper上的更改。
- * WatchedEvent包含發生了什么,
- * ZooKeeper的當前狀態,以及事件中涉及的znode的路徑。
- * @return
- **/
- @Override
- public void process(WatchedEvent event) {
- switch (event.getType()) {
- case None:
- break;
- case NodeCreated:
- //當一個node被創建后,獲取node
- //getData中又會觸發StatCallback的回調processResult
- zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");
- break;
- case NodeDeleted:
- //節點刪除
- conf.setConf("");
- //重新開啟CountDownLatch
- cc = new CountDownLatch(1);
- break;
- case NodeDataChanged:
- //節點數據被改變了
- //觸發DataCallback的回調
- zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");
- break;
- //子節點發生變化的時候
- case NodeChildrenChanged:
- break;
- }
- }
- }
當前面準備好了之后,我們可以編寫測試用例了:
ZKUtils 工具類
- import org.apache.zookeeper.ZooKeeper;
- import java.util.concurrent.CountDownLatch;
- /**
- * @program: mxnzookeeper
- * @ClassName ZKUtils
- * @description:
- * @author: muxiaonong
- * @create: 2021-10-19 21:59
- * @Version 1.0
- **/
- public class ZKUtils {
- private static ZooKeeper zk;
- //192.168.5.130:2181/mxn 這個后面/mxn,表示客戶端如果成功建立了到zk集群的連接,
- // 那么默認該客戶端工作的根path就是/mxn,如果不帶/mxn,默認根path是/
- //當然我們要保證/mxn這個節點在ZK上是存在的
- private static String address ="192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn";
- private static DefaultWatch watch = new DefaultWatch();
- private static CountDownLatch init = new CountDownLatch(1);
- public static ZooKeeper getZK(){
- try {
- //因為是異步的,所以要await,等到連接上zk集群之后再進行后續操作
- zk = new ZooKeeper(address,1000,watch);
- watch.setCc(init);
- init.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- return zk;
- }
- }
測試類:
- import org.apache.zookeeper.ZooKeeper;
- import org.junit.Before;
- import org.junit.Test;
- /**
- * @program: mxnzookeeper
- * @ClassName TestConfig
- * @description:
- * @author: muxiaonong
- * @create: 2021-10-19 22:04
- * @Version 1.0
- **/
- public class TestConfig {
- ZooKeeper zk;
- @Before
- public void conn(){
- zk = ZKUtils.getZK();
- }
- /** @Author mxn
- * @Description //TODO 關閉ZK
- * @Date 21:16 2021/10/20
- * @Param
- * @return
- **/
- public void close(){
- try {
- zk.close();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- @Test
- public void getConf(){
- WatchCallBack watchCallBack = new WatchCallBack();
- watchCallBack.setZk(zk);
- MyConfig myConfig = new MyConfig();
- watchCallBack.setConf(myConfig);
- //阻塞等待
- watchCallBack.aWait();
- while(true){
- if(myConfig.getConf().equals("")){
- System.out.println("zk node 節點丟失了 ......");
- watchCallBack.aWait();
- }else{
- System.out.println(myConfig.getConf());
- }
- //
- try {
- //每隔500毫秒打印一次
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
運行測試
首先我們要知道,因為我們連接IP的時候加上了 /mxn這個目錄結構,所以我們在服務器初始狀態就必須要有這個節點:
集群初始狀態:
- [zk: localhost:2181(CONNECTED) 7] ls /
- [mxn, zookeeper]
我們啟動程序看看
連接成功
ZooKeeper 下 /mxn 現在也是空
- [zk: localhost:2181(CONNECTED) 9] ls /mxn
- []
- [zk: localhost:2181(CONNECTED) 10]
現在我們來創建一個 /mxn/myZNode節點數據
- [zk: localhost:2181(CONNECTED) 10] create /mxn/myZNode "muxiaonong666"
- Created /mxn/myZNode
可以看到,創建完成之后,程序馬上給出響應,打印出了我配置的值,muxiaonong666
此時,再設置 /mxn/myZNode的值為 muxiaonong6969
啪,很快啊!!!我們就可以看到值瞬間改變了
這個時候我們如果刪除 /mxn/myZNode節點,會發生什么呢,前面我們已經寫了watch,如果Znode被刪除了,,watch and callback執行
- case NodeDeleted:
- //節點刪除
- conf.setConf("");
- //重新開啟CountDownLatch
- cc = new CountDownLatch(1);
- break;
- if(myConfig.getConf().equals("")){
- System.out.println("zk node 節點丟失了 ......");
- ////此時應該阻塞住,等待著node重新創建
- watchCallBack.aWait();
- }
刪除 /mxn/myZNode 節點
- delete /mxn/myZNode
我們可以看到前面還在打印數據,后面就提示丟失。
但是這個時候我們客戶端沒有關閉,而是還在等待數據的更新,如果這個時候當重新進行創建 /mxn/myZNode節點的時候,程序又會繼續瘋狂輸出。
- create/mxn/myZNode"muxiaonong666"
程序正常運行,并且成功獲取到了zk配置的最新數據,到這里基本上就實現了,ZooKeeper的分布式配置中心功能了
在這里我測試用的是 getData,但是在項目實戰用可能用的更多的是 子節點的操作 getChildren
總結
到這里我們這篇 ZooKeeper分布式配置注冊發現 就講完了,如果有疑問的地方歡迎進行討論,ZooKeeper 可以作為分布式配置中心,也可以用來當然微服務的注冊,不過現在微服務都有自己的一套服務發現,對于了解ZooKeeper可以我們方便我們在進行技術選型的時候更好的去抉擇, ZooKeeper 的高可用和最終一致性也是比較穩定。
本文代碼地址:https://github.com/muxiaonong/ZooKeeper/tree/master/mxnzookeeper