為了帶你精通 Java AQS,我畫了 40 張圖,從管程模型講起!
大家好,我是君哥。
Java中 AQS 是 AbstractQueuedSynchronizer 類,AQS 依賴 FIFO 隊列來提供一個框架,這個框架用于實現鎖以及鎖相關的同步器,比如信號量、事件等。
在 AQS 中,主要有兩部分功能,一部分是操作 state 變量,第二部分是實現排隊和阻塞機制。
注意,AQS 并沒有實現任何同步接口,它只是提供了類似 acquireInterruptible 的方法,調用這些方法可以實現鎖和同步器。
1、管程模型
Java 使用 MESA 管程模型來管理類的成員變量和方法,讓這個類的成員變量和方法的操作是線程安全的。下圖是 MESA 管程模型,里面除了定義共享變量外,還定義了條件變量和條件變量等待隊列:
上圖中有三個知識點:
- MESA 管程模型封裝了共享變量和對共享變量的操作,線程要進入管程內部,必須獲取到鎖,如果獲取鎖失敗就進入入口等待隊列阻塞等待。
- 如果線程獲取到鎖,就進入到管程內部。但是進入到管程內部,也不一定能立刻操作共享變量,而是要看條件變量是否滿足,如果不滿足,只能進入條件變量等待隊列阻塞等待。
- 在條件變量等待隊列中,如果被其他線程喚醒,也不一定能立刻操作共享變量,而是需要去入口等待隊列重新排隊等待獲取鎖。
Java 中的 MESA 管程模型有一點改進,就是管程內部只有一個條件變量和一個等待隊列。下圖是 AQS 的管程模型:
AQS 的管程模型依賴 AQS 中的 FIFO 隊列實現入口等待隊列,要進入管程內部,就由各種并發鎖的限制。而 ConditionObject 則實現了條件隊列,這個隊列可以創建多個。
下面就從入口等待隊列、并發鎖、條件等待隊列三個方面來帶你徹底理解 AQS。
2、入口等待隊列
2.1 獲取獨占鎖
獨占, 忽略 interrupts
這里的 tryAcquire 是抽象方法,由 AQS 的子類來實現,因為每個子類實現的鎖是不一樣的。
2.1.1 入隊
上面的代碼可以看到,獲取鎖失敗后,會先執行 addWaiter 方法加入隊列,然后執行 acquireQueued 方法自旋地獲取鎖直到成功。
addWaiter 代碼邏輯如下圖,簡單說就是把 node 入隊,入隊后返回 node 參數給 acquireQueued 方法:
這里有一個點需要注意,如果隊列為空,則新建一個 Node 作為隊頭。
2.1.2 入隊后獲取鎖
acquireQueued 自旋獲取鎖邏輯如下圖:
這里有幾個細節:
(1)waitStatus
- CANCELLED(1):當前節點取消獲取鎖。當等待超時或被中斷(響應中斷),會觸發變更為此狀態,進入該狀態后節點狀態不再變化;
- SIGNAL(-1):后面節點等待當前節點喚醒;
- CONDITION(-2):Condition 中使用,當前線程阻塞在 Condition,如果其他線程調用了 Condition 的 signal 方法,這個結點將從等待隊列轉移到同步隊列隊尾,等待獲取同步鎖;
- PROPAGATE(-3):共享模式,前置節點喚醒后面節點后,喚醒操作無條件傳播下去;
- 0:中間狀態,當前節點后面的節點已經喚醒,但是當前節點線程還沒有執行完成。
(2)獲取鎖失敗后掛起
如果前置節點不是頭節點,或者前置節點是頭節點但當前節點獲取鎖失敗,這時當前節點需要掛起,分三種情況:
- 前置節點 waitStatus=-1,如下圖:
- 前置節點 waitStatus > 0,如下圖:
- 前置節點 waitStatus < 0 但不等于 -1,如下圖:
(3)取消獲取鎖
如果獲取鎖拋出異常,則取消獲取鎖,如果當前節點是 tail 節點,分兩種情況如下圖:
如果當前節點不是 tail 節點,也分兩種情況,如下圖:
4.對中斷狀態忽略
5.如果前置節點的狀態是 0 或 PROPAGATE,會被當前節點自旋過程中更新成 -1,以便之后通知當前節點。
2.1.3 獨占 + 響應中斷
對應方法 acquireInterruptibly(int arg)。
跟忽略中斷(acquire方法)不同的是要響應中斷,下面兩個地方響應中斷:
- 獲取鎖之前會檢查當前線程是否中斷。
- 獲取鎖失敗入隊,在隊列中自旋獲取鎖的過程中也會檢查當前線程是否中斷。如果檢查到當前線程已經中斷,則拋出 InterruptedException,當前線程退出。
2.1.4 獨占 + 響應中斷 + 考慮超時
對應方法 tryAcquireNanos(int arg, long nanosTimeout)。
這個方法具備了獨占 + 響應中斷 + 超時的功能,下面2個地方要判斷是否超時:
- 自旋獲取鎖的過程中每次獲取鎖失敗都要判斷是否超時;
- 獲取鎖失敗 park 之前要判斷超時時間是否大于自旋的閾值時間 (spinForTimeoutThreshold = 1ns) 另外,park 線程的操作使用 parkNanos 傳入阻塞時間。
2.2 釋放獨占鎖
獨占鎖釋放分兩步:釋放鎖,喚醒后繼節點。
釋放鎖的方法 tryRelease 是抽象的,由子類去實現。
我們看一下喚醒后繼節點的邏輯,首先需要滿足兩個條件:
- head 節點不等于 null;
- head 節點 waitStatus 不等于 0。這里有兩種情況(在方法 unparkSuccessor):
- 情況一,后繼節點 waitStatus <= 0,直接喚醒后繼節點,如下圖:
- 情況二:后繼節點為空或者 waitStatus > 0,從后往前查找最接近當前節點的節點進行喚醒,如下圖:
2.3 獲取共享鎖
之前我們講了獨占鎖,這一小節我們談共享鎖,有什么不同呢?
2.3.1 共享,忽略 interrupts
對應方法 acquireShared,代碼如下:
2.3.2 tryAcquireShared
這里獲取鎖使用的方法是 tryAcquireShared,獲取的是共享鎖。獲取共享鎖跟獲取獨占鎖不同的是,會返回一個整數值,說明如下:
- 返回負數:獲取鎖失敗。
- 返回 0:獲取鎖成功但是之后再由線程來獲取共享鎖時就會失敗。
- 返回正數:獲取鎖成功而且之后再有線程來獲取共享鎖時也可能會成功。所以需要把喚醒操作傳播下去。tryAcquireShared 獲取鎖失敗后(返回負數),就需要入隊后自旋獲取,也就是執行方法 doAcquireShared。
2.3.3 doAcquireShared
怎么判斷隊列中等待節點是在等待共享鎖呢?nextWaiter == SHARED,這個參數值是入隊新建節點的時候構造函數傳入的。
自旋過程中,如果獲取鎖成功(返回正數),首先把自己設置成新的 head 節點,然后把通知傳播下去。如下圖:
之后會喚醒后面節點并保證喚醒操作可以傳播下去。但是需要滿足四個條件中的一個:
- tryAcquireShared 返回值大于0,有多余的鎖,可以繼續喚醒后繼節點。
- 舊的 head 節點 waitStatus < 0,應該是其他線程釋放共享鎖過程中把它的狀態更新成了 -3。
- 新的 hade 節點 waitStatus < 0,只要不是 tail 節點,就可能是 -1。這里會造成不必要的喚醒,因為喚醒后獲取不到鎖只能繼續入隊等待。
- 當前節點的后繼節點是空或者非空但正在等待共享鎖。
喚醒后面節點的操作,其實就是釋放共享鎖,對應方法是 doReleaseShared,見釋放共享鎖一節。
2.3.4 共享 + 響應中斷
對應方法 acquireSharedInterruptibly(int arg)。
跟共享忽略中斷(acquireShared 方法)不同的是要響應中斷,下面兩個地方響應中斷:
- 獲取鎖之前會檢查當前線程是否中斷。
- 獲取鎖失敗入隊,在隊列中自旋獲取鎖的過程中也會檢查當前線程是否中斷。
如果檢查到當前線程已經中斷,則拋出 InterruptedException,當前線程退出。
2.3.5 共享 + 響應中斷 + 考慮超時
對應方法 tryAcquireSharedNanos(int arg, long nanosTimeout)。
這個方法具備了共享 + 響應中斷 + 超時的功能,下面兩個個地方要判斷是否超時:
- 自旋獲取鎖的過程中每次獲取鎖失敗都要判斷是否超時。
- 獲取鎖失敗 park 之前要判斷超時時間是否大于自旋的閾值時間(spinForTimeoutThreshold = 1ns)。
另外,park 線程的操作使用 parkNanos 傳入阻塞時間。
2.4 釋放共享鎖
釋放共享鎖代碼如下:
首先嘗試釋放共享鎖,tryReleaseShared 代碼由子類來實現。釋放成功后執行AQS中的 doReleaseShared 方法,是一個自旋操作。
自旋的條件是隊列中至少有兩個節點,這里分三種情況。
情況一:當前節點 waitStatus 是 -1,如下圖:
情況二:當前節點 waitStatus 是 0(被其他線程更新新成了中間狀態),如下圖:
情況三:當前節點 waitStatus 是 -3,為什么會這樣呢?需要解釋一下,head節點喚醒后繼節點之前 waitStatus 已經被更新中間態 0 了,喚醒后繼節點動作還沒有執行,又被其他線程更成了 -3,也就是其他線程釋放鎖執行了上面情況二。這時需要先把 waitStatus 再更成 0 (在方法 unparkSuccessor),如下圖:
2.5 抽象方法
上面的講解可以看出,如果要基于 AQS 來實現并發鎖,可以根據需求重寫下面四個方法來實現,這四個方法在 AQS 中沒有具體實現:
- tryAcquire(int arg):獲取獨占鎖
- tryRelease(int arg):釋放獨占鎖
- tryAcquireShared(int arg):獲取共享鎖
- tryReleaseShared(int arg):釋放共享鎖
AQS 的子類需要重寫上面的方法來修改 state 值,并且定義獲取鎖或者釋放鎖時 state 值的變化。子類也可以定義自己的 state 變量,但是只有更新 AQS 中的 state變量才會對同步起作用。
還有一個判斷當前線程是否持有獨占鎖的方法 isHeldExclusively,也可以供子類重寫后使用。
獲取/釋放鎖的具體實現放到下篇文章講解。
2.6 總結
AQS 使用 FIFO 隊列實現了一個鎖相關的并發器模板,可以基于這個模板來實現各種鎖,包括獨占鎖、共享鎖、信號量等。
AQS 中,有一個核心狀態是 waitStatus,這個代表節點的狀態,決定了當前節點的后續操作,比如是否等待喚醒,是否要喚醒后繼節點。
3 并發鎖
這一章節講解 Java AQS 中的并發鎖。其實 Java AQS 中的并發鎖主要是基于 state 這個變量值來實現的。
3.1 ReentrantLock
我們先來看一下 UML 類圖:
從圖中可以看到,ReentrantLock 使用抽象內部類 Sync 來實現了 AQS 的方法,然后基于 Sync 這個同步器實現了公平鎖和非公平鎖。主要實現了下面 3 個方法:
- tryAcquire(int arg):獲取獨占鎖
- tryRelease(int arg):釋放獨占鎖
- isHeldExclusively:當前線程是否占有獨占鎖。ReentrantLock 默認實現的是非公平鎖,可以在構造函數指定。
從實現的方法可以看到,ReentrantLock 中獲取的鎖是獨占鎖,我們再來看一下獲取和釋放獨占鎖的代碼:
獨占鎖的特點是調用上面 acquire 方法,傳入的參數是 1。
3.1.1 獲取公平鎖
獲取鎖首先判斷同步狀態(state)的值。
3.1.1.1 state 等于 0
這說明沒有線程占用鎖,當前線程如果符合下面兩個條件,就可以獲取到鎖:
沒有前任節點,如下圖:
CAS 的方式更新 state 值(把 0 更新成 1)成功。如果獲取獨占鎖成功,會更新 AQS 中 exclusiveOwnerThread 為當前線程,這個很容易理解。
3.1.1.2 state 不等于 0
這說明已經有線程占有鎖,判斷占有鎖的線程是不是當前線程,如下圖:
state += 1 值如果小于 0,會拋出異常。
如果獲取鎖失敗,則進入 AQS 隊列等待喚醒。
3.1.2 獲取非公平鎖
跟公平鎖相比,非公平鎖的唯一不同是如果判斷到 state 等于 0,不用判斷有沒有前任節點,只要 CAS 設置 state 值(把 0 更新成 1)成功,就獲取到了鎖。
3.1.3 釋放鎖
公平鎖和非公平鎖,釋放邏輯完全一樣,都是在內部類 Sync 中實現的。釋放鎖需要注意兩點,如下圖:
為什么 state 會大于 1,因為是可以重入的,占有鎖的線程可以多次獲取鎖。
3.1.4 總結
公平鎖的特點是每個線程都要進行排隊,不用擔心線程永遠獲取不到鎖,但有個缺點是每個線程入隊后都需要阻塞和被喚醒,這一定程度上影響了效率。非公平鎖的特點是每個線程入隊前都會先嘗試獲取鎖,如果獲取成功就不會入隊了,這比公平鎖效率高。但也有一個缺點,隊列中的線程有可能等待很長時間,高并發下甚至可能永遠獲取不到鎖。
3.2 ReentrantReadWriteLock
我們先來看一下 UML 類圖:
從圖中可以看到,ReentrantReadWriteLock 使用抽象內部類Sync來實現了 AQS 的方法,然后基于 Sync 這個同步器實現了公平鎖和非公平鎖。主要實現了下面 3 個方法:
- tryAcquire(int arg):獲取獨占鎖
- tryRelease(int arg):釋放獨占鎖
- tryAcquireShared(int arg):獲取共享鎖
- tryReleaseShared(int arg):釋放共享鎖
- isHeldExclusively:當前線程是否占有獨占鎖 可見ReentrantReadWriteLock里面同時用到了共享鎖和獨占鎖。
下圖是定義的幾個常用變量:
下面這 2 個方法用戶獲取共享鎖和獨占鎖的數量:
從sharedCount 可以看到,共享鎖的數量要右移 16 位獲取,也就是說共享鎖占了高 16 位。從上圖 EXCLUSIVE_MASK 的定義看到,跟 EXCLUSIVE_MASK 進行與運算,得到的是低 16 位的值,所以獨占鎖占了低 16 位。如下圖:
這樣上面獲取鎖數量的方法就很好理解了。
3.2.1 讀鎖
讀鎖的實現對應內部類 ReadLock。
3.2.1.1 獲取讀鎖
獲取讀鎖實際上是 ReadLock 調用了 AQS 的下面方法,傳入參數是 1:
ReentrantReadWriteLock 內部類 Sync 實現了 tryAcquireShared 方法,主要包括如下三種情況:
- 使用 exclusiveCount 方法查看 state 中是否有獨占鎖,如果有并且獨占線程不是當前線程,返回 -1,獲取失敗;
- 使用 sharedCount 查看 state 中共享鎖數量,如果讀鎖數量小于最大值(MAX_COUNT=65535),則再滿足下面 3 個條件就可以獲取成功并返回 1:
a.當前線程不需要阻塞(readerShouldBlock)。在公平鎖中,需要判斷是否有前置節點,如下圖就需要阻塞:
在非公平鎖中,則是判斷第一個節點是不是有獨占鎖,如下圖就需要阻塞:
b.使用 CAS 把 state 的值加 SHARED_UNIT(65536)。這里是不是就更理解讀鎖占高位的說法了,獲取一個讀鎖,state 的值就要加 SHARED_UNIT 這么多個。
c.給當前線程的 holdCount 加 1。
- 如果 2 失敗,自旋,重復上面的步驟直到獲取到鎖。tryAcquireShared (獲取共享鎖)會返回一個整數,如下:
- 返回負數:獲取鎖失敗。
- 返回 0:獲取鎖成功但是之后再由線程來獲取共享鎖時就會失敗。
- 返回正數:獲取鎖成功而且之后再有線程來獲取共享鎖時也可能會成功。
3.2.1.2 釋放讀鎖
ReentrantReadWriteLock 釋放讀鎖是在 ReadLock 中調用了 AQS 下面方法,傳入的參數是1:
ReentrantReadWriteLock 內部類 Sync 實現了 releaseShared 方法,具體邏輯分為下面兩步:
- 當前線程 holdCounter 值減 1。
- CAS的方式將 state 的值減去 SHARED_UNIT。
3.2.2 寫鎖
寫鎖的實現對應內部類 WriteLock。
3.2.2.1 獲取寫鎖
ReentrantReadWriteLock 獲取寫鎖其實是在 WriteLock 中調用了 AQS 的下面方法,傳入參數 1:
在ReentrantReadWriteLock 內部類 Sync 實現了 tryAcquire 方法,首先獲取 state 值和獨占鎖數量(exclusiveCount),之后分如下兩種情況,如下圖:
1.state 不等于 0:
- 獨占鎖數量等于 0,這時說明有線程占用了共享鎖,如果當前線程不是獨占線程,獲取鎖失敗。
- 獨占鎖數量不等于 0,獨占鎖數量加 1 后大于 MAX_COUNT,獲取鎖失敗。
- 上面 2 種情況不符合,獲取鎖成功,state 值加 1。2.state 等于 0,判斷當前線程是否需要阻塞(writerShouldBlock)。在公平鎖中,跟 readerShouldBlock 的邏輯完全一樣,就是判斷隊列中 head 節點的后繼節點是不是當前線程。在非公平鎖中,直接返回 false,即可以直接嘗試獲取鎖。
如果當前線程不需要阻塞,并且給 state 賦值成功,使用 CAS 方式把 state 值加 1,把獨占線程置為當前線程。
3.2.2.2 釋放寫鎖
ReentrantReadWriteLock 釋放寫鎖其實是在 WriteLock 中調用了 AQS 的下面方法,傳入參數 1:
ReentrantReadWriteLock 在 Sync 中實現了 tryRelease(arg) 方法,邏輯如下:
- 判斷當前線程是不是獨占線程,如果不是,拋出異常。
- state值減1后,用新state值判斷獨占鎖數量是否等于0
- 如果等于0,則把獨占線程置為空,返回true,這樣上面的代碼就可以喚醒隊列中的后置節點了
- 如果不等于0,返回false,不喚醒后繼節點。
3.3 CountDownLatch
我們先來看一下UML類圖:
從上面的圖中看出,CountDownLatch 的內部類 Sync 實現了獲取共享鎖和釋放共享鎖的邏輯。
使用 CountDownLatch 時,構造函數會傳入一個 int 類型的參數 count,表示調動 count 次的 countDown 后主線程才可以被喚醒。
上面的 Sync(count) 就是將 AQS 中的 state 賦值為 count。
3.3.1 await
CountDownLatch 的 await 方法調用了 AQS 中的 acquireSharedInterruptibly(int arg),傳入參數 1,不過這個參數并沒有用。代碼如下:
Sync 中實現了 tryAcquireShared 方法,await 邏輯如下圖:
上面的自旋過程就是等待 state 的值不斷減小,只有 state 值成為 0 的時候,主線程才會跳出自旋執行之后的邏輯。
3.3.2 countDown
CountDownLatch 的 countDown 方法調用了 AQS 的 releaseShared(int arg),傳入參數 1,不過這個參數并沒有用。內部類 Sync 實現了 tryReleaseShared 方法,邏輯如下圖:
3.3.3 總結
CountDownLatch 的構造函數入參值會賦值給 state 變量,入隊操作是主線程入隊,每個子線程調用了countDown 后 state 值減 1,當 state 值成為 0 后喚醒主線程。
3.4 Semaphore
Semaphore 是一個信號量,用來保護共享資源。如果線程要訪問共享資源,首先從 Semaphore 獲取鎖(信號量),如果信號量的計數器等于 0,則當前線程進入 AQS 隊列阻塞等待。否則,線程獲取鎖成功,信號量減 1。使用完共享資源后,釋放鎖(信號量加 1)。
Semaphore 跟管程模型不一樣的是,允許多個(構造函數的 permits)線程進入管程內部,因此也常用它來做限流。
UML 類圖如下:
Semaphore的構造函數會傳入一個int類型參數,用來初始化state的值。
3.4.1 acquire
獲取鎖的操作調用了 AQS 中的 acquireSharedInterruptibly 方法,傳入參數 1,代碼見 CountDownLatch 中 await 小節。Semaphore 在公平鎖和非公平鎖中分別實現了 tryAcquireShared 方法。
3.4.1.1 公平鎖
Semaphore 默認使用非公平鎖,如果使用公平鎖,需要在構造函數指定。獲取公平鎖邏輯比較簡單,如下圖:
3.4.1.2 非公平鎖
acquire 在非公平的鎖唯一的區別就是不會判斷 AQS 隊列是否有前置節點(hasQueuedPredecessors),而是直接嘗試獲取鎖。
除了 acquire 方法外,還有其他幾個獲取鎖的方法,原理類似,只是調用了 AQS 中的不同方法。
3.4.2 release
釋放鎖的操作調用了 AQS 中的 releaseShared(int arg) 方法,傳入參數 1,在內部類 Sync 中實現了 tryReleaseShared 方法,邏輯很簡單:使用 CAS 的方式將 state 的值加 1,之后喚醒隊列中的后繼節點。
3.5 ThreadPoolExecutor
ThreadPoolExecutor 中也用到了 AQS,看下面的 UML 類圖:
Worker 主要在 ThreadPoolExecutor 中斷線程的時候使用。Worker 自己實現了獨占鎖,在中斷線程時首先進行加鎖,中斷操作后釋放鎖。按照官方說法,這里不直接使用 ReentrantLock 的原因是防止調用控制線程池的方法(類似 setCorePoolSize)時能夠重新獲取到鎖,
3.5.1 tryAcquire
使用 CAS 的方式把 AQS 中 state 從 0 改為 1,把當前線程置為獨占線程。
3.5.2 tryRelease
把獨占線程置為空,把 AQS 中 state 改為 0。
Worker 初始化的時候會把 state 置為 -1,這樣是不能獲取鎖成功的。只有調用了 runWorker 方法,才會通過釋放鎖操作把 state 更為 0。這樣保證了只中斷運行中的線程,而不會中斷等待中的線程。
3.6 總結
AQS 基于雙向隊列實現了入口等待隊列,基于 state 變量實現了各種并發鎖,上篇文章講了入口等待隊列,而這篇文章主要講了基于 AQS 的并發鎖原理。
4、條件變量等待隊列
本章節主要講解管程模型中條件變量等待隊列。
4.1 官方示例
首先我們看一下官方給出的示例代碼:
這個代碼定義了兩個條件變量,notFull 和 notEmpty,說明如下:
- 如果 items 數組已經滿了,則 notFull 變量不滿足,線程需要進入 notFull 條件等待隊列進行等待。當 take 方法取走一個數組元素時,notFull 條件滿足了,喚醒 notFull 條件等待隊列中等待線程。
- 如果 items 數組為空,則 notEmpty 變量不滿足,線程需要進入 notEmpty 條件等待隊列進行等待。當 put 方法加入一個數組元素時,notEmpty 條件滿足了,喚醒 notEmpty 條件等待隊列中等待線程。
- 條件變量是綁定在 Lock 上的,示例代碼使用了 ReentrantLock。在執行 await 和 signal 方法時首先要獲取到鎖。
4.2 原理簡介
Java AQS 的條件變量等待隊列是基于接口 Condition 和 ConditionObject 來實現的,URM 類圖如下:
Condition 接口主要定義了下面3個方法:
- await:進入條件等待隊列
- signal:喚醒條件等待隊列中的元素
- signalAll:喚醒條件等待隊列中的所有元素
4.3 await
條件等待隊列跟入口等待隊列有兩個不同:
- 雖然二者共用了 Node 類,但是條件等待隊列是單向隊列,入口等待隊列是雙向隊列,條件隊列中下一個節點的引用是 nextWaiter,入口等待隊列中下一個節點的引用是 next。
- 條件等待隊列中元素的 waitStatus 必須是 -2。await 方法的流程如下圖:
4.3.1 進入條件等待隊列
入隊方法對應方法 addConditionWaiter,這里有三種情況:
- 隊列為空,則新建一個節點,如下圖:
- 隊列非空,最后一個元素的 waitStatus 是 -2,如下圖:
- 隊列非空,最后一個元素的 waitStatus 不是 -2,如下圖:
可以看到,這種情況會從隊列第一個元素開始檢查 waitStatus 不是 -2 的元素,并從隊列中移除。
4.3.2 釋放鎖
AQS 的并發鎖是基于 state 變量實現的,線程進入條件等待隊列后,要釋放鎖,即 state 會變為 0,釋放操作會喚醒入口等待隊列中的線程。對應方法 fullyRelease,返回值是釋放鎖減掉的 state 值 savedState。
4.3.3 阻塞等待
釋放鎖后,線程阻塞,自旋等待被喚醒。
4.3.4 喚醒之后
喚醒之后,當前線程主要有四個動作:
- 轉入入口等待隊列,并把 waitStatus 改為 0。waitStatus 等于 0 表示中間狀態,當前節點后面的節點已經喚醒,但是當前節點線程還沒有執行完成。
- 重新獲取鎖,如果獲取成功,則當前線程成為入口等待隊列頭結點,interruptMode 置為 1。
- 如果當前節點在條件等待隊列中有后繼節點,則剔除條件等待隊列中 waitStatus!=-2 的節點,即隊列中狀態為取消的節點。
- interruptMode 如果不等于 0,則處理中斷。
4.3.5 一個細節
上面提到了 interruptMode,這個屬性有三個值:
- 0:沒有被中斷
- -1:中斷后拋出 InterruptedException,這種情況是當前線程阻塞,沒有被 signal 之前發生了中斷
- 1:重新進入中斷狀態,這種情況是指當前線程阻塞,被 signal 之后發生了中斷
4.3.6 擴展
AQS 還提供了其他幾個 await 方法,如下:
- awaitUninterruptibly:不用處理中斷。
- awaitNanos:自旋等待喚醒過程中有超時時間限制,超時則轉入入口等待隊列。
- awaitUntil:自旋等待喚醒過程中有截止時間,時間到則轉入入口等待隊列。
4.4 signal
喚醒條件等待隊列中的元素,首先判斷當前線程是否持有獨占鎖,如果沒有,拋出異常。
喚醒條件隊列中的元素,會從第一個元素也就是 firstWaiter 開始,根據 firstWaiter 的 waitStatus 是不是 -2,分兩種情況。
4.4.1 waitStatus==-2
條件隊列第一個節點進入入口等待隊列,等待獲取鎖,如下圖:
這里有兩個注意點:
- 如果入口等待隊列中 tail 節點的 waitStatus 小于等于 0,則 firstWaiter 加入后需要把舊 tail 節點置為 -1 (表示后面節點等待當前節點喚醒),如下圖:
如果重置 waitStatus 狀態失敗,則 unpark 節點 firstWaiter。
- 如果入口等待隊列中 tail 節點的 waitStatus 大于 0,則 unpark 節點 firstWaiter。
4.4.2 waitStatus!=-2
如果 firstWaiter 的 waitStatus 不等于 -2,則查找 firstWaiter 的 nextWaiter,直到找到一個 waitStatus 等于 -2 的節點,然后將這個節點加入入口等待隊列隊尾,如下圖:
4.4.3 waitStatus 修改
上面的兩種情況無論哪種,進入入口等待隊列之前都要用 CAS 的方式把 waitStatus 改為 0。
4.5 signalAll
理解了 signal 的邏輯,signalAll 的邏輯就非常容易理解了。首先判斷當前線程是否持有獨占鎖,如果沒有,拋出異常。
將條件等待隊列中的所有節點依次加入入口等待隊列。如下圖:
4.6 使用案例
4.6.1 示例代碼
Java 并發包下有很多類使用到了 AQS 中的 Condition,如下圖:
這里我們以 CyclicBarrier 為例來講解。CyclicBarrier 是讓一組線程相互等待共同達到一個屏障點。從 Cyclic 可以看出 Barrier 可以循環利用,也就是當線程釋放之后可以繼續使用。
看下面這段示例代碼:
執行結果:
4.6.2 原理講解
CyclicBarrier 初始化的時候,會指定線程的數量 count,每個線程執行完邏輯后,調用 CyclicBarrier 的 await 方法,這個方法首先將 count 減 1,然后調用 Condition的 await,讓當前線程進入條件等待隊列。當最后一個線程將 count 減 1 后,count 數量等于 0,這時就會調用 Condition 的 signalAll 方法喚醒所有線程。
4.7 總結
Java 的管程模型使用了 MESA 模型,基于 AQS 實現的 MESA 模型中,使用雙向隊列實現了入口等待隊列,使用變量 state 實現了并發鎖,使用 Condition 實現了條件等待隊列。
在 AQS 的實現中,使用同步隊列這個術語來表示雙向隊列,本文中使用入口等待隊列來描述是為了更好的配合管程模型來講解。
AQS 的 Condition 中,使用 await 方法將當前線程放入條件變量等待隊列阻塞等待,使用 notify 來喚醒條件等待隊列中的線程,被喚醒之后,線程并不能立刻執行,而是進入入口等待隊列等待獲取鎖。