成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

基于Zookeeper的分布式鎖

開發 前端 分布式
實現分布式鎖目前有三種流行方案,分別為基于數據庫、Redis、Zookeeper的方案,其中前兩種方案網絡上有很多資料可以參考,本文不做展開。我們來看下使用Zookeeper如何實現分布式鎖。

這篇文章只需要你10分鐘的時間。

實現分布式鎖目前有三種流行方案,分別為基于數據庫、Redis、Zookeeper的方案,其中前兩種方案網絡上有很多資料可以參考,本文不做展開。我們來看下使用Zookeeper如何實現分布式鎖。

什么是Zookeeper?

Zookeeper(業界簡稱zk)是一種提供配置管理、分布式協同以及命名的中心化服務,這些提供的功能都是分布式系統中非常底層且必不可少的基本功能,但是如果自己實現這些功能而且要達到高吞吐、低延遲同時還要保持一致性和可用性,實際上非常困難。因此zookeeper提供了這些功能,開發者在zookeeper之上構建自己的各種分布式系統。

雖然zookeeper的實現比較復雜,但是它提供的模型抽象卻是非常簡單的。Zookeeper提供一個多層級的節點命名空間(節點稱為znode),每個節點都用一個以斜杠(/)分隔的路徑表示,而且每個節點都有父節點(根節點除外),非常類似于文件系統。例如,/foo/doo這個表示一個znode,它的父節點為/foo,父父節點為/,而/為根節點沒有父節點。與文件系統不同的是,這些節點都可以設置關聯的數據,而文件系統中只有文件節點可以存放數據而目錄節點不行。Zookeeper為了保證高吞吐和低延遲,在內存中維護了這個樹狀的目錄結構,這種特性使得Zookeeper不能用于存放大量的數據,每個節點的存放數據上限為1M。

而為了保證高可用,zookeeper需要以集群形態來部署,這樣只要集群中大部分機器是可用的(能夠容忍一定的機器故障),那么zookeeper本身仍然是可用的。客戶端在使用zookeeper時,需要知道集群機器列表,通過與集群中的某一臺機器建立TCP連接來使用服務,客戶端使用這個TCP鏈接來發送請求、獲取結果、獲取監聽事件以及發送心跳包。如果這個連接異常斷開了,客戶端可以連接到另外的機器上。

架構簡圖如下所示:

基于Zookeeper的分布式鎖

客戶端的讀請求可以被集群中的任意一臺機器處理,如果讀請求在節點上注冊了監聽器,這個監聽器也是由所連接的zookeeper機器來處理。對于寫請求,這些請求會同時發給其他zookeeper機器并且達成一致后,請求才會返回成功。因此,隨著zookeeper的集群機器增多,讀請求的吞吐會提高但是寫請求的吞吐會下降。

有序性是zookeeper中非常重要的一個特性,所有的更新都是全局有序的,每個更新都有一個唯一的時間戳,這個時間戳稱為zxid(Zookeeper Transaction Id)。而讀請求只會相對于更新有序,也就是讀請求的返回結果中會帶有這個zookeeper***的zxid。

如何使用zookeeper實現分布式鎖?

在描述算法流程之前,先看下zookeeper中幾個關于節點的有趣的性質:

  • 有序節點:假如當前有一個父節點為/lock,我們可以在這個父節點下面創建子節點;zookeeper提供了一個可選的有序特性,例如我們可以創建子節點“/lock/node-”并且指明有序,那么zookeeper在生成子節點時會根據當前的子節點數量自動添加整數序號,也就是說如果是***個創建的子節點,那么生成的子節點為/lock/node-0000000000,下一個節點則為/lock/node-0000000001,依次類推。
  • 臨時節點:客戶端可以建立一個臨時節點,在會話結束或者會話超時后,zookeeper會自動刪除該節點。
  • 事件監聽:在讀取數據時,我們可以同時對節點設置事件監聽,當節點數據或結構變化時,zookeeper會通知客戶端。當前zookeeper有如下四種事件:1)節點創建;2)節點刪除;3)節點數據修改;4)子節點變更。

下面描述使用zookeeper實現分布式鎖的算法流程,假設鎖空間的根節點為/lock:

  • 客戶端連接zookeeper,并在/lock下創建 臨時的 且 有序的 子節點,***個客戶端對應的子節點為/lock/lock-0000000000,第二個為/lock/lock-0000000001,以此類推。
  • 客戶端獲取/lock下的子節點列表,判斷自己創建的子節點是否為當前子節點列表中 序號最小 的子節點,如果是則認為獲得鎖,否則監聽/lock的子節點變更消息,獲得子節點變更通知后重復此步驟直至獲得鎖;
  • 執行業務代碼;
  • 完成業務流程后,刪除對應的子節點釋放鎖。

