10張圖詳解管程內部,進去看看
java對共享變量的操作管理使用了MESA管程模型。下圖是Java基于AQS實現的MESA管程模型:
上圖中有三個知識點:
- MESA管程模型封裝了共享變量和對共享變量的操作,線程要進入管程內部,必須獲取到鎖,如果獲取鎖失敗就進入入口等待隊列阻塞等待。
- 如果線程獲取到鎖,就進入到管程內部。但是進入到管程內部,也不一定能立刻操作共享變量,而是要看條件變量是否滿足,如果不滿足,只能進入條件變量等待隊列阻塞等待。
- 在條件變量等待隊列中,如果被其他線程喚醒,也不一定能立刻操作共享變量,而是需要去入口等待隊列重新排隊等待獲取鎖。
本文主要講解管程模型中條件變量等待隊列。
1 官方示例
首先我們看一下官方給出的示例代碼:
- public class BoundedBuffer {
- final Lock lock = new ReentrantLock();
- final Condition notFull = lock.newCondition();
- final Condition notEmpty = lock.newCondition();
- final Object[] items = new Object[100];
- int putptr, takeptr, count;
- public void put(Object x) throws InterruptedException {
- lock.lock();
- try {
- while (count == items.length)
- notFull.await();
- items[putptr] = x;
- if (++putptr == items.length) putptr = 0;
- ++count;
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
- public Object take() throws InterruptedException {
- lock.lock();
- try {
- while (count == 0)
- notEmpty.await();
- Object x = items[takeptr];
- if (++takeptr == items.length) takeptr = 0;
- --count;
- notFull.signal();
- return x;
- } finally {
- lock.unlock();
- }
- }
- }
這個代碼定義了兩個條件變量,notFull和notEmpty,說明如下:
- 如果items數組已經滿了,則notFull變量不滿足,線程需要進入notFull條件等待隊列進行等待。當take方法取走一個數組元素時,notFull條件滿足了,喚醒notFull條件等待隊列中等待線程。
- 如果items數組為空,則notEmpty變量不滿足,線程需要進入notEmpty條件等待隊列進行等待。當put方法加入一個數組元素時,notEmpty條件滿足了,喚醒notEmpty條件等待隊列中等待線程。
- 條件變量是綁定在Lock上的,示例代碼使用了ReentrantLock。在執行await和signal方法時首先要獲取到鎖。
2 原理簡介
Java AQS的條件變量等待隊列是基于接口Condition和ConditionObject來實現的,URM類圖如下:
Condition接口主要定義了下面3個方法:
- await:進入條件等待隊列
- signal:喚醒條件等待隊列中的元素
- signalAll:喚醒條件等待隊列中的所有元素
3 await
條件等待隊列跟入口等待隊列有兩個不同:
- 雖然二者共用了Node類,但是條件等待隊列是單向隊列,入口等待隊列是雙向隊列,條件隊列中下一個節點的引用是nextWaiter,入口等待隊列中下一個節點的引用是next。
- 條件等待隊列中元素的waitStatus必須是-2。
await方法的流程如下圖:
3.1 進入條件等待隊列
入隊方法對應方法addConditionWaiter,這里有三種情況:
- 隊列為空,則新建一個節點,如下圖:
- 隊列非空,最后一個元素的waitStatus是-2,如下圖:
- 隊列非空,最后一個元素的waitStatus不是-2,如下圖:
可以看到,這種情況會從隊列第一個元素開始檢查waitStatus不是-2的元素,并從隊列中移除。
3.2 釋放鎖
AQS的并發鎖是基于state變量實現的,線程進入條件等待隊列后,要釋放鎖,即state會變為0,釋放操作會喚醒入口等待隊列中的線程。對應方法fullyRelease,返回值是釋放鎖減掉的state值savedState。
3.3 阻塞等待
釋放鎖后,線程阻塞,自旋等待被喚醒。
3.4 喚醒之后
喚醒之后,當前線程主要有四個動作:
- 轉入入口等待隊列,并把waitStatus改為0。
waitStatus等于0表示中間狀態,當前節點后面的節點已經喚醒,但是當前節點線程還沒有執行完成。
- 重新獲取鎖,如果獲取成功,則當前線程成為入口等待隊列頭結點,interruptMode置為1。
- 如果當前節點在條件等待隊列中有后繼節點,則剔除條件等待隊列中waitStatus!=-2的節點,即隊列中狀態為取消的節點。
- interruptMode如果不等于0,則處理中斷。
3.5 一個細節
上面提到了interruptMode,這個屬性有三個值:
- 0:沒有被中斷
- -1:中斷后拋出InterruptedException,這種情況是當前線程阻塞,沒有被signal之前發生了中斷
- 1:重新進入中斷狀態,這種情況是指當前線程阻塞,被signal之后發生了中斷
3.6 擴展
AQS還提供了其他幾個await方法,如下:
- awaitUninterruptibly:不用處理中斷。
- awaitNanos:自旋等待喚醒過程中有超時時間限制,超時則轉入入口等待隊列。
- awaitUntil:自旋等待喚醒過程中有截止時間,時間到則轉入入口等待隊列。
4 signal
喚醒條件等待隊列中的元素,首先判斷當前線程是否持有獨占鎖,如果沒有,拋出異常。
喚醒條件隊列中的元素,會從第一個元素也就是firstWaiter開始,根據firstWaiter的waitStatus是不是-2,分兩種情況。
4.1 waitStatus==-2
條件隊列第一個節點進入入口等待隊列,等待獲取鎖,如下圖:
這里有兩個注意點:
- 如果入口等待隊列中tail節點的waitStatus小于等于0,則firstWaiter加入后需要把舊tail節點置為-1(表示后面節點等待當前節點喚醒),如下圖:
如果重置waitStatus狀態失敗,則unpark節點firstWaiter。
- 如果入口等待隊列中tail節點的waitStatus大于0,則unpark節點firstWaiter。
4.2 waitStatus!=-2
如果firstWaiter的waitStatus不等于-2,則查找firstWaiter的nextWaiter,直到找到一個waitStatus等于-2的節點,然后將這個節點加入入口等待隊列隊尾,如下圖:
4.3 waitStatus修改
上面的兩種情況無論哪種,進入入口等待隊列之前都要用CAS的方式把waitStatus改為0。
5 signalAll
理解了signal的邏輯,signalAll的邏輯就非常容易理解了。首先判斷當前線程是否持有獨占鎖,如果沒有,拋出異常。
將條件等待隊列中的所有節點依次加入入口等待隊列。如下圖:
6 使用案例
6.1 示例代碼
java并發包下有很多類使用到了AQS中的Condition,如下圖:
這里我們以CyclicBarrier為例來講解。CyclicBarrier是讓一組線程相互等待共同達到一個屏障點。從Cyclic可以看出Barrier可以循環利用,也就是當線程釋放之后可以繼續使用。
看下面這段示例代碼:
- public static void main(String[] args) {
- CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
- System.out.println("柵欄中的線程執行完成");
- });
- ExecutorService executorService = Executors.newFixedThreadPool(2);
- executorService.submit(() -> {
- try {
- System.out.println("線程1:" + Thread.currentThread().getName());
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- executorService.submit(() -> {
- try {
- System.out.println("線程2:" + Thread.currentThread().getName());
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- executorService.shutdown();
- }
執行結果:
- 線程1:pool-1-thread-1
- 線程2:pool-1-thread-2
- 柵欄中的線程執行完成
6.2 原理講解
CyclicBarrier初始化的時候,會指定線程的數量count,每個線程執行完邏輯后,調用CyclicBarrier的await方法,這個方法首先將count減1,然后調用Condition的await,讓當前線程進入條件等待隊列。當最后一個線程將count減1后,count數量等于0,這時就會調用Condition的signalAll方法喚醒所有線程。
7 總結
java的管程模型使用了MESA模型,基于AQS實現的MESA模型中,使用雙向隊列實現了入口等待隊列,使用變量state實現了并發鎖,使用Condition實現了條件等待隊列。
在AQS的實現中,使用同步隊列這個術語來表示雙向隊列,本文中使用入口等待隊列來描述是為了更好的配合管程模型來講解。
AQS的Condition中,使用await方法將當前線程放入條件等待隊列阻塞等待,使用notify來喚醒條件等待隊列中的線程,被喚醒之后,線程并不能立刻執行,而是進入入口等待隊列等待獲取鎖。