成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

為了帶你精通 Java AQS,我畫了 40 張圖,從管程模型講起!

開發 前端
Java 的管程模型使用了 MESA 模型,基于 AQS 實現的 MESA 模型中,使用雙向隊列實現了入口等待隊列,使用變量 state 實現了并發鎖,使用 Condition 實現了條件等待隊列。

大家好,我是君哥。

Java中 AQS 是 AbstractQueuedSynchronizer 類,AQS 依賴 FIFO 隊列來提供一個框架,這個框架用于實現鎖以及鎖相關的同步器,比如信號量、事件等。

在 AQS 中,主要有兩部分功能,一部分是操作 state 變量,第二部分是實現排隊和阻塞機制。

注意,AQS 并沒有實現任何同步接口,它只是提供了類似 acquireInterruptible 的方法,調用這些方法可以實現鎖和同步器。

1、管程模型

Java 使用 MESA 管程模型來管理類的成員變量和方法,讓這個類的成員變量和方法的操作是線程安全的。下圖是 MESA 管程模型,里面除了定義共享變量外,還定義了條件變量和條件變量等待隊列:

圖片

上圖中有三個知識點:

  • MESA 管程模型封裝了共享變量和對共享變量的操作,線程要進入管程內部,必須獲取到鎖,如果獲取鎖失敗就進入入口等待隊列阻塞等待。
  • 如果線程獲取到鎖,就進入到管程內部。但是進入到管程內部,也不一定能立刻操作共享變量,而是要看條件變量是否滿足,如果不滿足,只能進入條件變量等待隊列阻塞等待。
  • 在條件變量等待隊列中,如果被其他線程喚醒,也不一定能立刻操作共享變量,而是需要去入口等待隊列重新排隊等待獲取鎖。

Java 中的 MESA 管程模型有一點改進,就是管程內部只有一個條件變量和一個等待隊列。下圖是 AQS 的管程模型:

圖片

AQS 的管程模型依賴 AQS 中的 FIFO 隊列實現入口等待隊列,要進入管程內部,就由各種并發鎖的限制。而 ConditionObject 則實現了條件隊列,這個隊列可以創建多個。

下面就從入口等待隊列、并發鎖、條件等待隊列三個方面來帶你徹底理解 AQS。

2、入口等待隊列

2.1 獲取獨占鎖

獨占, 忽略 interrupts

