分布式協(xié)調(diào)框架Zookeeper核心設(shè)計(jì)理解與實(shí)戰(zhàn)
本文轉(zhuǎn)載自微信公眾號(hào)「KK架構(gòu)」,作者wangkai 。轉(zhuǎn)載本文請(qǐng)聯(lián)系KK架構(gòu)公眾號(hào)。
一、前言
想起很久以前在某個(gè)客戶現(xiàn)場(chǎng),微服務(wù) B 突然無法調(diào)用到微服務(wù) A,為了使服務(wù)盡快正常恢復(fù),重啟了微服務(wù) B 。
但客戶不依不饒?jiān)儐栠@個(gè)問題出現(xiàn)的原因,于是我還大老遠(yuǎn)從杭州飛到深圳,現(xiàn)場(chǎng)排查問題。
最后的結(jié)論是,zk 在某時(shí)刻出現(xiàn)主備切換,此時(shí)微服務(wù) A(基于 dubbo)需要重新往 zk上注冊(cè),但是端口號(hào)變了。
但是微服務(wù) B 本地有微服務(wù) A rpc 接口的緩存,緩存里面還是舊的端口,所以調(diào)用不到。
解決方法就是,把微服務(wù)的 rpc 端口號(hào)改成固定的。
雖說原因找到了,但對(duì)于 Zookeeper 的理解還是不夠深刻,于是重新學(xué)習(xí)了 Zookeeper 的核心設(shè)計(jì),并記錄于此文共勉。
二、Zookeeper 核心架構(gòu)設(shè)計(jì)
1、Zookeeper 特點(diǎn)
(1)Zookeeper 是一個(gè)分布式協(xié)調(diào)服務(wù),是為了解決多個(gè)節(jié)點(diǎn)狀態(tài)不一致的問題,充當(dāng)中間機(jī)構(gòu)來調(diào)停。如果出現(xiàn)了不一致,則把這個(gè)不一致的情況寫入到 Zookeeper 中,Zookeeper 會(huì)返回響應(yīng),響應(yīng)成功,則表示幫你達(dá)成了一致。
比如,A、B、C 節(jié)點(diǎn)在集群?jiǎn)?dòng)時(shí),需要推舉出一個(gè)主節(jié)點(diǎn),這個(gè)時(shí)候,A、B、C 只要同時(shí)往 Zookeeper 上注冊(cè)臨時(shí)節(jié)點(diǎn),誰先注冊(cè)成功,誰就是主節(jié)點(diǎn)。
(2)Zookeeper 雖然是一個(gè)集群,但是數(shù)據(jù)并不是分散存儲(chǔ)在各個(gè)節(jié)點(diǎn)上的,而是每個(gè)節(jié)點(diǎn)都保存了集群所有的數(shù)據(jù)。
其中一個(gè)節(jié)點(diǎn)作為主節(jié)點(diǎn),提供分布式事務(wù)的寫服務(wù),其他節(jié)點(diǎn)和這個(gè)節(jié)點(diǎn)同步數(shù)據(jù),保持和主節(jié)點(diǎn)狀態(tài)一致。
(3)Zookeeper 所有節(jié)點(diǎn)的數(shù)據(jù)狀態(tài)通過 Zab 協(xié)議保持一致。當(dāng)集群中沒有 Leader 節(jié)點(diǎn)時(shí),內(nèi)部會(huì)執(zhí)行選舉,選舉結(jié)束,F(xiàn)ollower 和 Leader 執(zhí)行狀態(tài)同步;當(dāng)有 Leader 節(jié)點(diǎn)時(shí),Leader 通過 ZAB 協(xié)議主導(dǎo)分布式事務(wù)的執(zhí)行,并且所有的事務(wù)都是串行執(zhí)行的。
(4)Zookeeper 的節(jié)點(diǎn)個(gè)數(shù)是不能線性擴(kuò)展的,節(jié)點(diǎn)越多,同步數(shù)據(jù)的壓力越大,執(zhí)行分布式事務(wù)性能越差。推薦3、5、7 這樣的數(shù)目。
2、Zookeeper 角色的理解
Zookeeper 并沒有沿用 Master/Slave 概念,而是引入了 Leader,F(xiàn)ollower,Observer 三種角色。
通過 Leader 選舉算法來選定一臺(tái)服務(wù)器充當(dāng) Leader 節(jié)點(diǎn),Leader 服務(wù)器為客戶端提供讀、寫服務(wù)。
Follower 節(jié)點(diǎn)可以參加選舉,也可以接受客戶端的讀請(qǐng)求,但是接受到客戶端的寫請(qǐng)求時(shí),會(huì)轉(zhuǎn)發(fā)到 Leader 服務(wù)器去處理。
Observer 角色只能提供讀服務(wù),不能選舉和被選舉,所以它存在的意義是在不影響寫性能的前提下,提升集群的讀性能。
3、Zookeeper 同時(shí)滿足了 CAP 嗎?
答案是否,CAP 只能同時(shí)滿足其二。
Zookeeper 是有取舍的,它實(shí)現(xiàn)了 A 可用性、P 分區(qū)容錯(cuò)性、C 的寫入一致性,犧牲的是 C的讀一致性。
也就是說,Zookeeper 并不保證讀取的一定是最新的數(shù)據(jù)。如果一定要最新,需要使用 sync 回調(diào)處理。
三、核心機(jī)制一:ZNode 數(shù)據(jù)模型
Zookeeper 的 ZNode 模型其實(shí)可以理解為類文件系統(tǒng),如下圖:
1、ZNode 并不適合存儲(chǔ)太大的數(shù)據(jù)
為什么是類文件系統(tǒng)呢?因?yàn)?ZNode 模型沒有文件和文件夾的概念,每個(gè)節(jié)點(diǎn)既可以有子節(jié)點(diǎn),也可以存儲(chǔ)數(shù)據(jù)。
那么既然每個(gè)節(jié)點(diǎn)可以存儲(chǔ)數(shù)據(jù),是不是可以任意存儲(chǔ)無限制的數(shù)據(jù)呢?答案是否定的。在 Zookeeper 中,限制了每個(gè)節(jié)點(diǎn)只能存儲(chǔ)小于 1 M 的數(shù)據(jù),實(shí)際應(yīng)用中,最好不要超過 1kb。
原因有以下四點(diǎn):
- 同步壓力:Zookeeper 的每個(gè)節(jié)點(diǎn)都存儲(chǔ)了 Zookeeper 的所有數(shù)據(jù),每個(gè)節(jié)點(diǎn)的狀態(tài)都要保持和 Leader 一致,同步過程至少要保證半數(shù)以上的節(jié)點(diǎn)同步成功,才算最終成功。如果數(shù)據(jù)越大,則寫入的難度也越大。
- 請(qǐng)求阻塞:Zookeeper 為了保證寫入的強(qiáng)一致性,會(huì)嚴(yán)格按照寫入的順序串行執(zhí)行,某個(gè)時(shí)刻只能執(zhí)行一個(gè)事務(wù)。如果上一個(gè)事務(wù)執(zhí)行耗時(shí)比較長(zhǎng),會(huì)阻塞后面的請(qǐng)求;
- 存儲(chǔ)壓力:正是因?yàn)槊總€(gè) Zookeeper 的節(jié)點(diǎn)都存儲(chǔ)了完整的數(shù)據(jù),每個(gè) ZNode 存儲(chǔ)的數(shù)據(jù)越大,則消耗的物理內(nèi)存也越大;
- 設(shè)計(jì)初衷:Zookeeper 的設(shè)計(jì)初衷,不是為了提供大規(guī)模的存儲(chǔ)服務(wù),而是提供了這樣的數(shù)據(jù)模型解決一些分布式問題。
2、ZNode 的分類
(1)按生命周期分類
按照聲明周期,ZNode 可分為永久節(jié)點(diǎn)和臨時(shí)節(jié)點(diǎn)。
很好理解,永久節(jié)點(diǎn)就是要顯示的刪除,否則會(huì)一直存在;臨時(shí)節(jié)點(diǎn),是和會(huì)話綁定的,會(huì)話創(chuàng)建的所有節(jié)點(diǎn),會(huì)在會(huì)話斷開連接時(shí),全部被 Zookeeper 系統(tǒng)刪除。
(2)按照是否帶序列號(hào)分類
帶序列號(hào)的話,比如在代碼中創(chuàng)建 /a 節(jié)點(diǎn),創(chuàng)建之后其實(shí)是 /a000000000000001,再創(chuàng)建的話,就是 /a000000000000002,依次遞增
不帶序號(hào),就是創(chuàng)建什么就是什么
(3)所以,一共有四種 ZNode:
- 永久的不帶序號(hào)的
- 永久的帶序號(hào)的
- 臨時(shí)的不帶序號(hào)的
- 臨時(shí)的帶序號(hào)的
(4)注意的點(diǎn)
臨時(shí)節(jié)點(diǎn)下面不能掛載子節(jié)點(diǎn),只能作為其他節(jié)點(diǎn)的葉子節(jié)點(diǎn)。
3. 代碼實(shí)戰(zhàn)
ZNode 的數(shù)據(jù)模型其實(shí)很簡(jiǎn)單,只有這么多知識(shí)。下面用代碼來鞏固一下。
這里我們使用 curator 框架來做 demo。(當(dāng)然,你可以選擇使用 Zookeeper 官方自帶的 Api)
引入 pom 坐標(biāo):
- <!-- curator-framework -->
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.2.0</version>
- </dependency>
- <!-- curator-recipes -->
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.2.0</version>
- </dependency>
代碼:
- public class ZkTest {
- // 會(huì)話超時(shí)
- private final int SESSION_TIMEOUT = 30 * 1000;
- // 連接超時(shí) 、 有啥區(qū)別
- private static final int CONNECTION_TIMEOUT = 3 * 1000;
- private static final String CONNECT_ADDR = "localhost:2181";
- private CuratorFramework client = null;
- public static void main(String[] args) throws Exception {
- // 創(chuàng)建客戶端
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString(CONNECT_ADDR)
- .connectionTimeoutMs(CONNECTION_TIMEOUT)
- .retryPolicy(retryPolicy)
- .build();
- client.start();
- System.out.println(ZooKeeper.States.CONNECTED);
- System.out.println(client.getState());
- // 創(chuàng)建節(jié)點(diǎn) /test1
- client.create()
- .forPath("/test1", "curator data".getBytes(StandardCharsets.UTF_8));
- System.out.println(client.getChildren().forPath("/"));
- // 臨時(shí)節(jié)點(diǎn)
- client.create().withMode(CreateMode.EPHEMERAL)
- .forPath("/secondPath", "hello world".getBytes(StandardCharsets.UTF_8));
- System.out.println(new String(client.getData().forPath("/secondPath")));
- client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
- .forPath("/abc", "hello".getBytes(StandardCharsets.UTF_8));
- // 遞歸創(chuàng)建
- client.create()
- .creatingParentContainersIfNeeded()
- .forPath("/secondPath1/sencond2/sencond3");
- Thread.sleep(10000);
- }
四、核心機(jī)制二:Watcher 監(jiān)聽機(jī)制
Watcher 監(jiān)聽機(jī)制是 Zookeeper 解決各種分布式不一致疑難雜癥的獨(dú)家法門,也是學(xué)習(xí) Zookeeper 必學(xué)的知識(shí)點(diǎn)。
1. 對(duì)于 Watcher 機(jī)制的理解
Zookeeper 提供了數(shù)據(jù)的發(fā)布與訂閱的功能,多個(gè)訂閱者可以同時(shí)監(jiān)聽某一個(gè)對(duì)象,當(dāng)這個(gè)對(duì)象自身狀態(tài)發(fā)生變化時(shí)(例如節(jié)點(diǎn)數(shù)據(jù)或者節(jié)點(diǎn)的子節(jié)點(diǎn)個(gè)數(shù)變化),Zookeeper 系統(tǒng)會(huì)通知這些訂閱者。
對(duì)于發(fā)布和訂閱這個(gè)概念的理解,我們可以用這個(gè)場(chǎng)景來理解:
比如前兩天的臺(tái)風(fēng),老板想發(fā)一個(gè)通知給員工:明天在家辦公。
于是老板會(huì)在釘釘群上 Ding 一個(gè)消息,員工自己打開釘釘查看。
在這個(gè)場(chǎng)景中,老板是發(fā)布者,員工是訂閱者,釘釘群就是 Zookeeper 系統(tǒng)。
老板并不一一給員工發(fā)消息,而是把消息發(fā)到群里,員工就可以感知到消息的變化。
訂閱者 | 員工 | 客戶端1 |
---|---|---|
系統(tǒng) | 釘釘群 | Zookeeper系統(tǒng) |
發(fā)布者 | 老板 | 客戶端2 |
2、 Watcher 機(jī)制的流程
客戶端首先將 Watcher 注冊(cè)到服務(wù)器上,同時(shí)將 Watcher 對(duì)象保存在客戶端的 Watcher 管理器中。當(dāng) Zookeeper 服務(wù)端監(jiān)聽到數(shù)據(jù)狀態(tài)發(fā)生變化時(shí),服務(wù)端會(huì)首先主動(dòng)通知客戶端,接著客戶端的 Watcher 管理器會(huì)觸發(fā)相關(guān)的 Watcher 來回調(diào)響應(yīng)的邏輯,從而完成整體的發(fā)布/訂閱流程。
監(jiān)聽器 Watcher 的定義:
- public interface Watcher {
- // WatchedEvent 對(duì)象中有下面三個(gè)屬性,Zookeeper狀態(tài),事件類型,路徑
- // final private KeeperState keeperState;
- // final private EventType eventType;
- // private String path;
- abstract public void process(WatchedEvent event);
- }
下面是監(jiān)聽的大致流程圖:
稍稍解釋一下:
1、Client1 和 Client2 都關(guān)心 /app2 節(jié)點(diǎn)的數(shù)據(jù)狀態(tài)變化,于是注冊(cè)一個(gè)對(duì)于 /app2 的監(jiān)聽器到 Zookeeper 上;
2、當(dāng) Client3 修改 /app2 的值后,Zookeeper 會(huì)主動(dòng)通知 Client1 和 Client2 ,并且回調(diào)監(jiān)聽器的方法。
當(dāng)然這里的數(shù)據(jù)狀態(tài)變化有下面這些類型:
- 節(jié)點(diǎn)被創(chuàng)建;
- 節(jié)點(diǎn)被刪除;
- 節(jié)點(diǎn)數(shù)據(jù)發(fā)生改變;
- 節(jié)點(diǎn)的子節(jié)點(diǎn)個(gè)數(shù)發(fā)生改變。
3. 通過代碼來初步理解
我們還是用 Curator 框架來驗(yàn)證一下這個(gè)監(jiān)聽器。
代碼很簡(jiǎn)單,這里我們使用 TreeCache 表示對(duì)于 /app2 的監(jiān)聽,并且注冊(cè)了監(jiān)聽的方法。
- public class CuratorWatcher {
- public static void main(String[] args) throws Exception {
- CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181")
- .connectionTimeoutMs(10000)
- .retryPolicy(new ExponentialBackoffRetry(1000, 10))
- .build();
- client.start();
- String path = "/app2";
- TreeCache treeCache = new TreeCache(client, path);
- treeCache.start();
- treeCache.getListenable().addListener((client1, event) -> {
- System.out.println("event.getData()," + event.getData());
- System.out.println("event.getType()," + event.getType());
- });
- Thread.sleep(Integer.MAX_VALUE);
- }
- }
當(dāng) /app2 的狀態(tài)發(fā)生變化時(shí),就會(huì)調(diào)用監(jiān)聽的方法。
Curator 是對(duì)原生的 Zookeeper Api 有封裝的,原生的 Zookeeper 提供的 Api ,注冊(cè)監(jiān)聽后,當(dāng)數(shù)據(jù)發(fā)生改變時(shí),監(jiān)聽就被服務(wù)端刪除了,要重復(fù)注冊(cè)監(jiān)聽。
Curator 則對(duì)這個(gè)做了相應(yīng)的封裝和改進(jìn)。
五、代碼實(shí)戰(zhàn):實(shí)現(xiàn)主備選舉
這里我們主要想實(shí)現(xiàn)的功能是:
- 有兩個(gè)節(jié)點(diǎn),bigdata001,bigdata002 ,他們互相主備。
- bigdata001 啟動(dòng)時(shí),往 zk 上注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn) /ElectorLock(鎖),并且往 /ActiveMaster 下面注冊(cè)一個(gè)子節(jié)點(diǎn),表示自己是主節(jié)點(diǎn)。
- bigdata002 啟動(dòng)時(shí),發(fā)現(xiàn)臨時(shí)節(jié)點(diǎn) /ElectorLock 存在,表示當(dāng)前系統(tǒng)已經(jīng)有主節(jié)點(diǎn)了,則自己往 /StandbyMaster 下注冊(cè)一個(gè)節(jié)點(diǎn),表示自己是 standby。
- bigdata001 退出時(shí),釋放 /ElectorLock,并且刪除 /activeMaster 下的節(jié)點(diǎn)。
- bigdata002 感知到 /ElectorLock 不存在時(shí),則自己去注冊(cè) /ElectorLock,并在 /ActiveMaster 下注冊(cè)自己,表示自己已經(jīng)成為了主節(jié)點(diǎn)。
代碼還是用 Curator 框架實(shí)現(xiàn)的:
- package com.kkarch.zookeeper;
- import cn.hutool.core.util.StrUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.cache.TreeCache;
- import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
- import org.apache.zookeeper.CreateMode;
- import java.nio.charset.StandardCharsets;
- /**
- * 分布式選舉
- *
- * @Author wangkai
- * @Time 2021/7/25 20:12
- */
- @Slf4j
- public class ElectorTest {
- private static final String PARENT = "/cluster_ha";
- private static final String ACTIVE = PARENT + "/ActiveMaster";
- private static final String STANDBY = PARENT + "/StandbyMaster";
- private static final String LOCK = PARENT + "/ElectorLock";
- private static final String HOSTNAME = "bigdata05";
- private static final String activeMasterPath = ACTIVE + "/" + HOSTNAME;
- private static final String standByMasterPath = STANDBY + "/" + HOSTNAME;
- public static void main(String[] args) throws Exception {
- CuratorFramework zk = ZkUtil.createZkClient("localhost:2181");
- zk.start();
- // 注冊(cè)好監(jiān)聽
- TreeCache treeCache = new TreeCache(zk, PARENT);
- treeCache.start();
- treeCache.getListenable().addListener((client, event) -> {
- if (event.getType().equals(TreeCacheEvent.Type.INITIALIZED) || event.getType().equals(TreeCacheEvent.Type.CONNECTION_LOST)
- || event.getType().equals(TreeCacheEvent.Type.CONNECTION_RECONNECTED) || event.getType().equals(TreeCacheEvent.Type.CONNECTION_SUSPENDED)) {
- return;
- }
- System.out.println(event.getData());
- // 如果 Active 下有節(jié)點(diǎn)被移除了,沒有節(jié)點(diǎn),則應(yīng)該去競(jìng)選成為 Active
- if (StrUtil.startWith(event.getData().getPath(), ACTIVE) && event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) {
- if (getChildrenNumber(zk, ACTIVE) == 0) {
- createZNode(client, LOCK, HOSTNAME.getBytes(StandardCharsets.UTF_8), CreateMode.EPHEMERAL);
- System.out.println(HOSTNAME + "爭(zhēng)搶到了鎖");
- }
- }
- // 如果有鎖節(jié)點(diǎn)被創(chuàng)建,則判斷是不是自己創(chuàng)建的,如果是,則切換自己的狀態(tài)為 ACTIVE
- else if (StrUtil.equals(event.getData().getPath(), LOCK) && event.getType().equals(TreeCacheEvent.Type.NODE_ADDED)) {
- if (StrUtil.equals(new String(event.getData().getData()), HOSTNAME)) {
- createZNode(zk, activeMasterPath, HOSTNAME.getBytes(StandardCharsets.UTF_8), CreateMode.EPHEMERAL);
- if (checkExists(client, standByMasterPath)) {
- deleteZNode(client, standByMasterPath);
- }
- }
- }
- });
- // 先創(chuàng)建 ACTIVE 和 STANDBY 節(jié)點(diǎn)
- if (zk.checkExists().forPath(ACTIVE) == null) {
- zk.create().creatingParentContainersIfNeeded().forPath(ACTIVE);
- }
- if (zk.checkExists().forPath(STANDBY) == null) {
- zk.create().creatingParentContainersIfNeeded().forPath(STANDBY);
- }
- // 判斷 ACTIVE 下是否有子節(jié)點(diǎn),如果沒有則去爭(zhēng)搶一把鎖
- if (getChildrenNumber(zk, ACTIVE) == 0) {
- createZNode(zk, LOCK, HOSTNAME.getBytes(StandardCharsets.UTF_8), CreateMode.EPHEMERAL);
- }
- // 如果有,則自己成為 STANDBY 狀態(tài)
- else {
- createZNode(zk, standByMasterPath, HOSTNAME.getBytes(StandardCharsets.UTF_8), CreateMode.EPHEMERAL);
- }
- Thread.sleep(1000000000);
- }
- public static int getChildrenNumber(CuratorFramework client, String path) throws Exception {
- return client.getChildren().forPath(path).size();
- }
- public static void createZNode(CuratorFramework client, String path, byte[] data, CreateMode mode) {
- try {
- client.create().withMode(mode).forPath(path, data);
- } catch (Exception e) {
- log.error("創(chuàng)建節(jié)點(diǎn)失敗", e);
- System.out.println("創(chuàng)建節(jié)點(diǎn)失敗了");
- }
- }
- public static boolean checkExists(CuratorFramework client, String path) throws Exception {
- return client.checkExists().forPath(path) != null;
- }
- public static void deleteZNode(CuratorFramework client, String path) {
- try {
- if (checkExists(client, path)) {
- client.delete().forPath(path);
- }
- } catch (Exception e) {
- log.error("刪除節(jié)點(diǎn)失敗", e);
- }
- }
- }