面試侃集合 | ArrayBlockingQueue篇
面試官:平常在工作中你都用過什么什么集合?
Hydra:用過 ArrayList、HashMap,呃…沒有了
面試官:好的,回家等通知吧…
不知道大家在面試中是否也有過這樣的經歷,工作中僅僅用過的那么幾種簡單的集合,被問到時就會感覺捉襟見肘。在面試中,如果能夠講清一些具有特殊使用場景的集合工具類的原理,一定能秀的面試官頭皮發麻。于是Hydra苦學半月,再次來和面試官對線
面試官:又來了老弟,讓我看看你這半個月學了些什么
Hydra:那就先從ArrayBlockingQueue 中開始聊吧,它是一個具有線程安全性和阻塞性的有界隊列
面試官:好啊,那先給我解釋一下它的線程安全性
Hydra:ArrayBlockingQueue的線程安全是通過底層的ReentrantLock保證的,因此在元素出入隊列操作時,無需額外加鎖。寫一段簡單的代碼舉個例子,從具體的使用來說明它的線程安全吧
- ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue(7,
- true, new ArrayList<>(Arrays.asList(new Integer[]{1,2,3,4,5,6,7})));
- @AllArgsConstructor
- class Task implements Runnable{
- String threadName;
- @Override
- public void run() {
- while(true) {
- try {
- System.out.println(threadName+" take: "+queue.take());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- private void queueTest(){
- new Thread(new Task("Thread 1")).start();
- new Thread(new Task("Thread 2")).start();
- }
在代碼中創建隊列時就往里放入了7個元素,然后創建兩個線程各自從隊列中取出元素。對隊列的操作也非常簡單,只用到了操作隊列中出隊方法take,運行結果如下:
- Thread 1 take: 1
- Thread 2 take: 2
- Thread 1 take: 3
- Thread 2 take: 4
- Thread 1 take: 5
- Thread 2 take: 6
- Thread 1 take: 7
可以看到在公平模式下,兩個線程交替對隊列中的元素執行出隊操作,并沒有出現重復取出的情況,即保證了多個線程對資源競爭的互斥訪問。它的過程如下:

面試官:那它的阻塞性呢?
Hydra:好的,還是寫段代碼通過例子來說明
- private static void queueTest() throws InterruptedException {
- ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<>(3);
- int size=7;
- Thread putThread=new Thread(()->{
- for (int i = 0; i <size ; i++) {
- try {
- queue.put(i);
- System.out.println("PutThread put: "+i+" - Size:"+queue.size());
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- Thread takeThread = new Thread(() -> {
- for (int i = 0; i < size+1 ; i++) {
- try {
- Thread.sleep(3000);
- System.out.println("TakeThread take: "+queue.take());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- putThread.start();
- Thread.sleep(1000);
- takeThread.start();
- }
和第一個例子中的代碼不同,這次我們創建隊列時只指定長度,并不在初始化時就往隊列中放入元素。接下來創建兩個線程,一個線程充當生產者,生產產品放入到隊列中,另一個線程充當消費者,消費隊列中的產品。需要注意生產和消費的速度是不同的,生產者每一秒生產一個,而消費者每三秒才消費一個。執行上面的代碼,運行結果如下:
- PutThread put: 0 - Size:1
- PutThread put: 1 - Size:2
- PutThread put: 2 - Size:3
- TakeThread take: 0
- PutThread put: 3 - Size:3
- TakeThread take: 1
- PutThread put: 4 - Size:3
- TakeThread take: 2
- PutThread put: 5 - Size:3
- TakeThread take: 3
- PutThread put: 6 - Size:3
- TakeThread take: 4
- TakeThread take: 5
- TakeThread take: 6
來給你畫個比較直觀的圖來演示一下吧:

分析運行結果,能夠在兩個方面體現出隊列的阻塞性:
- 入隊阻塞:當隊列中的元素個數等于隊列長度時,會阻塞向隊列中放入元素的操作,當有出隊操作取走隊列中元素,隊列出現空缺位置后,才會再進行入隊
- 出隊阻塞:當隊列中的元素為空時,執行出隊操作的線程將被阻塞,直到隊列不為空時才會再次執行出隊操作。在上面的代碼的出隊線程中,我們故意將出隊的次數設為了隊列中元素數量加一,因此這個線程最后會被一直阻塞,程序將一直執行不會結束
面試官:你只會用put和take方法嗎,能不能講講其他的方法?
Hydra:方法太多了,簡單概括一下插入和移除相關的操作吧
面試官:方法記得還挺清楚,看樣子是個合格的 API caller。下面說說原理吧,先講一下ArrayBlockingQueue 的結構
Hydra:在ArrayBlockingQueue 中有下面四個比較重要的屬性
- final Object[] items;
- final ReentrantLock lock;
- private final Condition notEmpty;
- private final Condition notFull;
- public ArrayBlockingQueue(int capacity, boolean fair) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.items = new Object[capacity];
- lock = new ReentrantLock(fair);
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
在構造函數中對它們進行了初始化:
- Object[] items:隊列的底層由數組組成,并且數組的長度在初始化就已經固定,之后無法改變
- ReentrantLock lock:控制隊列操作的獨占鎖,在操作隊列的元素前需要獲取鎖,保護競爭資源
- Condition notEmpty:條件對象,如果有線程從隊列中獲取元素時隊列為空,就會在此進行等待,直到其他線程向隊列后插入元素才會被喚醒
- Condition notFull:如果有線程試圖向隊列中插入元素,且此時隊列為滿時,就會在這進行等待,直到其他線程取出隊列中的元素才會被喚醒
Condition是一個接口,代碼中的notFull和notEmpty實例化的是AQS的內部類ConditionObject,它的內部是由AQS中的Node組成的等待鏈,ConditionObject中有一個頭節點firstWaiter和尾節點lastWaiter,并且每一個Node都有指向相鄰節點的指針。簡單的來說,它的結構是下面這樣的:
至于它的作用先賣個關子,放在后面講。除此之外,還有兩個int類型的屬性takeIndex和putIndex,表示獲取元素的索引位置和插入元素的索引位置。假設一個長度為5的隊列中已經有了3個元素,那么它的結構是這樣的:
面試官:說一下隊列的插入操作吧
Hydra:好的,那我們先說add和offer方法,在執行add方法時,調用了其父類AbstractQueue中的add方法。add方法則調用了offer方法,如果添加成功返回true,添加失敗時拋出異常,看一下源碼:
- public boolean add(E e) {
- if (offer(e))
- return true;
- else
- throw new IllegalStateException("Queue full");
- }
- public boolean offer(E e) {
- checkNotNull(e);//檢查元素非空
- final ReentrantLock lock = this.lock; //獲取鎖并加鎖
- lock.lock();
- try {
- if (count == items.length)//隊列已滿
- return false;
- else {
- enqueue(e);//入隊
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
實際將元素加入隊列的核心方法enqueue:
- private void enqueue(E x) {
- final Object[] items = this.items;
- items[putIndex] = x;
- if (++putIndex == items.length)
- putIndex = 0;
- count++;
- notEmpty.signal();
- }
在enqueue中,首先將元素放入數組中下標為putIndex的位置,然后對putIndex自增,并判斷是否已處于隊列中最后一個位置,如果putIndex索引位置等于數組的長度時,那么將putIndex置為0,即下一次在元素入隊時,從隊列頭開始放置。
舉個例子,假設有一個長度為5的隊列,現在已經有4個元素,我們進行下面一系列的操作,來看一下索引下標的變化:
上面這個例子提前用到了隊列中元素被移除時takeIndex會自增的知識點,通過這個例子中索引的變化,可以看出ArrayBlockingQueue就是一個循環隊列,takeIndex就相當于隊列的頭指針,而putIndex相當于隊列的尾指針的下一個位置索引。并且這里不需要擔心在隊列已滿時還會繼續向隊列中添加元素,因為在offer方法中會首先判斷隊列是否已滿,只有在隊列不滿時才會執行enqueue方法。
面試官:這個過程我明白了,那enqueue方法里最后的notEmpty.signal()是什么意思?
Hydra:這是一個喚醒操作,等后面講完它的掛起后再說。我還是先把插入操作中的put方講完吧,看一下它的源碼:
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == items.length)
- notFull.await();
- enqueue(e);
- } finally {
- lock.unlock();
- }
- }
put方法是一個阻塞方法,當隊列中元素未滿時,會直接調用enqueue方法將元素加入隊列中。如果隊列已滿,就會調用notFull.await()方法將掛起當前線程,直到隊列不滿時才會被喚醒,繼續執行插入操作。
當隊列已滿,再執行put操作時,就會執行下面的流程:
這里提前劇透一下,當隊列中有元素被移除,在調用dequeue方法中的notFull.signal()時,會喚醒等待隊列中的線程,并把對應的元素添加到隊列中,流程如下:
做一個總結,在插入元素的幾個方法中,add、offer以及帶有超時的offer方法都是非阻塞的,會立即返回或超時后立即返回,而put方法是阻塞的,只有當隊列不滿添加成功后才會被返回。
面試官:講的不錯,講完插入操作了再講講移除操作怎么樣?
Hydra:還是老規矩,先說非阻塞的方法remove和poll,父類的remove方法還是會調用子類的poll方法,不同的是remove方法在隊列為空時拋出異常,而poll會直接返回null。這兩個方法的核心還是調用的dequeue方法,它的源碼如下:
- private E dequeue() {
- final Object[] items = this.items;
- E x = (E) items[takeIndex];
- items[takeIndex] = null;
- if (++takeIndex == items.length)
- takeIndex = 0;
- count--;
- if (itrs != null)
- //更新迭代器中的元素
- itrs.elementDequeued();
- notFull.signal();
- return x;
- }
在dequeue中,在獲取到數組下標為takeIndex的元素,并將該位置置為null。將takeIndex自增后判斷是否與數組長度相等,如果相等還是按之前循環隊列的理論,將它的索引置為0,并將隊列的中的計數減1。
有一個隊列初始化時有5個元素,我們對齊分別進行5次的出隊操作,查看索引下標的變化情況:
然后我們還是結合take方法來說明線程的掛起和喚醒的操作,與put方法相對,take用于阻塞獲取元素,來看一下它的源碼:
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0)
- notEmpty.await();
- return dequeue();
- } finally {
- lock.unlock();
- }
- }
take是一個可以被中斷的阻塞獲取元素的方法,首先判斷隊列是否為空,如果隊列不為空那么就調用dequeue方法移除元素,如果隊列為空時就調用notEmpty.await()就將當前線程掛起,直到有其他的線程調用了enqueue方法,才會喚醒等待隊列中被掛起的線程??梢詤⒖枷旅娴膱D來理解:
當有其他線程向隊列中插入元素后:
入隊的enqueue方法會調用notEmpty.signal(),喚醒等待隊列中firstWaiter指向的節中的線程,并且該線程會調用dequeue完成元素的出隊操作。到這移除的操作就也分析完了,至于開頭為什么說ArrayBlockingQueue是線程安全的,看到每個方法前都通過全局單例的lock加鎖,相信你也應該明白了