public final void acquire(int arg){
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

這里的 tryAcquire 是抽象方法,由 AQS 的子類來實現,因為每個子類實現的鎖是不一樣的。

2.1.1 入隊

上面的代碼可以看到,獲取鎖失敗后,會先執行 addWaiter 方法加入隊列,然后執行 acquireQueued 方法自旋地獲取鎖直到成功。

addWaiter 代碼邏輯如下圖,簡單說就是把 node 入隊,入隊后返回 node 參數給 acquireQueued 方法:

圖片

這里有一個點需要注意,如果隊列為空,則新建一個 Node 作為隊頭。

2.1.2 入隊后獲取鎖

acquireQueued 自旋獲取鎖邏輯如下圖:

圖片

這里有幾個細節:

(1)waitStatus

  • CANCELLED(1):當前節點取消獲取鎖。當等待超時或被中斷(響應中斷),會觸發變更為此狀態,進入該狀態后節點狀態不再變化;
  • SIGNAL(-1):后面節點等待當前節點喚醒;
  • CONDITION(-2):Condition 中使用,當前線程阻塞在 Condition,如果其他線程調用了 Condition 的 signal 方法,這個結點將從等待隊列轉移到同步隊列隊尾,等待獲取同步鎖;
  • PROPAGATE(-3):共享模式,前置節點喚醒后面節點后,喚醒操作無條件傳播下去;
  • 0:中間狀態,當前節點后面的節點已經喚醒,但是當前節點線程還沒有執行完成。

(2)獲取鎖失敗后掛起

如果前置節點不是頭節點,或者前置節點是頭節點但當前節點獲取鎖失敗,這時當前節點需要掛起,分三種情況:

  • 前置節點 waitStatus=-1,如下圖:

圖片

  • 前置節點 waitStatus > 0,如下圖:

圖片

  • 前置節點 waitStatus < 0 但不等于 -1,如下圖:

圖片

(3)取消獲取鎖

如果獲取鎖拋出異常,則取消獲取鎖,如果當前節點是 tail 節點,分兩種情況如下圖:

圖片

如果當前節點不是 tail 節點,也分兩種情況,如下圖:

圖片

4.對中斷狀態忽略

5.如果前置節點的狀態是 0 或 PROPAGATE,會被當前節點自旋過程中更新成 -1,以便之后通知當前節點。

2.1.3 獨占 + 響應中斷

對應方法 acquireInterruptibly(int arg)。

跟忽略中斷(acquire方法)不同的是要響應中斷,下面兩個地方響應中斷:

  • 獲取鎖之前會檢查當前線程是否中斷。
  • 獲取鎖失敗入隊,在隊列中自旋獲取鎖的過程中也會檢查當前線程是否中斷。如果檢查到當前線程已經中斷,則拋出 InterruptedException,當前線程退出。

2.1.4 獨占 + 響應中斷 + 考慮超時

對應方法 tryAcquireNanos(int arg, long nanosTimeout)。

這個方法具備了獨占 + 響應中斷 + 超時的功能,下面2個地方要判斷是否超時:

  • 自旋獲取鎖的過程中每次獲取鎖失敗都要判斷是否超時;
  • 獲取鎖失敗 park 之前要判斷超時時間是否大于自旋的閾值時間 (spinForTimeoutThreshold = 1ns) 另外,park 線程的操作使用 parkNanos 傳入阻塞時間。

2.2 釋放獨占鎖

獨占鎖釋放分兩步:釋放鎖,喚醒后繼節點。

釋放鎖的方法 tryRelease 是抽象的,由子類去實現。

我們看一下喚醒后繼節點的邏輯,首先需要滿足兩個條件:

  • head 節點不等于 null;
  • head 節點 waitStatus 不等于 0。這里有兩種情況(在方法 unparkSuccessor):
  • 情況一,后繼節點 waitStatus <= 0,直接喚醒后繼節點,如下圖:

圖片

  • 情況二:后繼節點為空或者 waitStatus > 0,從后往前查找最接近當前節點的節點進行喚醒,如下圖:

圖片

2.3 獲取共享鎖

之前我們講了獨占鎖,這一小節我們談共享鎖,有什么不同呢?

2.3.1 共享,忽略 interrupts

對應方法 acquireShared,代碼如下:

public final void acquireShared(int arg){
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

2.3.2 tryAcquireShared

這里獲取鎖使用的方法是 tryAcquireShared,獲取的是共享鎖。獲取共享鎖跟獲取獨占鎖不同的是,會返回一個整數值,說明如下:

  • 返回負數:獲取鎖失敗。
  • 返回 0:獲取鎖成功但是之后再由線程來獲取共享鎖時就會失敗。
  • 返回正數:獲取鎖成功而且之后再有線程來獲取共享鎖時也可能會成功。所以需要把喚醒操作傳播下去。tryAcquireShared 獲取鎖失敗后(返回負數),就需要入隊后自旋獲取,也就是執行方法 doAcquireShared。

2.3.3 doAcquireShared

怎么判斷隊列中等待節點是在等待共享鎖呢?nextWaiter == SHARED,這個參數值是入隊新建節點的時候構造函數傳入的。

自旋過程中,如果獲取鎖成功(返回正數),首先把自己設置成新的 head 節點,然后把通知傳播下去。如下圖:

圖片

之后會喚醒后面節點并保證喚醒操作可以傳播下去。但是需要滿足四個條件中的一個:

  • tryAcquireShared 返回值大于0,有多余的鎖,可以繼續喚醒后繼節點。
  • 舊的 head 節點 waitStatus < 0,應該是其他線程釋放共享鎖過程中把它的狀態更新成了 -3。
  • 新的 hade 節點 waitStatus < 0,只要不是 tail 節點,就可能是 -1。這里會造成不必要的喚醒,因為喚醒后獲取不到鎖只能繼續入隊等待。
  • 當前節點的后繼節點是空或者非空但正在等待共享鎖。

喚醒后面節點的操作,其實就是釋放共享鎖,對應方法是 doReleaseShared,見釋放共享鎖一節。

2.3.4 共享 + 響應中斷

對應方法 acquireSharedInterruptibly(int arg)。

跟共享忽略中斷(acquireShared 方法)不同的是要響應中斷,下面兩個地方響應中斷:

  • 獲取鎖之前會檢查當前線程是否中斷。
  • 獲取鎖失敗入隊,在隊列中自旋獲取鎖的過程中也會檢查當前線程是否中斷。

如果檢查到當前線程已經中斷,則拋出 InterruptedException,當前線程退出。

2.3.5 共享 + 響應中斷 + 考慮超時

對應方法 tryAcquireSharedNanos(int arg, long nanosTimeout)。

這個方法具備了共享 + 響應中斷 + 超時的功能,下面兩個個地方要判斷是否超時:

  • 自旋獲取鎖的過程中每次獲取鎖失敗都要判斷是否超時。
  • 獲取鎖失敗 park 之前要判斷超時時間是否大于自旋的閾值時間(spinForTimeoutThreshold = 1ns)。

另外,park 線程的操作使用 parkNanos 傳入阻塞時間。

2.4 釋放共享鎖

釋放共享鎖代碼如下:

public final boolean releaseShared(int arg){
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

首先嘗試釋放共享鎖,tryReleaseShared 代碼由子類來實現。釋放成功后執行AQS中的 doReleaseShared 方法,是一個自旋操作。

自旋的條件是隊列中至少有兩個節點,這里分三種情況。

情況一:當前節點 waitStatus 是 -1,如下圖:

圖片

情況二:當前節點 waitStatus 是 0(被其他線程更新新成了中間狀態),如下圖:

圖片

情況三:當前節點 waitStatus 是 -3,為什么會這樣呢?需要解釋一下,head節點喚醒后繼節點之前 waitStatus 已經被更新中間態 0 了,喚醒后繼節點動作還沒有執行,又被其他線程更成了 -3,也就是其他線程釋放鎖執行了上面情況二。這時需要先把 waitStatus 再更成 0 (在方法 unparkSuccessor),如下圖:

圖片

2.5 抽象方法

上面的講解可以看出,如果要基于 AQS 來實現并發鎖,可以根據需求重寫下面四個方法來實現,這四個方法在 AQS 中沒有具體實現:

  • tryAcquire(int arg):獲取獨占鎖
  • tryRelease(int arg):釋放獨占鎖
  • tryAcquireShared(int arg):獲取共享鎖
  • tryReleaseShared(int arg):釋放共享鎖

AQS 的子類需要重寫上面的方法來修改 state 值,并且定義獲取鎖或者釋放鎖時 state 值的變化。子類也可以定義自己的 state 變量,但是只有更新 AQS 中的 state變量才會對同步起作用。

還有一個判斷當前線程是否持有獨占鎖的方法 isHeldExclusively,也可以供子類重寫后使用。

獲取/釋放鎖的具體實現放到下篇文章講解。

2.6 總結

AQS 使用 FIFO 隊列實現了一個鎖相關的并發器模板,可以基于這個模板來實現各種鎖,包括獨占鎖、共享鎖、信號量等。

AQS 中,有一個核心狀態是 waitStatus,這個代表節點的狀態,決定了當前節點的后續操作,比如是否等待喚醒,是否要喚醒后繼節點。

3 并發鎖

這一章節講解 Java AQS 中的并發鎖。其實 Java AQS 中的并發鎖主要是基于 state 這個變量值來實現的。

3.1 ReentrantLock

我們先來看一下 UML 類圖:

圖片

從圖中可以看到,ReentrantLock 使用抽象內部類 Sync 來實現了 AQS 的方法,然后基于 Sync 這個同步器實現了公平鎖和非公平鎖。主要實現了下面 3 個方法:

  • tryAcquire(int arg):獲取獨占鎖
  • tryRelease(int arg):釋放獨占鎖
  • isHeldExclusively:當前線程是否占有獨占鎖。ReentrantLock 默認實現的是非公平鎖,可以在構造函數指定。

從實現的方法可以看到,ReentrantLock 中獲取的鎖是獨占鎖,我們再來看一下獲取和釋放獨占鎖的代碼:

public final void acquire(int arg){
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

獨占鎖的特點是調用上面 acquire 方法,傳入的參數是 1。

3.1.1 獲取公平鎖

獲取鎖首先判斷同步狀態(state)的值。

3.1.1.1 state 等于 0

這說明沒有線程占用鎖,當前線程如果符合下面兩個條件,就可以獲取到鎖:

沒有前任節點,如下圖:

圖片

CAS 的方式更新 state 值(把 0 更新成 1)成功。如果獲取獨占鎖成功,會更新 AQS 中 exclusiveOwnerThread 為當前線程,這個很容易理解。

3.1.1.2 state 不等于 0

這說明已經有線程占有鎖,判斷占有鎖的線程是不是當前線程,如下圖:

圖片

state += 1 值如果小于 0,會拋出異常。

如果獲取鎖失敗,則進入 AQS 隊列等待喚醒。

3.1.2 獲取非公平鎖

跟公平鎖相比,非公平鎖的唯一不同是如果判斷到 state 等于 0,不用判斷有沒有前任節點,只要 CAS 設置 state 值(把 0 更新成 1)成功,就獲取到了鎖。

3.1.3 釋放鎖

公平鎖和非公平鎖,釋放邏輯完全一樣,都是在內部類 Sync 中實現的。釋放鎖需要注意兩點,如下圖:

圖片

為什么 state 會大于 1,因為是可以重入的,占有鎖的線程可以多次獲取鎖。

3.1.4 總結

公平鎖的特點是每個線程都要進行排隊,不用擔心線程永遠獲取不到鎖,但有個缺點是每個線程入隊后都需要阻塞和被喚醒,這一定程度上影響了效率。非公平鎖的特點是每個線程入隊前都會先嘗試獲取鎖,如果獲取成功就不會入隊了,這比公平鎖效率高。但也有一個缺點,隊列中的線程有可能等待很長時間,高并發下甚至可能永遠獲取不到鎖。

3.2 ReentrantReadWriteLock

我們先來看一下 UML 類圖:

圖片

從圖中可以看到,ReentrantReadWriteLock 使用抽象內部類Sync來實現了 AQS 的方法,然后基于 Sync 這個同步器實現了公平鎖和非公平鎖。主要實現了下面 3 個方法:

  • tryAcquire(int arg):獲取獨占鎖
  • tryRelease(int arg):釋放獨占鎖
  • tryAcquireShared(int arg):獲取共享鎖
  • tryReleaseShared(int arg):釋放共享鎖
  • isHeldExclusively:當前線程是否占有獨占鎖 可見ReentrantReadWriteLock里面同時用到了共享鎖和獨占鎖。

下圖是定義的幾個常用變量:

圖片

下面這 2 個方法用戶獲取共享鎖和獨占鎖的數量:

static int sharedCount(int c){ return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c){ return c & EXCLUSIVE_MASK; }

從sharedCount 可以看到,共享鎖的數量要右移 16 位獲取,也就是說共享鎖占了高 16 位。從上圖 EXCLUSIVE_MASK 的定義看到,跟 EXCLUSIVE_MASK 進行與運算,得到的是低 16 位的值,所以獨占鎖占了低 16 位。如下圖:

圖片

這樣上面獲取鎖數量的方法就很好理解了。

3.2.1 讀鎖

讀鎖的實現對應內部類 ReadLock。

3.2.1.1 獲取讀鎖

獲取讀鎖實際上是 ReadLock 調用了 AQS 的下面方法,傳入參數是 1:

public final void acquireShared(int arg){
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

ReentrantReadWriteLock 內部類 Sync 實現了 tryAcquireShared 方法,主要包括如下三種情況:

  1. 使用 exclusiveCount 方法查看 state 中是否有獨占鎖,如果有并且獨占線程不是當前線程,返回 -1,獲取失敗;
  2. 使用 sharedCount 查看 state 中共享鎖數量,如果讀鎖數量小于最大值(MAX_COUNT=65535),則再滿足下面 3 個條件就可以獲取成功并返回 1:

a.當前線程不需要阻塞(readerShouldBlock)。在公平鎖中,需要判斷是否有前置節點,如下圖就需要阻塞:

圖片

在非公平鎖中,則是判斷第一個節點是不是有獨占鎖,如下圖就需要阻塞:

圖片

b.使用 CAS 把 state 的值加 SHARED_UNIT(65536)。這里是不是就更理解讀鎖占高位的說法了,獲取一個讀鎖,state 的值就要加 SHARED_UNIT 這么多個。

c.給當前線程的 holdCount 加 1。

  1. 如果 2 失敗,自旋,重復上面的步驟直到獲取到鎖。tryAcquireShared (獲取共享鎖)會返回一個整數,如下:
  • 返回負數:獲取鎖失敗。
  • 返回 0:獲取鎖成功但是之后再由線程來獲取共享鎖時就會失敗。
  • 返回正數:獲取鎖成功而且之后再有線程來獲取共享鎖時也可能會成功。
3.2.1.2 釋放讀鎖

ReentrantReadWriteLock 釋放讀鎖是在 ReadLock 中調用了 AQS 下面方法,傳入的參數是1:

public final boolean releaseShared(int arg){
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

ReentrantReadWriteLock 內部類 Sync 實現了 releaseShared 方法,具體邏輯分為下面兩步:

  1. 當前線程 holdCounter 值減 1。
  2. CAS的方式將 state 的值減去 SHARED_UNIT。

3.2.2 寫鎖

寫鎖的實現對應內部類 WriteLock。

3.2.2.1 獲取寫鎖

ReentrantReadWriteLock 獲取寫鎖其實是在 WriteLock 中調用了 AQS 的下面方法,傳入參數 1:

public final void acquire(int arg){
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

在ReentrantReadWriteLock 內部類 Sync 實現了 tryAcquire 方法,首先獲取 state 值和獨占鎖數量(exclusiveCount),之后分如下兩種情況,如下圖:

圖片

1.state 不等于 0:

  • 獨占鎖數量等于 0,這時說明有線程占用了共享鎖,如果當前線程不是獨占線程,獲取鎖失敗。
  • 獨占鎖數量不等于 0,獨占鎖數量加 1 后大于 MAX_COUNT,獲取鎖失敗。
  • 上面 2 種情況不符合,獲取鎖成功,state 值加 1。2.state 等于 0,判斷當前線程是否需要阻塞(writerShouldBlock)。在公平鎖中,跟 readerShouldBlock 的邏輯完全一樣,就是判斷隊列中 head 節點的后繼節點是不是當前線程。在非公平鎖中,直接返回 false,即可以直接嘗試獲取鎖。

如果當前線程不需要阻塞,并且給 state 賦值成功,使用 CAS 方式把 state 值加 1,把獨占線程置為當前線程。

3.2.2.2 釋放寫鎖

ReentrantReadWriteLock 釋放寫鎖其實是在 WriteLock 中調用了 AQS 的下面方法,傳入參數 1:

public final boolean release(int arg){
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

ReentrantReadWriteLock 在 Sync 中實現了 tryRelease(arg) 方法,邏輯如下:

  1. 判斷當前線程是不是獨占線程,如果不是,拋出異常。
  2. state值減1后,用新state值判斷獨占鎖數量是否等于0
  • 如果等于0,則把獨占線程置為空,返回true,這樣上面的代碼就可以喚醒隊列中的后置節點了
  • 如果不等于0,返回false,不喚醒后繼節點。

3.3 CountDownLatch

我們先來看一下UML類圖:

圖片

從上面的圖中看出,CountDownLatch 的內部類 Sync 實現了獲取共享鎖和釋放共享鎖的邏輯。

使用 CountDownLatch 時,構造函數會傳入一個 int 類型的參數 count,表示調動 count 次的 countDown 后主線程才可以被喚醒。

public CountDownLatch(int count){
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

上面的 Sync(count) 就是將 AQS 中的 state 賦值為 count。

3.3.1 await

CountDownLatch 的 await 方法調用了 AQS 中的 acquireSharedInterruptibly(int arg),傳入參數 1,不過這個參數并沒有用。代碼如下:

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

Sync 中實現了 tryAcquireShared 方法,await 邏輯如下圖:

圖片

上面的自旋過程就是等待 state 的值不斷減小,只有 state 值成為 0 的時候,主線程才會跳出自旋執行之后的邏輯。

3.3.2 countDown

CountDownLatch 的 countDown 方法調用了 AQS 的 releaseShared(int arg),傳入參數 1,不過這個參數并沒有用。內部類 Sync 實現了 tryReleaseShared 方法,邏輯如下圖:

圖片

3.3.3 總結

CountDownLatch 的構造函數入參值會賦值給 state 變量,入隊操作是主線程入隊,每個子線程調用了countDown 后 state 值減 1,當 state 值成為 0 后喚醒主線程。

3.4 Semaphore

Semaphore 是一個信號量,用來保護共享資源。如果線程要訪問共享資源,首先從 Semaphore 獲取鎖(信號量),如果信號量的計數器等于 0,則當前線程進入 AQS 隊列阻塞等待。否則,線程獲取鎖成功,信號量減 1。使用完共享資源后,釋放鎖(信號量加 1)。

Semaphore 跟管程模型不一樣的是,允許多個(構造函數的 permits)線程進入管程內部,因此也常用它來做限流。

UML 類圖如下:

圖片

Semaphore的構造函數會傳入一個int類型參數,用來初始化state的值。

3.4.1 acquire

獲取鎖的操作調用了 AQS 中的 acquireSharedInterruptibly 方法,傳入參數 1,代碼見 CountDownLatch 中 await 小節。Semaphore 在公平鎖和非公平鎖中分別實現了 tryAcquireShared 方法。

3.4.1.1 公平鎖

Semaphore 默認使用非公平鎖,如果使用公平鎖,需要在構造函數指定。獲取公平鎖邏輯比較簡單,如下圖:

圖片

3.4.1.2 非公平鎖

acquire 在非公平的鎖唯一的區別就是不會判斷 AQS 隊列是否有前置節點(hasQueuedPredecessors),而是直接嘗試獲取鎖。

除了 acquire 方法外,還有其他幾個獲取鎖的方法,原理類似,只是調用了 AQS 中的不同方法。

3.4.2 release

釋放鎖的操作調用了 AQS 中的 releaseShared(int arg) 方法,傳入參數 1,在內部類 Sync 中實現了 tryReleaseShared 方法,邏輯很簡單:使用 CAS 的方式將 state 的值加 1,之后喚醒隊列中的后繼節點。

3.5 ThreadPoolExecutor

ThreadPoolExecutor 中也用到了 AQS,看下面的 UML 類圖:

圖片

Worker 主要在 ThreadPoolExecutor 中斷線程的時候使用。Worker 自己實現了獨占鎖,在中斷線程時首先進行加鎖,中斷操作后釋放鎖。按照官方說法,這里不直接使用 ReentrantLock 的原因是防止調用控制線程池的方法(類似 setCorePoolSize)時能夠重新獲取到鎖,

3.5.1 tryAcquire

使用 CAS 的方式把 AQS 中 state 從 0 改為 1,把當前線程置為獨占線程。

3.5.2 tryRelease

把獨占線程置為空,把 AQS 中 state 改為 0。

Worker 初始化的時候會把 state 置為 -1,這樣是不能獲取鎖成功的。只有調用了 runWorker 方法,才會通過釋放鎖操作把 state 更為 0。這樣保證了只中斷運行中的線程,而不會中斷等待中的線程。

3.6 總結

AQS 基于雙向隊列實現了入口等待隊列,基于 state 變量實現了各種并發鎖,上篇文章講了入口等待隊列,而這篇文章主要講了基于 AQS 的并發鎖原理。

4、條件變量等待隊列

本章節主要講解管程模型中條件變量等待隊列。

4.1 官方示例

首先我們看一下官方給出的示例代碼:

public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

這個代碼定義了兩個條件變量,notFull 和 notEmpty,說明如下:

  1. 如果 items 數組已經滿了,則 notFull 變量不滿足,線程需要進入 notFull 條件等待隊列進行等待。當 take 方法取走一個數組元素時,notFull 條件滿足了,喚醒 notFull 條件等待隊列中等待線程。
  2. 如果 items 數組為空,則 notEmpty 變量不滿足,線程需要進入 notEmpty 條件等待隊列進行等待。當 put 方法加入一個數組元素時,notEmpty 條件滿足了,喚醒 notEmpty 條件等待隊列中等待線程。
  3. 條件變量是綁定在 Lock 上的,示例代碼使用了 ReentrantLock。在執行 await 和 signal 方法時首先要獲取到鎖。

4.2 原理簡介

Java AQS 的條件變量等待隊列是基于接口 Condition 和 ConditionObject 來實現的,URM 類圖如下:

圖片

Condition 接口主要定義了下面3個方法:

  • await:進入條件等待隊列
  • signal:喚醒條件等待隊列中的元素
  • signalAll:喚醒條件等待隊列中的所有元素

4.3 await

條件等待隊列跟入口等待隊列有兩個不同:

  • 雖然二者共用了 Node 類,但是條件等待隊列是單向隊列,入口等待隊列是雙向隊列,條件隊列中下一個節點的引用是 nextWaiter,入口等待隊列中下一個節點的引用是 next。
  • 條件等待隊列中元素的 waitStatus 必須是 -2。await 方法的流程如下圖:

圖片

4.3.1 進入條件等待隊列

入隊方法對應方法 addConditionWaiter,這里有三種情況:

  • 隊列為空,則新建一個節點,如下圖:

圖片

  • 隊列非空,最后一個元素的 waitStatus 是 -2,如下圖:

圖片

  • 隊列非空,最后一個元素的 waitStatus 不是 -2,如下圖:

圖片

可以看到,這種情況會從隊列第一個元素開始檢查 waitStatus 不是 -2 的元素,并從隊列中移除。

4.3.2 釋放鎖

AQS 的并發鎖是基于 state 變量實現的,線程進入條件等待隊列后,要釋放鎖,即 state 會變為 0,釋放操作會喚醒入口等待隊列中的線程。對應方法 fullyRelease,返回值是釋放鎖減掉的 state 值 savedState。

4.3.3 阻塞等待

釋放鎖后,線程阻塞,自旋等待被喚醒。

4.3.4 喚醒之后

喚醒之后,當前線程主要有四個動作:

  • 轉入入口等待隊列,并把 waitStatus 改為 0。waitStatus 等于 0 表示中間狀態,當前節點后面的節點已經喚醒,但是當前節點線程還沒有執行完成。
  • 重新獲取鎖,如果獲取成功,則當前線程成為入口等待隊列頭結點,interruptMode 置為 1。
  • 如果當前節點在條件等待隊列中有后繼節點,則剔除條件等待隊列中 waitStatus!=-2 的節點,即隊列中狀態為取消的節點。
  • interruptMode 如果不等于 0,則處理中斷。

4.3.5 一個細節

上面提到了 interruptMode,這個屬性有三個值:

  • 0:沒有被中斷
  • -1:中斷后拋出 InterruptedException,這種情況是當前線程阻塞,沒有被 signal 之前發生了中斷
  • 1:重新進入中斷狀態,這種情況是指當前線程阻塞,被 signal 之后發生了中斷

4.3.6 擴展

AQS 還提供了其他幾個 await 方法,如下:

  • awaitUninterruptibly:不用處理中斷。
  • awaitNanos:自旋等待喚醒過程中有超時時間限制,超時則轉入入口等待隊列。
  • awaitUntil:自旋等待喚醒過程中有截止時間,時間到則轉入入口等待隊列。

4.4 signal

喚醒條件等待隊列中的元素,首先判斷當前線程是否持有獨占鎖,如果沒有,拋出異常。

喚醒條件隊列中的元素,會從第一個元素也就是 firstWaiter 開始,根據 firstWaiter 的 waitStatus 是不是 -2,分兩種情況。

4.4.1 waitStatus==-2

條件隊列第一個節點進入入口等待隊列,等待獲取鎖,如下圖:

圖片

這里有兩個注意點:

  • 如果入口等待隊列中 tail 節點的 waitStatus 小于等于 0,則 firstWaiter 加入后需要把舊 tail 節點置為 -1 (表示后面節點等待當前節點喚醒),如下圖:

圖片

如果重置 waitStatus 狀態失敗,則 unpark 節點 firstWaiter。

  • 如果入口等待隊列中 tail 節點的 waitStatus 大于 0,則 unpark 節點 firstWaiter。

4.4.2 waitStatus!=-2

如果 firstWaiter 的 waitStatus 不等于 -2,則查找 firstWaiter 的 nextWaiter,直到找到一個 waitStatus 等于 -2 的節點,然后將這個節點加入入口等待隊列隊尾,如下圖:

圖片

4.4.3 waitStatus 修改

上面的兩種情況無論哪種,進入入口等待隊列之前都要用 CAS 的方式把 waitStatus 改為 0。

4.5 signalAll

理解了 signal 的邏輯,signalAll 的邏輯就非常容易理解了。首先判斷當前線程是否持有獨占鎖,如果沒有,拋出異常。

將條件等待隊列中的所有節點依次加入入口等待隊列。如下圖:

圖片

4.6 使用案例

4.6.1 示例代碼

Java 并發包下有很多類使用到了 AQS 中的 Condition,如下圖:

圖片

這里我們以 CyclicBarrier 為例來講解。CyclicBarrier 是讓一組線程相互等待共同達到一個屏障點。從 Cyclic 可以看出 Barrier 可以循環利用,也就是當線程釋放之后可以繼續使用。

看下面這段示例代碼:

public static void main(String[] args){
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println("柵欄中的線程執行完成");
});
ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(() -> {
try {
System.out.println("線程1:" + Thread.currentThread().getName());
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
});

executorService.submit(() -> {
try {
System.out.println("線程2:" + Thread.currentThread().getName());
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
});

executorService.shutdown();
}

執行結果:

線程1:pool-1-thread-1
線程2:pool-1-thread-2
柵欄中的線程執行完成

4.6.2 原理講解

CyclicBarrier 初始化的時候,會指定線程的數量 count,每個線程執行完邏輯后,調用 CyclicBarrier 的 await 方法,這個方法首先將 count 減 1,然后調用 Condition的 await,讓當前線程進入條件等待隊列。當最后一個線程將 count 減 1 后,count 數量等于 0,這時就會調用 Condition 的 signalAll 方法喚醒所有線程。

4.7 總結

Java 的管程模型使用了 MESA 模型,基于 AQS 實現的 MESA 模型中,使用雙向隊列實現了入口等待隊列,使用變量 state 實現了并發鎖,使用 Condition 實現了條件等待隊列。

在 AQS 的實現中,使用同步隊列這個術語來表示雙向隊列,本文中使用入口等待隊列來描述是為了更好的配合管程模型來講解。

AQS 的 Condition 中,使用 await 方法將當前線程放入條件變量等待隊列阻塞等待,使用 notify 來喚醒條件等待隊列中的線程,被喚醒之后,線程并不能立刻執行,而是進入入口等待隊列等待獲取鎖。

責任編輯:武曉燕 來源: 君哥聊技術
相關推薦

2020-05-06 09:10:46

AQS同步器CAS

2021-12-02 15:20:49

Redis數據結構

2021-05-18 06:55:07

Java AQS源碼

2021-03-23 10:25:05

Redis數據結構

2020-01-02 09:14:23

Kubernetes內部容器

2021-06-16 17:45:24

javaMESA模型

2021-12-17 17:52:02

MySQL B+面試

2020-06-28 07:39:44

Kafka分布式消息

2020-11-16 10:50:27

KubernetesIngressLinux

2020-09-23 11:23:25

推薦系統廣告

2021-11-29 07:47:56

RocketMQ分布式消息

2022-02-28 11:10:42

ZGCG1收集器

2020-10-16 08:26:38

AQS通信協作

2015-04-08 10:44:27

微軟win10

2021-09-07 05:04:53

HTTPHTTP3.0面試

2021-05-07 17:11:19

負載均衡運維服務

2022-12-21 19:06:55

機器學習人工智能

2024-02-29 09:37:25

Java并發編程

2021-01-20 08:34:37

HBaseNoSQL數據庫

2024-07-03 08:28:44

HWKafkaLEO
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日本一区二区高清不卡 | aaaaaaa片毛片免费观看 | 欧美男人天堂 | 亚洲国产精品99久久久久久久久 | 欧洲一区二区三区 | 黄色免费在线观看网址 | 国产免费观看一区 | 欧美成年人 | 国产网站在线免费观看 | 成人a在线 | 国产成人福利在线观看 | 成人h视频在线观看 | 国产亚洲精品综合一区 | 国产一区二区三区久久久久久久久 | 免费欧美 | 一区中文| 天天干,夜夜操 | 一级二级三级在线观看 | 一区二区三区精品在线 | 91久操网 | 欧美久久久网站 | 无码一区二区三区视频 | 日日人人 | 日韩av一区二区在线观看 | 国产欧美在线播放 | 在线播放一区 | 免费午夜电影 | 欧美一区二区在线观看 | 国产片一区二区三区 | 成人特级毛片 | 久久视频精品在线 | 成人亚洲片 | 日本午夜在线视频 | 精产国产伦理一二三区 | 欧美片网站免费 | 99精品亚洲国产精品久久不卡 | 日本精品在线一区 | 三a毛片 | 久久国产精品一区二区三区 | 成人综合伊人 | 亚洲精品一区中文字幕 |