一文搞懂 CountDownLatch 用法和源碼!
CountDownLatch 是多線(xiàn)程控制的一種工具,它被稱(chēng)為 門(mén)閥、 計(jì)數(shù)器或者 閉鎖。這個(gè)工具經(jīng)常用來(lái)用來(lái)協(xié)調(diào)多個(gè)線(xiàn)程之間的同步,或者說(shuō)起到線(xiàn)程之間的通信(而不是用作互斥的作用)。下面我們就來(lái)一起認(rèn)識(shí)一下 CountDownLatch
認(rèn)識(shí) CountDownLatch
CountDownLatch 能夠使一個(gè)線(xiàn)程在等待另外一些線(xiàn)程完成各自工作之后,再繼續(xù)執(zhí)行。它相當(dāng)于是一個(gè)計(jì)數(shù)器,這個(gè)計(jì)數(shù)器的初始值就是線(xiàn)程的數(shù)量,每當(dāng)一個(gè)任務(wù)完成后,計(jì)數(shù)器的值就會(huì)減一,當(dāng)計(jì)數(shù)器的值為 0 時(shí),表示所有的線(xiàn)程都已經(jīng)任務(wù)了,然后在 CountDownLatch 上等待的線(xiàn)程就可以恢復(fù)執(zhí)行接下來(lái)的任務(wù)。
CountDownLatch 的使用
CountDownLatch 提供了一個(gè)構(gòu)造方法,你必須指定其初始值,還指定了 countDown 方法,這個(gè)方法的作用主要用來(lái)減小計(jì)數(shù)器的值,當(dāng)計(jì)數(shù)器變?yōu)?0 時(shí),在 CountDownLatch 上 await 的線(xiàn)程就會(huì)被喚醒,繼續(xù)執(zhí)行其他任務(wù)。當(dāng)然也可以延遲喚醒,給 CountDownLatch 加一個(gè)延遲時(shí)間就可以實(shí)現(xiàn)。
其主要方法如下
CountDownLatch 主要有下面這幾個(gè)應(yīng)用場(chǎng)景
CountDownLatch 應(yīng)用場(chǎng)景
典型的應(yīng)用場(chǎng)景就是當(dāng)一個(gè)服務(wù)啟動(dòng)時(shí),同時(shí)會(huì)加載很多組件和服務(wù),這時(shí)候主線(xiàn)程會(huì)等待組件和服務(wù)的加載。當(dāng)所有的組件和服務(wù)都加載完畢后,主線(xiàn)程和其他線(xiàn)程在一起完成某個(gè)任務(wù)。
CountDownLatch 還可以實(shí)現(xiàn)學(xué)生一起比賽跑步的程序,CountDownLatch 初始化為學(xué)生數(shù)量的線(xiàn)程,鳴槍后,每個(gè)學(xué)生就是一條線(xiàn)程,來(lái)完成各自的任務(wù),當(dāng)?shù)谝粋€(gè)學(xué)生跑完全程后,CountDownLatch 就會(huì)減一,直到所有的學(xué)生完成后,CountDownLatch 會(huì)變?yōu)?0 ,接下來(lái)再一起宣布跑步成績(jī)。
順著這個(gè)場(chǎng)景,你自己就可以延伸、拓展出來(lái)很多其他任務(wù)場(chǎng)景。
CountDownLatch 用法
下面我們通過(guò)一個(gè)簡(jiǎn)單的計(jì)數(shù)器來(lái)演示一下 CountDownLatch 的用法
- public class TCountDownLatch {
- public static void main(String[] args) {
- CountDownLatch latch = new CountDownLatch(5);
- Increment increment = new Increment(latch);
- Decrement decrement = new Decrement(latch);
- new Thread(increment).start();
- new Thread(decrement).start();
- try {
- Thread.sleep(6000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- class Decrement implements Runnable {
- CountDownLatch countDownLatch;
- public Decrement(CountDownLatch countDownLatch){
- this.countDownLatch = countDownLatch;
- }
- @Override
- public void run() {
- try {
- for(long i = countDownLatch.getCount();i > 0;i--){
- Thread.sleep(1000);
- System.out.println("countdown");
- this.countDownLatch.countDown();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- class Increment implements Runnable {
- CountDownLatch countDownLatch;
- public Increment(CountDownLatch countDownLatch){
- this.countDownLatch = countDownLatch;
- }
- @Override
- public void run() {
- try {
- System.out.println("await");
- countDownLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("Waiter Released");
- }
- }
在 main 方法中我們初始化了一個(gè)計(jì)數(shù)器為 5 的 CountDownLatch,在 Decrement 方法中我們使用 countDown 執(zhí)行減一操作,然后睡眠一段時(shí)間,同時(shí)在 Increment 類(lèi)中進(jìn)行等待,直到 Decrement 中的線(xiàn)程完成計(jì)數(shù)減一的操作后,喚醒 Increment 類(lèi)中的 run 方法,使其繼續(xù)執(zhí)行。
下面我們?cè)賮?lái)通過(guò)學(xué)生賽跑這個(gè)例子來(lái)演示一下 CountDownLatch 的具體用法
- public class StudentRunRace {
- CountDownLatch stopLatch = new CountDownLatch(1);
- CountDownLatch runLatch = new CountDownLatch(10);
- public void waitSignal() throws Exception{
- System.out.println("選手" + Thread.currentThread().getName() + "正在等待裁判發(fā)布口令");
- stopLatch.await();
- System.out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令");
- Thread.sleep((long) (Math.random() * 10000));
- System.out.println("選手" + Thread.currentThread().getName() + "到達(dá)終點(diǎn)");
- runLatch.countDown();
- }
- public void waitStop() throws Exception{
- Thread.sleep((long) (Math.random() * 10000));
- System.out.println("裁判"+Thread.currentThread().getName()+"即將發(fā)布口令");
- stopLatch.countDown();
- System.out.println("裁判"+Thread.currentThread().getName()+"已發(fā)送口令,正在等待所有選手到達(dá)終點(diǎn)");
- runLatch.await();
- System.out.println("所有選手都到達(dá)終點(diǎn)");
- System.out.println("裁判"+Thread.currentThread().getName()+"匯總成績(jī)排名");
- }
- public static void main(String[] args) {
- ExecutorService service = Executors.newCachedThreadPool();
- StudentRunRace studentRunRace = new StudentRunRace();
- for (int i = 0; i < 10; i++) {
- Runnable runnable = () -> {
- try {
- studentRunRace.waitSignal();
- } catch (Exception e) {
- e.printStackTrace();
- }
- };
- service.execute(runnable);
- }
- try {
- studentRunRace.waitStop();
- } catch (Exception e) {
- e.printStackTrace();
- }
- service.shutdown();
- }
- }
下面我們就來(lái)一起分析一下 CountDownLatch 的源碼
CountDownLatch 源碼分析
CountDownLatch 使用起來(lái)比較簡(jiǎn)單,但是卻非常有用,現(xiàn)在你可以在你的工具箱中加上 CountDownLatch 這個(gè)工具類(lèi)了。下面我們就來(lái)深入認(rèn)識(shí)一下 CountDownLatch。
CountDownLatch 的底層是由 AbstractQueuedSynchronizer 支持,而 AQS 的數(shù)據(jù)結(jié)構(gòu)的核心就是兩個(gè)隊(duì)列,一個(gè)是 同步隊(duì)列(sync queue),一個(gè)是條件隊(duì)列(condition queue)。
Sync 內(nèi)部類(lèi)
CountDownLatch 在其內(nèi)部是一個(gè) Sync ,它繼承了 AQS 抽象類(lèi)。
- private static final class Sync extends AbstractQueuedSynchronizer {...}
CountDownLatch 其實(shí)其內(nèi)部只有一個(gè) sync 屬性,并且是 final 的
- private final Sync sync;
CountDownLatch 只有一個(gè)帶參數(shù)的構(gòu)造方法
- public CountDownLatch(int count) {
- if (count < 0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
也就是說(shuō),初始化的時(shí)候必須指定計(jì)數(shù)器的數(shù)量,如果數(shù)量為負(fù)會(huì)直接拋出異常。
然后把 count 初始化為 Sync 內(nèi)部的 count,也就是
- Sync(int count) {
- setState(count);
- }
注意這里有一個(gè) setState(count),這是什么意思呢?見(jiàn)聞知意這只是一個(gè)設(shè)置狀態(tài)的操作,但是實(shí)際上不單單是,還有一層意思是 state 的值代表著待達(dá)到條件的線(xiàn)程數(shù)。這個(gè)我們?cè)诹?countDown 方法的時(shí)候再討論。
getCount() 方法的返回值是 getState() 方法,它是 AbstractQueuedSynchronizer 中的方法,這個(gè)方法會(huì)返回當(dāng)前線(xiàn)程計(jì)數(shù),具有 volatile 讀取的內(nèi)存語(yǔ)義。
- // ---- CountDownLatch ----
- int getCount() {
- return getState();
- }
- // ---- AbstractQueuedSynchronizer ----
- protected final int getState() {
- return state;
- }
tryAcquireShared() 方法用于獲取·共享狀態(tài)下對(duì)象的狀態(tài),判斷對(duì)象是否為 0 ,如果為 0 返回 1 ,表示能夠嘗試獲取,如果不為 0,那么返回 -1,表示無(wú)法獲取。
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
- // ---- getState() 方法和上面的方法相同 ----
這個(gè) 共享狀態(tài) 屬于 AQS 中的概念,在 AQS 中分為兩種模式,一種是 獨(dú)占模式,一種是 共享模式。
- tryAcquire 獨(dú)占模式,嘗試獲取資源,成功則返回 true,失敗則返回 false。
- tryAcquireShared 共享方式,嘗試獲取資源。負(fù)數(shù)表示失敗;0 表示成功,但沒(méi)有剩余可用資源;正數(shù)表示成功,且有剩余資源。
tryReleaseShared() 方法用于共享模式下的釋放
- protected boolean tryReleaseShared(int releases) {
- // 減小數(shù)量,變?yōu)?nbsp;0 的時(shí)候進(jìn)行通知。
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
這個(gè)方法是一個(gè)無(wú)限循環(huán),獲取線(xiàn)程狀態(tài),如果線(xiàn)程狀態(tài)是 0 則表示沒(méi)有被線(xiàn)程占有,沒(méi)有占有的話(huà)那么直接返回 false ,表示已經(jīng)釋放;然后下一個(gè)狀態(tài)進(jìn)行 - 1 ,使用 compareAndSetState CAS 方法進(jìn)行和內(nèi)存值的比較,如果內(nèi)存值也是 1 的話(huà),就會(huì)更新內(nèi)存值為 0 ,判斷 nextc 是否為 0 ,如果 CAS 比較不成功的話(huà),會(huì)再次進(jìn)行循環(huán)判斷。
await 方法
await() 方法是 CountDownLatch 一個(gè)非常重要的方法,基本上可以說(shuō)只有 countDown 和 await 方法才是 CountDownLatch 的精髓所在,這個(gè)方法將會(huì)使當(dāng)前線(xiàn)程在 CountDownLatch 計(jì)數(shù)減至零之前一直等待,除非線(xiàn)程被中斷。
CountDownLatch 中的 await 方法有兩種,一種是不帶任何參數(shù)的 await(),一種是可以等待一段時(shí)間的await(long timeout, TimeUnit unit)。下面我們先來(lái)看一下 await() 方法。
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
await 方法內(nèi)部會(huì)調(diào)用 acquireSharedInterruptibly 方法,這個(gè) acquireSharedInterruptibly 是 AQS 中的方法,以共享模式進(jìn)行中斷。
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
可以看到,acquireSharedInterruptibly 方法的內(nèi)部會(huì)首先判斷線(xiàn)程是否中斷,如果線(xiàn)程中斷,則直接拋出線(xiàn)程中斷異常。如果沒(méi)有中斷,那么會(huì)以共享的方式獲取。如果能夠在共享的方式下不能獲取鎖,那么就會(huì)以共享的方式斷開(kāi)鏈接。
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
這個(gè)方法有些長(zhǎng),我們分開(kāi)來(lái)看
- 首先,會(huì)先構(gòu)造一個(gè)共享模式的 Node 入隊(duì)
- 然后使用無(wú)限循環(huán)判斷新構(gòu)造 node 的前驅(qū)節(jié)點(diǎn),如果 node 節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),那么就會(huì)判斷線(xiàn)程的狀態(tài),這里調(diào)用了一個(gè) setHeadAndPropagate ,其源碼如下
- private void setHeadAndPropagate(Node node, int propagate) {
- Node h = head;
- setHead(node);
- if (propagate > 0 || h == null || h.waitStatus < 0 ||
- (h = head) == null || h.waitStatus < 0) {
- Node s = node.next;
- if (s == null || s.isShared())
- doReleaseShared();
- }
- }
首先會(huì)設(shè)置頭節(jié)點(diǎn),然后進(jìn)行一系列的判斷,獲取節(jié)點(diǎn)的獲取節(jié)點(diǎn)的后繼,以共享模式進(jìn)行釋放,就會(huì)調(diào)用 doReleaseShared 方法,我們?cè)賮?lái)看一下 doReleaseShared 方法
- private void doReleaseShared() {
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
這個(gè)方法會(huì)以無(wú)限循環(huán)的方式首先判斷頭節(jié)點(diǎn)是否等于尾節(jié)點(diǎn),如果頭節(jié)點(diǎn)等于尾節(jié)點(diǎn)的話(huà),就會(huì)直接退出。如果頭節(jié)點(diǎn)不等于尾節(jié)點(diǎn),會(huì)判斷狀態(tài)是否為 SIGNAL,不是的話(huà)就繼續(xù)循環(huán) compareAndSetWaitStatus,然后斷開(kāi)后繼節(jié)點(diǎn)。如果狀態(tài)不是 SIGNAL,也會(huì)調(diào)用 compareAndSetWaitStatus 設(shè)置狀態(tài)為 PROPAGATE,狀態(tài)為 0 并且不成功,就會(huì)繼續(xù)循環(huán)。
也就是說(shuō) setHeadAndPropagate 就是設(shè)置頭節(jié)點(diǎn)并且釋放后繼節(jié)點(diǎn)的一系列過(guò)程。
- 我們來(lái)看下面的 if 判斷,也就是 shouldParkAfterFailedAcquire(p, node) 這里
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
如果上面 Node p = node.predecessor() 獲取前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),就會(huì)進(jìn)行 park 斷開(kāi)操作,判斷此時(shí)是否能夠斷開(kāi),判斷的標(biāo)準(zhǔn)如下
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- return true;
- if (ws > 0) {
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
這個(gè)方法會(huì)判斷 Node p 的前驅(qū)節(jié)點(diǎn)的結(jié)點(diǎn)狀態(tài)(waitStatus),節(jié)點(diǎn)狀態(tài)一共有五種,分別是
- CANCELLED(1):表示當(dāng)前結(jié)點(diǎn)已取消調(diào)度。當(dāng)超時(shí)或被中斷(響應(yīng)中斷的情況下),會(huì)觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會(huì)再變化。
- SIGNAL(-1):表示后繼結(jié)點(diǎn)在等待當(dāng)前結(jié)點(diǎn)喚醒。后繼結(jié)點(diǎn)入隊(duì)時(shí),會(huì)將前繼結(jié)點(diǎn)的狀態(tài)更新為 SIGNAL。
- CONDITION(-2):表示結(jié)點(diǎn)等待在 Condition 上,當(dāng)其他線(xiàn)程調(diào)用了 Condition 的 signal() 方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。
- PROPAGATE(-3):共享模式下,前繼結(jié)點(diǎn)不僅會(huì)喚醒其后繼結(jié)點(diǎn),同時(shí)也可能會(huì)喚醒后繼的后繼結(jié)點(diǎn)。
- 0:新結(jié)點(diǎn)入隊(duì)時(shí)的默認(rèn)狀態(tài)。
如果前驅(qū)節(jié)點(diǎn)是 SIGNAL 就會(huì)返回 true 表示可以斷開(kāi),如果前驅(qū)節(jié)點(diǎn)的狀態(tài)大于 0 (此時(shí)為什么不用 ws == Node.CANCELLED ) 呢?因?yàn)?ws 大于 0 的條件只有 CANCELLED 狀態(tài)了。然后就是一系列的查找遍歷操作直到前驅(qū)節(jié)點(diǎn)的 waitStatus > 0。如果 ws <= 0 ,而且還不是 SIGNAL 狀態(tài)的話(huà),就會(huì)使用 CAS 替換前驅(qū)節(jié)點(diǎn)的 ws 為 SIGNAL 狀態(tài)。
如果檢查判斷是中斷狀態(tài)的話(huà),就會(huì)返回 false。
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
這個(gè)方法使用 LockSupport.park 斷開(kāi)連接,然后返回線(xiàn)程是否中斷的標(biāo)志。
- cancelAcquire() 用于取消等待隊(duì)列,如果等待過(guò)程中沒(méi)有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),那么取消結(jié)點(diǎn)在隊(duì)列中的等待。
- private void cancelAcquire(Node node) {
- if (node == null)
- return;
- node.thread = null;
- Node pred = node.prev;
- while (pred.waitStatus > 0)
- node.prev = pred = pred.prev;
- Node predNext = pred.next;
- node.waitStatus = Node.CANCELLED;
- if (node == tail && compareAndSetTail(node, pred)) {
- compareAndSetNext(pred, predNext, null);
- } else {
- int ws;
- if (pred != head &&
- ((ws = pred.waitStatus) == Node.SIGNAL ||
- (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
- pred.thread != null) {
- Node next = node.next;
- if (next != null && next.waitStatus <= 0)
- compareAndSetNext(pred, predNext, next);
- } else {
- unparkSuccessor(node);
- }
- node.next = node; // help GC
- }
- }
所以,對(duì) CountDownLatch 的 await 調(diào)用大致會(huì)有如下的調(diào)用過(guò)程。
一個(gè)和 await 重載的方法是 await(long timeout, TimeUnit unit),這個(gè)方法和 await 最主要的區(qū)別就是這個(gè)方法能夠可以等待計(jì)數(shù)器一段時(shí)間再執(zhí)行后續(xù)操作。
countDown 方法
countDown 是和 await 同等重要的方法,countDown 用于減少計(jì)數(shù)器的數(shù)量,如果計(jì)數(shù)減為 0 的話(huà),就會(huì)釋放所有的線(xiàn)程。
- public void countDown() {
- sync.releaseShared(1);
- }
這個(gè)方法會(huì)調(diào)用 releaseShared 方法,此方法用于共享模式下的釋放操作,首先會(huì)判斷是否能夠進(jìn)行釋放,判斷的方法就是 CountDownLatch 內(nèi)部類(lèi) Sync 的 tryReleaseShared 方法
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
- // ---- CountDownLatch ----
- protected boolean tryReleaseShared(int releases) {
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
tryReleaseShared 會(huì)進(jìn)行 for 循環(huán)判斷線(xiàn)程狀態(tài)值,使用 CAS 不斷嘗試進(jìn)行替換。
如果能夠釋放,就會(huì)調(diào)用 doReleaseShared 方法
- private void doReleaseShared() {
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
可以看到,doReleaseShared 其實(shí)也是一個(gè)無(wú)限循環(huán)不斷使用 CAS 嘗試替換的操作。
總結(jié)
本文是 CountDownLatch 的基本使用和源碼分析,CountDownLatch 就是一個(gè)基于 AQS 的計(jì)數(shù)器,它內(nèi)部的方法都是圍繞 AQS 框架來(lái)談的,除此之外還有其他比如 ReentrantLock、Semaphore 等都是 AQS 的實(shí)現(xiàn),所以要研究并發(fā)的話(huà),離不開(kāi)對(duì) AQS 的探討。CountDownLatch 的源碼看起來(lái)很少,比較簡(jiǎn)單,但是其內(nèi)部比如 await 方法的調(diào)用鏈路卻很長(zhǎng),也值得花費(fèi)時(shí)間深入研究。
本文轉(zhuǎn)載自微信公眾號(hào)「 Java建設(shè)者」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系 Java建設(shè)者公眾號(hào)。