哈嘍大家好,我是阿Q!
??20張圖圖解ReentrantLock加鎖解鎖原理???文章一發,便引發了大家激烈的討論,更有小伙伴前來彈窗:平時加解鎖都是直接使用Synchronized?關鍵字來實現的,簡單好用,為啥還要引用ReentrantLock呢?
為了解決小伙伴的疑問,我們來對兩者做個簡單的比較吧:
相同點
兩者都是“可重入鎖”,即當前線程獲取到鎖對象之后,如果想繼續獲取鎖對象還是可以繼續獲取的,只不過鎖對象的計數器進行“+1”操作就可以了。
不同點
- ReentrantLock?是基于API?實現的,Synchronized?是依賴于JVM實現的;
- ReentrantLock?可以響應中斷,Synchronized是不可以的;
- ReentrantLock?可以指定是公平鎖還是非公平鎖,而Synchronized只能是非公平鎖;
- ReentrantLock的lock?是同步非阻塞,采用的是樂觀并發策略,Synchronized是同步阻塞的,使用的是悲觀并發策略;
- ReentrantLock?借助Condition?可以實現多路選擇通知,Synchronized?通過wait()和notify()/notifyAll()方法可以實現等待/通知機制(單路通知);
綜上所述,ReentrantLock?還是有區別于Synchronized的使用場景的,今天我們就來聊一聊它的多路選擇通知功能。
實戰
沒有實戰的“紙上談兵”都是扯淡,今天我們反其道而行,先拋出實戰Demo。
場景描述
加油站為了吸引更多的車主前來加油,在加油站投放了自動洗車機來為加油的汽車提供免費洗車服務。我們規定汽車必須按照“加油->洗車->駛離”的流程來加油,等前一輛汽車駛離之后才允許下一輛車進來加油。
代碼實現
首先創建鎖對象并生成三個Condition
/**
* 控制線程喚醒的標志
*/
private int flag = 1;
/**
* 創建鎖對象
*/
private Lock lock = new ReentrantLock();
/**
* 等待隊列
* c1對應加油
* c2對應洗車
* c3對應開車
*/
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();
然后聲明加油、清洗、駛離的方法,并規定加完油之后去洗車并駛離加油站
/**
* 汽車加油
*/
public void fuelUp(int num){
lock.lock();
try {
while (flag!=1){
c1.await();
}
System.out.println("第"+num+"輛車開始加油");
flag = 2;
c2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 汽車清洗
*/
public void carWash(int num){
lock.lock();
try {
while (flag!=2){
c2.await();
}
System.out.println("第"+num+"輛車開始清洗");
flag = 3;
c3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 駛離
*/
public void drive(int num){
lock.lock();
try {
while (flag!=3){
c3.await();
}
System.out.println("第"+num+"輛車已經駛離加油站");
flag = 1;
c1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
其中await?為等待方法,signal為喚醒方法。
最后我們來定義main方法,模擬一下3輛車同時到達加油站的場景
public static void main(String[] args){
CarOperation carOperation = new CarOperation();
//汽車加油
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.fuelUp(i);
}
},"fuelUp").start();
//汽車清洗
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.carWash(i);
}
},"carRepair").start();
//駛離
new Thread(()->{
for (int i = 1; i < 4; i++) {
carOperation.drive(i);
}
},"drive").start();
}
使用是不是很絲滑?為了加深大家對Condition?的理解,接下來我們用圖解的方式分析一波Condition的原理~
圖解
大家都看到了,上邊的案例都是圍繞Condition?來操作的,那什么是Condition?呢?Condition是一個接口,里邊定義了線程等待和喚醒的方法。

代碼中調用的lock.newCondition()?實際調用的是Sync?類中的newCondition?方法,而ConditionObject?就是Condition的實現類。
final ConditionObject newCondition(){
return new ConditionObject();
}
我們發現它處于AQS?的內部,沒法直接實例化,所以需要配合ReentrantLock來使用。
ConditionObject

