想必大家都使用過wait()和notify()這兩個方法吧,這兩個方法主要用于多線程間的協同處理,即控制線程之間的等待、通知、切換及喚醒。而RenentrantLock也支持這樣條件變量的能力,而且相對于synchronized 更加強大,能夠支持多個條件變量。
概述
想必大家都使用過wait()和notify()這兩個方法吧,這兩個方法主要用于多線程間的協同處理,即控制線程之間的等待、通知、切換及喚醒。而RenentrantLock也支持這樣條件變量的能力,而且相對于synchronized 更加強大,能夠支持多個條件變量。
ReentrantLock條件變量使用
ReentrantLock類API
- Condition newCondition(): 創建條件變量對象
Condition類API
- void await(): 當前線程從運行狀態進入等待狀態,同時釋放鎖,該方法可以被中斷
- void awaitUninterruptibly():當前線程從運行狀態進入等待狀態,該方法不能夠被中斷
- void signal(): 喚醒一個等待在 Condition 條件隊列上的線程
- void signalAll(): 喚醒阻塞在條件隊列上的所有線程
@Test
public void testCondition() throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
//創建新的條件變量
Condition condition = lock.newCondition();
Thread thread0 = new Thread(() -> {
lock.lock();
try {
System.out.println("線程0獲取鎖");
// sleep不會釋放鎖
Thread.sleep(500);
//進入休息室等待
System.out.println("線程0釋放鎖,進入等待");
condition.await();
System.out.println("線程0被喚醒了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
thread0.start();
//叫醒
Thread thread1 = new Thread(() -> {
lock.lock();
try {
System.out.println("線程1獲取鎖");
//喚醒
condition.signal();
System.out.println("線程1喚醒線程0");
} finally {
lock.unlock();
System.out.println("線程1釋放鎖");
}
});
thread1.start();
thread0.join();
thread1.join();
}
運行結果:

- condition的wait和notify必須在lock范圍內
- 實現條件變量的等待和喚醒,他們必須是同一個condition。
- 線程1執行conidtion.notify()后,并沒有釋放鎖,需要等釋放鎖后,線程0重新獲取鎖成功后,才能繼續向下執行。
圖解實現原理
await過程
- 線程0(Thread-0)一開始獲取鎖,exclusiveOwnerThread字段是Thread-0, 如下圖中的深藍色節點

- Thread-0調用await方法,Thread-0封裝成Node進入ConditionObject的隊列,因為此時只有一個節點,所有firstWaiter和lastWaiter都指向Thread-0,會釋放鎖資源,NofairSync中的state會變成0,同時exclusiveOwnerThread設置為null。如下圖所示。

- 線程1(Thread-1)被喚醒,重新獲取鎖,如下圖的深藍色節點所示。

- Thread-0被park阻塞,如下圖灰色節點所示:

源碼如下:
下面是await()方法的整體流程,其中LockSupport.park(this)進行阻塞當前線程,后續喚醒,也會在這個程序點恢復執行。
public final void await() throws InterruptedException {
// 判斷當前線程是否是中斷狀態,是就直接給個中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 將調用 await 的線程包裝成 Node,添加到條件隊列并返回
Node node = addConditionWaiter();
// 完全釋放節點持有的鎖,因為其他線程喚醒當前線程的前提是【持有鎖】
int savedState = fullyRelease(node);
// 設置打斷模式為沒有被打斷,狀態碼為 0
int interruptMode = 0;
// 如果該節點還沒有轉移至 AQS 阻塞隊列, park 阻塞,等待進入阻塞隊列
while (!isOnSyncQueue(node)) {
// 阻塞當前線程,待會
LockSupport.park(this);
// 如果被打斷,退出等待隊列,對應的 node 【也會被遷移到阻塞隊列】尾部,狀態設置為 0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 邏輯到這說明當前線程退出等待隊列,進入【阻塞隊列】
// 嘗試槍鎖,釋放了多少鎖就【重新獲取多少鎖】,獲取鎖成功判斷打斷模式
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// node 在條件隊列時 如果被外部線程中斷喚醒,會加入到阻塞隊列,但是并未設 nextWaiter = null
if (node.nextWaiter != null)
// 清理條件隊列內所有已取消的 Node
unlinkCancelledWaiters();
// 條件成立說明掛起期間發生過中斷
if (interruptMode != 0)
// 應用打斷模式
reportInterruptAfterWait(interruptMode);
}
- 將線程封裝成Node, 加入到ConditionObject隊列尾部,此時節點的等待狀態時-2。
private Node addConditionWaiter() {
// 獲取當前條件隊列的尾節點的引用,保存到局部變量 t 中
Node t = lastWaiter;
// 當前隊列中不是空,并且節點的狀態不是 CONDITION(-2),說明當前節點發生了中斷
if (t != null && t.waitStatus != Node.CONDITION) {
// 清理條件隊列內所有已取消的 Node
unlinkCancelledWaiters();
// 清理完成重新獲取 尾節點 的引用
t = lastWaiter;
}
// 創建一個關聯當前線程的新 node, 設置狀態為 CONDITION(-2),添加至隊列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node; // 空隊列直接放在隊首【不用CAS因為執行線程是持鎖線程,并發安全】
else
t.nextWaiter = node; // 非空隊列隊尾追加
lastWaiter = node; // 更新隊尾的引用
return node;
}
- 清理條件隊列中的cancel類型的節點,比如中斷、超時等會導致節點轉換為Cancel
// 清理條件隊列內所有已取消(不是CONDITION)的 node,【鏈表刪除的邏輯】
private void unlinkCancelledWaiters(){
// 從頭節點開始遍歷【FIFO】
Node t = firstWaiter;
// 指向正常的 CONDITION 節點
Node trail = null;
// 等待隊列不空
while (t != null) {
// 獲取當前節點的后繼節點
Node next = t.nextWaiter;
// 判斷 t 節點是不是 CONDITION 節點,條件隊列內不是 CONDITION 就不是正常的
if (t.waitStatus != Node.CONDITION) {
// 不是正常節點,需要 t 與下一個節點斷開
t.nextWaiter = null;
// 條件成立說明遍歷到的節點還未碰到過正常節點
if (trail == null)
// 更新 firstWaiter 指針為下個節點
firstWaiter = next;
else
// 讓上一個正常節點指向 當前取消節點的 下一個節點,【刪除非正常的節點】
trail.nextWaiter = next;
// t 是尾節點了,更新 lastWaiter 指向最后一個正常節點
if (next == null)
lastWaiter = trail;
} else {
// trail 指向的是正常節點
trail = t;
}
// 把 t.next 賦值給 t,循環遍歷
t = next;
}
}
- fullyRelease方法將r讓Thread-0釋放鎖, 這個時候Thread-1就會去競爭鎖
// 線程可能重入,需要將 state 全部釋放
final int fullyRelease(Node node) {
// 完全釋放鎖是否成功,false 代表成功
boolean failed = true;
try {
// 獲取當前線程所持有的 state 值總數
int savedState = getState();
// release -> tryRelease 解鎖重入鎖
if (release(savedState)) {
// 釋放成功
failed = false;
// 返回解鎖的深度
return savedState;
} else {
// 解鎖失敗拋出異常
throw new IllegalMonitorStateException();
}
} finally {
// 沒有釋放成功,將當前 node 設置為取消狀態
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final boolean isOnSyncQueue(Node node) {
// node 的狀態是 CONDITION,signal 方法是先修改狀態再遷移,所以前驅節點為空證明還【沒有完成遷移】
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 說明當前節點已經成功入隊到阻塞隊列,且當前節點后面已經有其它 node,因為條件隊列的 next 指針為 null
if (node.next != null)
return true;
// 說明【可能在阻塞隊列,但是是尾節點】
// 從阻塞隊列的尾節點開始向前【遍歷查找 node】,如果查找到返回 true,查找不到返回 false
return findNodeFromTail(node);
}
signal過程
- Thread-1執行signal方法喚醒條件隊列中的第一個節點,即Thread-0,條件隊列置空

- Thread-0的節點的等待狀態變更為0, 重新加入到AQS隊列尾部。

- 后續就是Thread-1釋放鎖,其他線程重新搶鎖。
源碼如下:
public final void signal() {
// 判斷調用 signal 方法的線程是否是獨占鎖持有線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 獲取條件隊列中第一個 Node
Node first = firstWaiter;
// 不為空就將第該節點【遷移到阻塞隊列】
if (first != null)
doSignal(first);
}
// 喚醒 - 【將沒取消的第一個節點轉移至 AQS 隊列尾部】
private void doSignal(Node first){
do {
// 成立說明當前節點的下一個節點是 null,當前節點是尾節點了,隊列中只有當前一個節點了
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 將等待隊列中的 Node 轉移至 AQS 隊列,不成功且還有節點則繼續循環
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// signalAll() 會調用這個函數,喚醒所有的節點
private void doSignalAll(Node first){
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
// 喚醒所有的節點,都放到阻塞隊列中
} while (first != null);
}
- 調用transferForSignal()方法,先將節點的 waitStatus 改為 0,然后加入 AQS 阻塞隊列尾部,將 Thread-3 的 waitStatus 改為 -1。
// 如果節點狀態是取消, 返回 false 表示轉移失敗, 否則轉移成功
final boolean transferForSignal(Node node) {
// CAS 修改當前節點的狀態,修改為 0,因為當前節點馬上要遷移到阻塞隊列了
// 如果狀態已經不是 CONDITION, 說明線程被取消(await 釋放全部鎖失?。┗蛘弑恢袛啵纱驍?cancelAcquire)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 返回函數調用處繼續尋找下一個節點
return false;
// 【先改狀態,再進行遷移】
// 將當前 node 入阻塞隊列,p 是當前節點在阻塞隊列的【前驅節點】
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驅節點被取消或者不能設置狀態為 Node.SIGNAL,就 unpark 取消當前節點線程的阻塞狀態,
// 讓 thread-0 線程競爭鎖,重新同步狀態
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
總結
本文講解了ReentrantLock中條件變量的使用和原理實現,希望對大家有幫助。