步驟1中創建的臨時節點能夠保證在故障的情況下鎖也能被釋放,考慮這么個場景:假如客戶端a當前創建的子節點為序號最小的節點,獲得鎖之后客戶端所在機器宕機了,客戶端沒有主動刪除子節點;如果創建的是***的節點,那么這個鎖永遠不會釋放,導致死鎖;由于創建的是臨時節點,客戶端宕機后,過了一定時間zookeeper沒有收到客戶端的心跳包判斷會話失效,將臨時節點刪除從而釋放鎖。

另外細心的朋友可能會想到,在步驟2中獲取子節點列表與設置監聽這兩步操作的原子性問題,考慮這么個場景:客戶端a對應子節點為/lock/lock-0000000000,客戶端b對應子節點為/lock/lock-0000000001,客戶端b獲取子節點列表時發現自己不是序號最小的,但是在設置監聽器前客戶端a完成業務流程刪除了子節點/lock/lock-0000000000,客戶端b設置的監聽器豈不是丟失了這個事件從而導致永遠等待了?這個問題不存在的。因為zookeeper提供的API中設置監聽器的操作與讀操作是 原子執行 的,也就是說在讀子節點列表時同時設置監聽器,保證不會丟失事件。

***,對于這個算法有個極大的優化點:假如當前有1000個節點在等待鎖,如果獲得鎖的客戶端釋放鎖時,這1000個客戶端都會被喚醒,這種情況稱為“羊群效應”;在這種羊群效應中,zookeeper需要通知1000個客戶端,這會阻塞其他的操作,***的情況應該只喚醒新的最小節點對應的客戶端。應該怎么做呢?在設置事件監聽時,每個客戶端應該對剛好在它之前的子節點設置事件監聽,例如子節點列表為/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002,序號為1的客戶端監聽序號為0的子節點刪除消息,序號為2的監聽序號為1的子節點刪除消息。

所以調整后的分布式鎖算法流程如下:

  • 客戶端連接zookeeper,并在/lock下創建 臨時的 且 有序的 子節點,***個客戶端對應的子節點為/lock/lock-0000000000,第二個為/lock/lock-0000000001,以此類推。
  • 客戶端獲取/lock下的子節點列表,判斷自己創建的子節點是否為當前子節點列表中 序號最小 的子節點,如果是則認為獲得鎖,否則 監聽剛好在自己之前一位的子節點刪除消息 ,獲得子節點變更通知后重復此步驟直至獲得鎖;
  • 執行業務代碼;
  • 完成業務流程后,刪除對應的子節點釋放鎖。

Curator的源碼分析

雖然zookeeper原生客戶端暴露的API已經非常簡潔了,但是實現一個分布式鎖還是比較麻煩的…我們可以直接使用 curator 這個開源項目提供的zookeeper分布式鎖實現。

我們只需要引入下面這個包(基于maven):

 

  1. <dependency> 
  2.     <groupId>org.apache.curator</groupId> 
  3.     <artifactId>curator-recipes</artifactId> 
  4.     <version>4.0.0</version> 
  5. </dependency> 

然后就可以用啦!代碼如下:

 

  1. public static void main(String[] args) throws Exception { 
  2.     //創建zookeeper的客戶端 
  3.     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 
  4.     CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy); 
  5.     client.start(); 
  6.  
  7.     //創建分布式鎖, 鎖空間的根節點路徑為/curator/lock 
  8.     InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock"); 
  9.     mutex.acquire(); 
  10.     //獲得了鎖, 進行業務流程 
  11.     System.out.println("Enter mutex"); 
  12.     //完成業務流程, 釋放鎖 
  13.     mutex.release(); 
  14.      
  15.     //關閉客戶端 
  16.     client.close(); 

可以看到關鍵的核心操作就只有mutex.acquire()和mutex.release(),簡直太方便了!

下面來分析下獲取鎖的源碼實現。acquire的方法如下:

 

  1. /* 
  2.  * 獲取鎖,當鎖被占用時會阻塞等待,這個操作支持同線程的可重入(也就是重復獲取鎖),acquire的次數需要與release的次數相同。 
  3.  * @throws Exception ZK errors, connection interruptions 
  4.  */ 
  5. @Override 
  6. public void acquire() throws Exception 
  7.     if ( !internalLock(-1, null) ) 
  8.     { 
  9.         throw new IOException("Lost connection while trying to acquire lock: " + basePath); 
  10.     } 

