面試官:Zookeeper了解嗎?說說都有哪些使用場景?
前言
- Zookeeper特性與節點說明
- Zookeeper客戶端使用與集群原理
前兩篇講了Zookeeper的特性、客戶端使用和集群原理,因為 Zookeeper 是分布式系統中很常見的一個基礎系統。 而且問的話常問的就是說 zookeeper 的使用場景是什么? 看你知道不知道一些基本的使用場景。 但是其實 Zookeeper 挖深了自然是可以問的很深很深的。本文主要來聊聊 Zookeeper 主要的幾個使用場景。
- 分布式集群管理
- 分布式注冊中心
- 分布式JOB
- 分布式鎖
分布式集群管理
分布式集群管理的需求
- 主動查看線上服務節點
- 查看服務節點資源使用情況
- 服務離線通知
- 服務資源(CPU、內存、硬盤)超出閥值通知
架構設計
節點結構
- niuh-manger // 根節點
- server00001 : //服務節點 1
- server00002 ://服務節點 2
- server........n ://服務節點 n
服務狀態信息
- ip
- cpu
- memory
- disk
功能實現
數據生成與上報
- 創建臨時節點:
- 定時變更節點狀態信息:
主動查詢
- 實時查詢 zookeeper 獲取集群節點的狀態信息。
被動通知
- 監聽根節點下子節點的變化情況,如果CPU 等硬件資源低于警告位則發出警報。
關鍵示例代碼
- package com.niuh.os;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.I0Itec.zkclient.ZkClient;
- import java.lang.instrument.Instrumentation;
- import java.lang.management.ManagementFactory;
- import java.lang.management.MemoryUsage;
- import java.net.InetAddress;
- import java.net.UnknownHostException;
- public class Agent {
- private static Agent ourInstance = new Agent();
- private String server = "127.0.0.1:2181";
- private ZkClient zkClient;
- private static final String rootPath = "/niuh-manger";
- private static final String servicePath = rootPath + "/service";
- private String nodePath; ///niuh-manger/service0000001 當前節點路徑
- private Thread stateThread;
- public static Agent getInstance() {
- return ourInstance;
- }
- private Agent() {
- }
- // javaagent 數據監控
- public static void premain(String args, Instrumentation instrumentation) {
- Agent.getInstance().init();
- }
- public void init() {
- zkClient = new ZkClient(server, 5000, 10000);
- System.out.println("zk連接成功" + server);
- // 創建根節點
- buildRoot();
- // 創建臨時節點
- createServerNode();
- // 啟動更新的線程
- stateThread = new Thread(() -> {
- while (true) {
- updateServerNode();
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }, "zk_stateThread");
- stateThread.setDaemon(true);
- stateThread.start();
- }
- // 數據寫到 當前的臨時節點中去
- public void updateServerNode() {
- zkClient.writeData(nodePath, getOsInfo());
- }
- // 生成服務節點
- public void createServerNode() {
- nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
- System.out.println("創建節點:" + nodePath);
- }
- // 更新服務節點狀態
- public String getOsInfo() {
- OsBean bean = new OsBean();
- bean.lastUpdateTime = System.currentTimeMillis();
- bean.ip = getLocalIp();
- bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
- MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
- bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024;
- bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024;
- bean.pid = ManagementFactory.getRuntimeMXBean().getName();
- ObjectMapper mapper = new ObjectMapper();
- try {
- return mapper.writeValueAsString(bean);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
- public static String getLocalIp() {
- InetAddress addr = null;
- try {
- addr = InetAddress.getLocalHost();
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- return addr.getHostAddress();
- }
- public void buildRoot() {
- if (!zkClient.exists(rootPath)) {
- zkClient.createPersistent(rootPath);
- }
- }
- }
實現效果
啟動參數設置
運行測試用例:
- package com.niuh.test;
- import com.niuh.os.Agent;
- import org.junit.Ignore;
- import org.junit.Test;
- public class AgentTest {
- @Test
- @Ignore
- public void initTest() {
- Agent.premain(null, null);
- runCPU(2); //20% 占用
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //
- private void runCPU(int count) {
- for (int i = 0; i < count; i++) {
- new Thread(() -> {
- while (true) {
- long bac = 1000000;
- bac = bac >> 1;
- }
- }).start();
- ;
- }
- }
- }
控制臺輸出:
- CPU 報警...22.55120088850181
- CPU 報警...46.06592086097357CPU 報警...47.87206766163349CPU 報警...49.49176420213768CPU 報警...48.967942479969004CPU 報警...49.193921607021565CPU 報警...48.806604284784676CPU 報警...48.63229912951865CPU 報警...49.34509647972038CPU 報警...47.07551108884401CPU 報警...49.18489236134496CPU 報警...49.903007346777066CPU 報警...49.28868795953268// 關閉測試用例服務已下線:OsBean{ip='192.168.43.11', cpu=49.28868795953268, usedMemorySize=56, usableMemorySize=3641, pid='47192@hejianhui', lastUpdateTime=1602056208842}
本Demo不適用在生產環境,示例Demo涉及組件zookeeper-agent、zookeeper-web。源代碼提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
分布式注冊中心
在單體式服務中,通常是由多個客戶端去調用一個服務,只要在客戶端中配置唯一服務節點地址即可,當升級到分布式后,服務節點變多,像一線大廠服務節點更是上萬之多,這么多節點不可能手動配置在客戶端,這里就需要一個中間服務,專門用于幫助客戶端發現服務節點,即許多技術書籍經常提到的服務發現。
一個完整的注冊中心涵蓋以下功能特性:
- 服務注冊:提供者上線時將自提供的服務提交給注冊中心。
- 服務注銷:通知注冊心提供者下線。
- 服務訂閱:動態實時接收服務變更消息。
- 可靠:注冊服務本身是集群的,數據冗余存儲。避免單點故障,及數據丟失。
- 容錯:當服務提供者出現宕機,斷電等極情況時,注冊中心能夠動態感知并通知客戶端服務提供者的狀態。
Dubbo 對 Zookeeper的使用
阿里著名的開源項目Dubbo 是一個基于JAVA的RCP框架,其中必不可少的注冊中心可基于多種第三方組件實現,但其官方推薦的還是Zookeeper作為注冊中心服務。
Dubbo Zookeeper注冊中心存儲結構
節點說明
流程說明
- 服務提供者啟動時: 向 /dubbo/com.foo.BarService/providers 目錄下寫入自己的 URL 地址
- 服務消費者啟動時: 訂閱 /dubbo/com.foo.BarService/providers 目錄下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目錄下寫入自己的 URL 地址
- 監控中心啟動時: 訂閱 /dubbo/com.foo.BarService 目錄下的所有提供者和消費者 URL 地址。
示例Demo
服務端代碼
- package com.niuh.zk.dubbo;
- import com.alibaba.dubbo.config.ApplicationConfig;
- import com.alibaba.dubbo.config.ProtocolConfig;
- import com.alibaba.dubbo.config.RegistryConfig;
- import com.alibaba.dubbo.config.ServiceConfig;
- import java.io.IOException;
- public class Server {
- public void openServer(int port) {
- // 構建應用
- ApplicationConfig config = new ApplicationConfig();
- config.setName("simple-app");
- // 通信協議
- ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port);
- protocolConfig.setThreads(200);
- ServiceConfig<UserService> serviceConfig = new ServiceConfig();
- serviceConfig.setApplication(config);
- serviceConfig.setProtocol(protocolConfig);
- serviceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
- serviceConfig.setInterface(UserService.class);
- UserServiceImpl ref = new UserServiceImpl();
- serviceConfig.setRef(ref);
- //開始提供服務 開張做生意
- serviceConfig.export();
- System.out.println("服務已開啟!端口:"+serviceConfig.getExportedUrls().get(0).getPort());
- ref.setPort(serviceConfig.getExportedUrls().get(0).getPort());
- }
- public static void main(String[] args) throws IOException {
- new Server().openServer(-1);
- System.in.read();
- }
- }
客戶端代碼
- package com.niuh.zk.dubbo;
- import com.alibaba.dubbo.config.ApplicationConfig;
- import com.alibaba.dubbo.config.ReferenceConfig;
- import com.alibaba.dubbo.config.RegistryConfig;
- import java.io.IOException;
- public class Client { UserService service; // URL 遠程服務的調用地址 public UserService buildService(String url) { ApplicationConfig config = new ApplicationConfig("young-app");
- // 構建一個引用對象 ReferenceConfig<UserService> referenceConfig = new ReferenceConfig<UserService>(); referenceConfig.setApplication(config);
- referenceConfig.setInterface(UserService.class); // referenceConfig.setUrl(url); referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
- referenceConfig.setTimeout(5000);
- // 透明化 this.service = referenceConfig.get(); return service;
- } static int i = 0;
- public static void main(String[] args) throws IOException { Client client1 = new Client(); client1.buildService("");
- String cmd; while (!(cmd = read()).equals("exit")) {
- UserVo u = client1.service.getUser(Integer.parseInt(cmd)); System.out.println(u); } } private static String read() throws IOException {
- byte[] b = new byte[1024];
- int size = System.in.read(b);
- return new String(b, 0, size).trim();
- }}
查詢 zk 實際存儲內容:
- /dubbo
- /dubbo/com.niuh.zk.dubbo.UserService/dubbo/com.niuh.zk.dubbo.UserService/configurators/dubbo/com.niuh.zk.dubbo.UserService/routers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo://192.168.43.11:20880/com.niuh.zk.dubbo.UserService?anyhost=true&application=simple-app&dubbo=2.6.2&generic=false&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=48302&side=provider&threads=200×tamp=1602057895881/dubbo/com.niuh.zk.dubbo.UserService/consumers/dubbo/com.niuh.zk.dubbo.UserService/consumers/consumer://192.168.43.11com.niuh.zk.dubbo.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=49036&side=consumer&timeout=5000×tamp=1602075359549
示例Demo涉及組件zookeeper-dubbo。源代碼提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
分布式JOB
分布式JOB需求
多個服務節點只允許其中一個主節點運行JOB任務。
當主節點掛掉后能自動切換主節點,繼續執行JOB任務。
架構設計
node結構
- niuh-master
- server0001:master
- server0002:slave
- server000n:slave
選舉流程
服務啟動:
- 在niuh-maste下創建server子節點,值為slave
- 獲取所有niuh-master 下所有子節點
- 判斷是否存在master 節點
- 如果沒有設置自己為master節點
子節點刪除事件觸發:
- 獲取所有niuh-master 下所有子節點
- 判斷是否存在master 節點
- 如果沒有設置最小值序號為master 節點
示例Demo
- package com.niuh.zookeeper.master;
- import org.I0Itec.zkclient.ZkClient;
- import java.util.Map;
- import java.util.stream.Collectors;
- public class MasterResolve {
- private String server = "127.0.0.1:2181";
- private ZkClient zkClient;
- private static final String rootPath = "/niuh-master";
- private static final String servicePath = rootPath + "/service";
- private String nodePath;
- private volatile boolean master = false;
- private static MasterResolve resolve;
- private MasterResolve() {
- zkClient = new ZkClient(server, 2000, 5000);
- buildRoot(); createServerNode(); } public static MasterResolve getInstance() {
- if (resolve == null) {
- resolve= new MasterResolve();
- } return resolve;
- } // 構建根節點
- public void buildRoot() {
- if (!zkClient.exists(rootPath)) {
- zkClient.createPersistent(rootPath);
- }
- }
- // 創建server節點
- public void createServerNode() {
- nodePath = zkClient.createEphemeralSequential(servicePath, "slave");
- System.out.println("創建service節點:" + nodePath);
- initMaster();
- initListener();
- }
- private void initMaster() {
- boolean existMaster = zkClient.getChildren(rootPath)
- .stream()
- .map(p -> rootPath + "/" + p)
- .map(p -> zkClient.readData(p))
- .anyMatch(d -> "master".equals(d));
- if (!existMaster) {
- doElection();
- System.out.println("當前當選master");
- }
- }
- private void initListener() {
- zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> {
- doElection();// 執行選舉
- });
- }
- // 執行選舉
- public void doElection() {
- Map<String, Object> childData = zkClient.getChildren(rootPath)
- .stream()
- .map(p -> rootPath + "/" + p)
- .collect(Collectors.toMap(p -> p, p -> zkClient.readData(p)));
- if (childData.containsValue("master")) {
- return;
- }
- childData.keySet().stream().sorted().findFirst().ifPresent(p -> {
- if (p.equals(nodePath)) { // 設置最小值序號為master 節點
- zkClient.writeData(nodePath, "master");
- master = true;
- System.out.println("當前當選master" + nodePath);
- }
- });
- }
- public static boolean isMaster() {
- return getInstance().master;
- }
- }
示例Demo涉及組件zookeeper-master。源代碼提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。
分布式鎖
鎖的的基本概念
開發中鎖的概念并不陌生,通過鎖可以實現在多個線程或多個進程間在爭搶資源時,能夠合理的分配置資源的所有權。在單體應用中我們可以通過 synchronized 或 ReentrantLock 來實現鎖。但在分布式系統中,僅僅是加synchronized 是不夠的,需要借助第三組件來實現。比如一些簡單的做法是使用關系型數據行級鎖來實現不同進程之間的互斥,但大型分布式系統的性能瓶頸往往集中在數據庫操作上。為了提高性能得采用如Redis、Zookeeper之內的組件實現分布式鎖。
共享鎖:也稱作只讀鎖,當一方獲得共享鎖之后,其它方也可以獲得共享鎖。但其只允許讀取。在共享鎖全部釋放之前,其它方不能獲得寫鎖。
排它鎖:也稱作讀寫鎖,獲得排它鎖后,可以進行數據的讀寫。在其釋放之前,其它方不能獲得任何鎖。
鎖的獲取
某銀行賬戶,可以同時進行帳戶信息的讀取,但讀取期間不能修改帳戶數據。其賬戶ID為:888
獲得讀鎖流程
- 基于資源ID創建臨時序號讀鎖節點 /lock/888.R0000000002 Read
- 獲取 /lock 下所有子節點,判斷其最小的節點是否為讀鎖,如果是則獲鎖成功
- 最小節點不是讀鎖,則阻塞等待。添加lock/ 子節點變更監聽。
- 當節點變更監聽觸發,執行第2步
數據結構
獲得寫鎖
- 基于資源ID創建臨時序號寫鎖節點 /lock/888.R0000000002 Write
- 獲取 /lock 下所有子節點,判斷其最小的節點是否為自己,如果是則獲鎖成功
- 最小節點不是自己,則阻塞等待。添加lock/ 子節點變更監聽。
- 當節點變更監聽觸發,執行第2步
釋放鎖
讀取完畢后,手動刪除臨時節點,如果獲鎖期間宕機,則會在會話失效后自動刪除。
關于羊群效應
在等待鎖獲得期間,所有等待節點都在監聽 Lock節點,一但lock 節點變更所有等待節點都會被觸發,然后在同時反查Lock 子節點。如果等待對例過大會使用Zookeeper承受非常大的流量壓力。
為了改善這種情況,可以采用監聽鏈表的方式,每個等待隊列只監聽前一個節點,如果前一個節點釋放鎖的時候,才會被觸發通知。這樣就形成了一個監聽鏈表。
示例Demo
- package com.niuh.zookeeper.lock;
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import java.util.List;
- import java.util.stream.Collectors;
- public class ZookeeperLock {
- private String server = "127.0.0.1:2181";
- private ZkClient zkClient;
- private static final String rootPath = "/niuh-lock1";
- public ZookeeperLock() {
- zkClient = new ZkClient(server, 5000, 20000);
- buildRoot(); } // 構建根節點
- public void buildRoot() {
- if (!zkClient.exists(rootPath)) {
- zkClient.createPersistent(rootPath);
- }
- }
- // 獲取鎖
- public Lock lock(String lockId, long timeout) {
- // 創建臨時節點
- Lock lockNode = createLockNode(lockId);
- lockNode = tryActiveLock(lockNode);// 嘗試激活鎖
- if (!lockNode.isActive()) {
- try {
- synchronized (lockNode) {
- lockNode.wait(timeout); // 線程鎖住
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- if (!lockNode.isActive()) {
- throw new RuntimeException(" lock timeout");
- }
- return lockNode;
- }
- // 釋放鎖
- public void unlock(Lock lock) {
- if (lock.isActive()) {
- zkClient.delete(lock.getPath());
- }
- }
- // 嘗試激活鎖
- private Lock tryActiveLock(Lock lockNode) {
- // 獲取根節點下面所有的子節點
- List<String> list = zkClient.getChildren(rootPath)
- .stream()
- .sorted()
- .map(p -> rootPath + "/" + p)
- .collect(Collectors.toList()); // 判斷當前是否為最小節點
- String firstNodePath = list.get(0);
- // 最小節點是不是當前節點
- if (firstNodePath.equals(lockNode.getPath())) {
- lockNode.setActive(true);
- } else {
- String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
- zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- // 事件處理 與心跳 在同一個線程,如果Debug時占用太多時間,將導致本節點被刪除,從而影響鎖邏輯。
- System.out.println("節點刪除:" + dataPath);
- Lock lock = tryActiveLock(lockNode);
- synchronized (lockNode) {
- if (lock.isActive()) {
- lockNode.notify(); // 釋放了
- }
- }
- zkClient.unsubscribeDataChanges(upNodePath, this);
- }
- });
- }
- return lockNode;
- }
- public Lock createLockNode(String lockId) {
- String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
- return new Lock(lockId, nodePath);
- }
- }
示例Demo涉及組件zookeeper-lock。源代碼提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。