ConditionObject?內部維護了一個基于Node的FIFO?單向隊列,我們把它稱為等待隊列。firstWaiter?指向首節點,lastWaiter?指向尾節點,Node?中的nextWaiter?指向隊列中的下一個元素,并且等待隊列中節點的waitStatus都是-2。
了解了ConditionObject?的數據結構之后,我們就從源碼角度來圖解一下ReentrantLock的等待/喚醒機制。
await
首先找到AQS?類中await的源碼
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//將當前線程封裝成node加入等待隊列尾部
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
//檢測此節點的線程是否在同步隊上,如果不在,則說明該線程還不具備競爭鎖的資格,則繼續等待直到檢測到此節點在同步隊列上
while (!isOnSyncQueue(node)) {
//當node處于等待隊列時,掛起當前線程。
LockSupport.park(this);
//如果發生了中斷,則跳出循環,結束等待
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被喚醒后該節點一定會在AQS隊列上,
//之前分析過acquireQueued方法獲取不到鎖會繼續阻塞
//獲取到了鎖,中斷過返回true,未中斷過返回false
//獲取到鎖存在中斷并且不是中斷喚醒的線程將中斷模式設置為重新中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//清除條件隊列中所有狀態不為 CONDITION 的結點
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
如果線程中斷,清除中斷標記并拋出異常。
查看addConditionWaiter
該方法的作用是將當前線程封裝成node加入等待隊列尾部
private Node addConditionWaiter(){
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
//將不處于等待狀態的結點從等待隊列中移除
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//尾節點為空
if (t == null)
//將首節點指向node
firstWaiter = node;
else
//將尾節點的nextWaiter指向node節點
t.nextWaiter = node;
//尾節點指向node
lastWaiter = node;
return node;
}
首先將t指向尾節點,如果尾節點不為空并且它的waitStatus!=-2,則將不處于等待狀態的結點從等待隊列中移除,并且將t指向新的尾節點。
將當前線程封裝成waitStatus為-2的節點追加到等待隊列尾部。
如果尾節點為空,則隊列為空,將首尾節點都指向當前節點。

如果尾節點不為空,證明隊列中有其他節點,則將當前尾節點的nextWaiter指向當前節點,將當前節點置為尾節點。

接著我們來查看下unlinkCancelledWaiters()方法——將不處于等待狀態的結點從等待隊列中移除。
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
//trail是t的前驅結點
Node trail = null;
while (t != null) {
//next為t的后繼結點
Node next = t.nextWaiter;
//如果t節點的waitStatus不為-2即失效節點
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
//如果t的前驅節點為空,則將首節點指向next
if (trail == null)
firstWaiter = next;
else
//t的前驅結點不為空,將前驅節點的后繼指針指向next
trail.nextWaiter = next;
//如果next為null,則將尾節點指向t的前驅節點
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
t為當前節點,trail?為t的前驅節點,next為t的后繼節點。
while?方法會從首節點順著等待隊列往后尋找waitStatus!=-2?的節點,將當前節點的nextWaiter置為空。
如果當前節點的前驅節點為空,代表當前節點為首節點,則將next設置為首節點;

如果不為空,則將前驅節點的nextWaiter指向后繼節點。

如果后繼節點為空,則直接將前驅節點設置為尾節點。

查看fullyRelease
從名字也差不多能明白該方法的作用是徹底釋放鎖資源。
final int fullyRelease(Node node){
//釋放鎖失敗為true,釋放鎖成功為false
boolean failed = true;
try {
//獲取當前鎖的state
int savedState = getState();
//釋放鎖成功的話
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//釋放鎖失敗的話將節點狀態置為取消
node.waitStatus = Node.CANCELLED;
}
}
最重要的就是release?方法,而我們上文中已經講過了,release執行成功的話,當前線程已經釋放了鎖資源。
查看isOnSyncQueue
判斷當前線程所在的Node?是否在同步隊列中(同步隊列即AQS隊列)。在這里有必要給大家看一下同步隊列與等待隊列的關系圖了。

final boolean isOnSyncQueue(Node node){
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
//node節點的next為null
return findNodeFromTail(node);
}
如果當前節點的waitStatus=-2?,說明它在等待隊列中,返回false?;如果當前節點有前驅節點,則證明它在AQS?隊列中,但是前驅節點為空,說明它是頭節點,而頭節點是不參與鎖競爭的,也返回false。
如果當前節點既不在等待隊列中,又不是AQS?中的頭結點且存在next?節點,說明它存在于AQS?中,直接返回true。
接著往下看,如果當前節點的next?為空,該節點可能是tail?節點,也可能是該節點的next還未賦值,所以需要從后往前遍歷節點。
private boolean findNodeFromTail(Node node){
Node t = tail;
for (;;) {
//先用尾節點來判斷,然后用隊列中的節點依次來判斷
if (t == node)
return true;
//節點為空,說明找到頭也不在AQS隊列中,返回false
if (t == null)
return false;
t = t.prev;
}
}
在遍歷過程中,如果隊列中有節點等于當前節點,返回true?;如果找到頭節點也沒找到,則返回false。
我們回到await的while?循環處,如果返回false,說明該節點不在同步隊列中,進入循環中掛起該線程。
知識點補充
阿Q的理解是線程被喚醒會存在兩種情況:一種是調用signal/signalAll喚醒線程;一種是通過線程中斷信號,喚醒線程并拋出中斷異常。
查看checkInterruptWhileWaiting(難點)
該方法的作用是判斷當前線程是否發生過中斷,如果未發生中斷返回0?,如果發生了中斷返回1?或者-1。
private int checkInterruptWhileWaiting(Node node){
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
我們來看看transferAfterCancelledWait?方法是如果區分1和-1的
final boolean transferAfterCancelledWait(Node node){
//cas嘗試將node的waitStatus設置為0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//將node節點由等待隊列加入AQS隊列
enq(node);
return true;
}
//cas失敗后,看看隊列是不是已經在AQS隊列中,如果不在,則通過yield方法給其它線程讓路
while (!isOnSyncQueue(node))
Thread.yield();
//如果已經在AQS隊列中,則返回false
return false;
}
那什么情況下cas操作會成功?什么情況下又會失敗呢?
當線程接收到中斷信號時會被喚醒,此時node的waitStatus=-2?,所以會cas?成功,同時會將node?從等待隊列轉移到AQS隊列中。
當線程先通過signal?喚醒后接收到中斷信號,由于signal?已經將node的waitStatus?設置為-2了,所以此時會cas失敗。
舉例
大家可以用下邊的例子在transferAfterCancelledWait中打斷點測試一下,相信就明了了。
public class CarOperation {
//創建一個重入鎖
private Lock lock = new ReentrantLock();
//聲明等待隊列
Condition c1 = lock.newCondition();
/*
* 等待操作
*/
public void await(){
lock.lock();
try {
System.out.println("開始阻塞");
c1.await();
System.out.println("喚醒之后繼續執行");
} catch (InterruptedException e) {
System.out.println("喚醒但是拋出異常了");
e.printStackTrace();
} finally {
lock.unlock();
}
}
/*
* 喚醒操作
*/
public void signal(){
lock.lock();
try {
c1.signal();
System.out.println("喚醒了。。。。。。。。。。。。。。");
} finally {
lock.unlock();
}
}
}
中斷測試
public static void main(String[] args){
CarOperation carOperation = new CarOperation();
Thread t1 = new Thread(()->{
//等待,掛起線程
carOperation.await();
});
t1.start();
try {
//模擬其它線程搶占資源執行過程
Thread.sleep(10000);
//發出線程中斷信號
t1.interrupt();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}

先喚醒后中斷測試
public static void main(String[] args){
CarOperation carOperation = new CarOperation();
Thread t1 = new Thread(()->{
carOperation.await();
});
t1.start();
try {
Thread.sleep(10000);
//先喚醒線程
carOperation.signal();
//后中斷
t1.interrupt();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}

查看reportInterruptAfterWait
//要么拋出異常,要么重新中斷。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
以上就是await的全部內容了,我們先來做個簡單的總結。
總結
- 將當前線程封裝成node加入等待隊列尾部;
- 徹底釋放鎖資源,也就是將它的同步隊列節點從同步隊列隊首移除;
- 如果當前節點不在同步隊列中,掛起當前線程;
- 自旋,直到該線程被中斷或者被喚醒移動到同步隊列中;
- 阻塞當前節點,直到它獲取到鎖資源;
如果你哪個地方存在疑問可以小窗阿Q!
signal
接下來我們再來捋一捋喚醒的過程
public final void signal(){
//當前線程是否是鎖的持有者,不是的話拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//具體的喚醒過程
doSignal(first);
}
private void doSignal(Node first){
do {
//獲取頭結點的下一個節點并賦值為頭結點
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//將之前的頭節點置為空
first.nextWaiter = null;
//將頭結點從等待隊列轉移到AQS隊列中,如果轉移失敗,則尋找下一個節點繼續轉移
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
首先將等待隊列的頭結點從等待隊列中取出來

然后執行transferForSignal方法進行轉移
final boolean transferForSignal(Node node){
//將node的waitStatus設置為0,如果設置失敗說明node的節點已經不在等待隊列中了,返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將node從等待隊列轉移到AQS隊列,并返回node的前驅節點
Node p = enq(node);
//獲取node前驅節點的狀態
int ws = p.waitStatus;
//如果該節點是取消狀態或者將其設置為喚醒狀態失敗(說明本身已經是喚醒狀態了),所以可以去喚醒node節點所在的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//喚醒當前節點
LockSupport.unpark(node.thread);
return true;
}
將等待隊列的頭結點從等待隊列轉移到AQS?隊列中,如果轉移失敗,說明該節點已被取消,直接返回false?,然后將first指向新的頭結點重新進行轉移。如果轉移成功則根據前驅節點的狀態判斷是否直接喚醒當前線程。

怎么樣?喚醒的邏輯是不是超級簡單?我們也按例做個簡單的總結。
總結
從等待隊列的隊首開始,嘗試對隊首節點執行喚醒操作,如果節點已經被取消了,就嘗試喚醒下一個節點。
對首節點執行喚醒操作時,首先將節點轉移到同步隊列,如果前驅節點的狀態為取消狀態或設置前驅節點的狀態為喚醒狀態失敗,那么就立即喚醒當前節點對應的線程,否則不執行喚醒操作。
以上就是今天的全部內容了,我們下期再見。感興趣的可以關注下公眾號,也可以來技術群討論問題呦!