這里有個地方需要注意,當與zookeeper通信存在異常時,acquire會直接拋出異常,需要使用者自身做重試策略。代碼中調用了internalLock(-1, null),參數表明在鎖被占用時***阻塞等待。internalLock的代碼如下:

 

  1. private boolean internalLock(long time, TimeUnit unit) throws Exception 
  2.  
  3.     //這里處理同線程的可重入性,如果已經獲得鎖,那么只是在對應的數據結構中增加acquire的次數統計,直接返回成功 
  4.     Thread currentThread = Thread.currentThread(); 
  5.     LockData lockData = threadData.get(currentThread); 
  6.     if ( lockData != null ) 
  7.     { 
  8.         // re-entering 
  9.         lockData.lockCount.incrementAndGet(); 
  10.         return true
  11.     } 
  12.  
  13.     //這里才真正去zookeeper中獲取鎖 
  14.     String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); 
  15.     if ( lockPath != null ) 
  16.     { 
  17.         //獲得鎖之后,記錄當前的線程獲得鎖的信息,在重入時只需在LockData中增加次數統計即可 
  18.         LockData newLockData = new LockData(currentThread, lockPath); 
  19.         threadData.put(currentThread, newLockData); 
  20.         return true
  21.     } 
  22.  
  23.     //在阻塞返回時仍然獲取不到鎖,這里上下文的處理隱含的意思為zookeeper通信異常 
  24.     return false

代碼中增加了具體注釋,不做展開。看下zookeeper獲取鎖的具體實現:

 

  1. String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception 
  2.     //參數初始化,此處省略 
  3.     //... 
  4.     
  5.     //自旋獲取鎖 
  6.     while ( !isDone ) 
  7.     { 
  8.         isDone = true
  9.  
  10.         try 
  11.         { 
  12.             //在鎖空間下創建臨時且有序的子節點 
  13.             ourPath = driver.createsTheLock(client, path, localLockNodeBytes); 
  14.             //判斷是否獲得鎖(子節點序號最小),獲得鎖則直接返回,否則阻塞等待前一個子節點刪除通知 
  15.             hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); 
  16.         } 
  17.         catch ( KeeperException.NoNodeException e ) 
  18.         { 
  19.             //對于NoNodeException,代碼中確保了只有發生session過期才會在這里拋出NoNodeException,因此這里根據重試策略進行重試 
  20.             if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) 
  21.             { 
  22.                 isDone = false
  23.             } 
  24.             else 
  25.             { 
  26.                 throw e; 
  27.             } 
  28.         } 
  29.     } 
  30.  
  31.     //如果獲得鎖則返回該子節點的路徑 
  32.     if ( hasTheLock ) 
  33.     { 
  34.         return ourPath; 
  35.     } 
  36.  
  37.     return null

上面代碼中主要有兩步操作:

  • driver.createsTheLock:創建臨時且有序的子節點,里面實現比較簡單不做展開,主要關注幾種節點的模式:1)PERSISTENT(***);2)PERSISTENT_SEQUENTIAL(***且有序);3)EPHEMERAL(臨時);4)EPHEMERAL_SEQUENTIAL(臨時且有序)。
  • internalLockLoop:阻塞等待直到獲得鎖。

看下internalLockLoop是怎么判斷鎖以及阻塞等待的,這里刪除了一些無關代碼,只保留主流程:

 

  1. //自旋直至獲得鎖 
  2. while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) 
  3.     //獲取所有的子節點列表,并且按序號從小到大排序 
  4.     List<String>        children = getSortedChildren(); 
  5.      
  6.     //根據序號判斷當前子節點是否為最小子節點 
  7.     String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash 
  8.     PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); 
  9.     if ( predicateResults.getsTheLock() ) 
  10.     { 
  11.         //如果為最小子節點則認為獲得鎖 
  12.         haveTheLock = true
  13.     } 
  14.     else 
  15.     { 
  16.         //否則獲取前一個子節點 
  17.         String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); 
  18.  
  19.         //這里使用對象監視器做線程同步,當獲取不到鎖時監聽前一個子節點刪除消息并且進行wait(),當前一個子節點刪除(也就是鎖釋放)時,回調會通過notifyAll喚醒此線程,此線程繼續自旋判斷是否獲得鎖 
  20.         synchronized(this) 
  21.         { 
  22.             try  
  23.             { 
  24.                 //這里使用getData()接口而不是checkExists()是因為,如果前一個子節點已經被刪除了那么會拋出異常而且不會設置事件監聽器,而checkExists雖然也可以獲取到節點是否存在的信息但是同時設置了監聽器,這個監聽器其實永遠不會觸發,對于zookeeper來說屬于資源泄露 
  25.                 client.getData().usingWatcher(watcher).forPath(previousSequencePath); 
  26.  
  27.                 //如果設置了阻塞等待的時間 
  28.                 if ( millisToWait != null ) 
  29.                 { 
  30.                     millisToWait -= (System.currentTimeMillis() - startMillis); 
  31.                     startMillis = System.currentTimeMillis(); 
  32.                     if ( millisToWait <= 0 ) 
  33.                     { 
  34.                         doDelete = true;    // 等待時間到達,刪除對應的子節點 
  35.                         break; 
  36.                     } 
  37.                      
  38.                     //等待相應的時間 
  39.                     wait(millisToWait); 
  40.                 } 
  41.                 else 
  42.                 { 
  43.                    //永遠等待 
  44.                     wait(); 
  45.                 } 
  46.             } 
  47.             catch ( KeeperException.NoNodeException e )  
  48.             { 
  49.                 //上面使用getData來設置監聽器時,如果前一個子節點已經被刪除那么會拋出NoNodeException,只需要自旋一次即可,無需額外處理 
  50.             } 
  51.         } 
  52.     } 

具體邏輯見注釋,不再贅述。代碼中設置的事件監聽器,在事件發生回調時只是簡單的notifyAll喚醒當前線程以重新自旋判斷,比較簡單不再展開。

以上。

責任編輯:未麗燕 來源: 深元的博客
相關推薦

2022-10-27 10:44:14

分布式Zookeeper

2021-02-28 07:49:28

Zookeeper分布式

2021-10-25 10:21:59

ZK分布式鎖ZooKeeper

2021-07-16 07:57:34

ZooKeeperCurator源碼

2020-11-16 12:55:41

Redis分布式鎖Zookeeper

2019-07-16 09:22:10

RedisZookeeper分布式鎖

2017-04-13 10:51:09

Consul分布式

2018-01-25 19:01:47

Zookeeper分布式數據

2022-07-25 06:44:19

ZooKeeper分布式鎖

2019-06-19 15:40:06

分布式鎖RedisJava

2021-07-08 09:21:17

ZooKeeper分布式鎖 Curator

2021-06-03 00:02:43

RedisRedlock算法

2021-07-30 00:09:21

Redlock算法Redis

2022-03-08 07:22:48

Redis腳本分布式鎖

2019-10-10 09:16:34

Zookeeper架構分布式

2015-05-18 09:59:48

ZooKeeper分布式計算Hadoop

2022-11-06 19:28:02

分布式鎖etcd云原生

2024-11-28 15:11:28

2021-07-10 10:02:30

ZooKeeperCurator并發

2019-11-18 14:16:10

ZookeeperRedis大數據
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久草中文网 | 中文字幕一区二区三区日韩精品 | 日本在线中文 | 一区二区在线免费观看视频 | 91香蕉视频在线观看 | 精品电影 | 99久久久久久 | 国产一级电影在线观看 | 一级黄色生活视频 | 91伊人网 | www.成人免费视频 | 亚洲情侣视频 | 黄色在线免费看 | 日韩精品一区二区三区中文在线 | 免费一级大片 | 91精品国产综合久久久久 | 亚洲精品1区 | 两性午夜视频 | 黑人成人网 | 亚洲精品68久久久一区 | 久久综合一区 | 人人爱干| 久久精品视频在线观看 | 精品一区二区三区视频在线观看 | 欧美亚洲国产日韩 | 欧美6一10sex性hd | 蜜桃精品噜噜噜成人av | 亚洲午夜精品一区二区三区他趣 | 久久精品国产亚洲a | 亚洲人成免费 | 欧美一区日韩一区 | 免费成人在线网站 | 亚洲国产成人精品久久久国产成人一区 | 欧美午夜精品 | 91网站在线观看视频 | 99久久精品国产一区二区三区 | 欧美xxxx网站 | 久久国产电影 | 国产精品久久久久久久久久久久午夜片 | 国产免费一区二区三区 | 国产精品久久久久久av公交车 |