JUC之阻塞隊列BlockingQueue竟然有8種類型?
隊列是一種特殊的線性表,是一種先進先出(FIFO)的數據結構。它只允許在表的前端(front)進行刪除操作,而在表的后端(rear)進行插入操作。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。隊列中沒有元素時,稱為空隊列。
下面是Queue類的繼承關系圖:

Queue

Queue:隊列的上層接口,提供了插入、刪除、獲取元素這3種類型的方法,而且對每一種類型都提供了兩種方式,先來看看插入方法:
- add(E e):插入元素到隊尾,插入成功返回true,沒有可用空間拋出異常 IllegalStateException。
- offer(E e): 插入元素到隊尾,插入成功返回true,否則返回false。
add和offer作為插入方法的唯一不同就在于隊列滿了之后的處理方式。add拋出異常,而offer返回false。
再來看看刪除和獲取元素方法(和插入方法類似):
- remove():獲取并移除隊首的元素,該方法和poll方法的不同之處在于,如果隊列為空該方法會拋出異常,而poll不會。
- poll():獲取并移除隊首的元素,如果隊列為空,返回null。
- element():獲取隊列首的元素,該方法和peek方法的不同之處在于,如果隊列為空該方法會拋出異常,而peek不會。
- peek():獲取隊列首的元素,如果隊列為空,返回null。
如果隊列是空,remove和element方法會拋出異常,而poll和peek返回null。
- Queue 是單向隊列,為了提供更強大的功能,JDK在1.6的時候新增了一個雙向隊列
- Deque,用來實現更靈活的隊列操作。
Deque
Deque在Queue的基礎上,增加了以下幾個方法:
- addFirst(E e):在前端插入元素,異常處理和add一樣;
- addLast(E e):在后端插入元素,和add一樣的效果;
- offerFirst(E e):在前端插入元素,異常處理和offer一樣;
- offerLast(E e):在后端插入元素,和offer一樣的效果;
- removeFirst():移除前端的一個元素,異常處理和remove一樣;
- removeLast():移除后端的一個元素,和remove一樣的效果;
- pollFirst():移除前端的一個元素,和poll一樣的效果;
- pollLast():移除后端的一個元素,異常處理和poll一樣;
- getFirst():獲取前端的一個元素,和element一樣的效果;
- getLast():獲取后端的一個元素,異常處理和element一樣;
- peekFirst():獲取前端的一個元素,和peek一樣的效果;
- peekLast():獲取后端的一個元素,異常處理和peek一樣;
- removeFirstOccurrence(Object o):從前端開始移除第一個是o的元素;
- removeLastOccurrence(Object o):從后端開始移除第一個是o的元素;
- push(E e):和addFirst一樣的效果;
- pop():和removeFirst一樣的效果。
可以發現,其實很多方法的效果都是一樣的,只不過名字不同。比如Deque為了實現Stack的語義,定義了push和pop兩個方法。
BlockingQueue阻塞隊列
BlockingQueue(阻塞隊列),在Queue的基礎上實現了阻塞等待的功能。它是JDK 1.5中加入的接口,它是指這樣的一個隊列:當生產者向隊列添加元素但隊列已滿時,生產者會被阻塞;當消費者從隊列移除元素但隊列為空時,消費者會被阻塞。
BlockingQueue ,是java.util.concurrent 包提供的用于解決并發 生產者 — 消費者 問題的最有用的類,很好的解決了多線程中,如何高效安全“傳輸”數據的問題。它的特性是在任意時刻只有一個線程可以進行take或者put操作,并且 BlockingQueue 提供類超時 return null 的機制,在許多生產場景里都可以看到這個工具的身影。
總體認識
一般我們用到的阻塞隊列有哪些?可以通過下面一個類圖來總體看下:

可以看到 BlockingQueue 是一個接口,繼承它的另外還有兩個接口 BlockingDeque(雙端隊列)、TransferQueue(兩個線程之間傳遞元素)。
阻塞隊列的成員如下:

隊列類型
- 無限隊列(unbounded queue)— 幾乎可以無限增長
- 有限隊列(bounded queue)— 定義了最大容量
隊列數據結構
隊列實質就是一種存儲數據的結構
- 通常用鏈表或者數組實現
- 一般而言隊列具備FIFO先進先出的特性,當然也有雙端隊列(Deque)優先級隊列
- 主要操作:入隊(Enqueue)與 出對(Dequeue)

常見的5種阻塞隊列
- ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
- PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
BlockingQueue API
BlockingQueue的核心方法
- public interface BlockingQueue<E> extends Queue<E> {
- //將給定元素設置到隊列中,如果設置成功返回true, 否則返回false。如果是往限定了長度的隊列中設置值,推薦使用offer()方法。
- boolean add(E e);
- //將給定的元素設置到隊列中,如果設置成功返回true, 否則返回false. e的值不能為空,否則拋出空指針異常。
- boolean offer(E e);
- //將元素設置到隊列中,如果隊列中沒有多余的空間,該方法會一直阻塞,直到隊列中有多余的空間。
- void put(E e) throws InterruptedException;
- //將給定元素在給定的時間內設置到隊列中,如果設置成功返回true, 否則返回false.
- boolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException;
- //從隊列中獲取值,如果隊列中沒有值,線程會一直阻塞,直到隊列中有值,并且該方法取得了該值。
- E take() throws InterruptedException;
- //在給定的時間里,從隊列中獲取值,時間到了直接調用普通的poll方法,為null則直接返回null。
- E poll(long timeout, TimeUnit unit)
- throws InterruptedException;
- //獲取隊列中剩余的空間。
- int remainingCapacity();
- //從隊列中移除指定的值。
- boolean remove(Object o);
- //判斷隊列中是否擁有該值。
- public boolean contains(Object o);
- //將隊列中值,全部移除,并發設置到給定的集合中。
- int drainTo(Collection<? super E> c);
- //指定最多數量限制將隊列中值,全部移除,并發設置到給定的集合中。
- int drainTo(Collection<? super E> c, int maxElements);
- }
BlockingQueue 接口的所有方法可以分為兩大類:負責向隊列添加元素的方法和 檢索這些元素的方法。在隊列滿/空的情況下,來自這兩個組的每個方法的行為都不同。
添加元素

檢索元素

BlockingQueue最重要的也就是關于阻塞等待的幾個方法,而這幾個方法正好可以用來實現生產-消費的模型。
ArrayBlockingQueue
ArrayBlockingQueue 由數組支持的有界阻塞隊列,隊列基于數組實現,容量大小在創建 ArrayBlockingQueue 對象時已經定義好。 此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖,默認采用非公平鎖。
ArrayBlockingQueue 內部由ReentrantLock來實現線程安全,由Condition的await和signal來實現等待喚醒的功能。它的數據結構是數組,準確的說是一個循環數組(可以類比一個圓環),所有的下標在到達最大長度時自動從0繼續開始。
深入理解ArrayBlockingQueue可以閱讀《阻塞隊列 — ArrayBlockingQueue源碼分析》

LinkedBlockingQueue
LinkedBlockingQueue 由鏈表節點支持的可選有界隊列,是一個基于鏈表的無界隊列(理論上有界),隊列按照先進先出的順序進行排序。LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默認為 Integer.MAX_VALUE,也就是無界隊列。所以為了避免隊列過大造成機器負載或者內存爆滿的情況出現,我們在使用的時候建議手動傳一個隊列的大小。
LinkedBlockingQueue 內部由單鏈表實現,只能從head取元素,從tail添加元素。添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以并行執行。LinkedBlockingQueue采用可重入鎖(ReentrantLock)來保證在并發情況下的線程安全。
向無限隊列添加元素的所有操作都將永遠不會阻塞,[注意這里不是說不會加鎖保證線程安全],因此它可以增長到非常大的容量。
使用無限 BlockingQueue 設計生產者 - 消費者模型時最重要的是 消費者應該能夠像生產者向隊列添加消息一樣快地消費消息。否則,內存可能會填滿,然后就會得到一個 OutOfMemory 異常。
深入理解LinkedBlockingQueue可以閱讀《阻塞隊列 — LinkedBlockingQueue源碼分析》

PriorityBlockingQueue
PriorityBlockingQueue 優先級隊列,線程安全(添加、讀取都進行了加鎖)、無界、讀阻塞的隊列,底層采用的堆結構實現(二叉樹),默認是小根堆,最小的或者最大的元素會一直置頂,每次獲取都取最頂端的數據。可以實現優先出隊。最特別的是它只有一個鎖,入隊操作永遠成功,而出隊只有在空隊列的時候才會進行線程阻塞。可以說有一定的應用場景吧,比如:有任務要執行,可以對任務加一個優先級的權重,這樣隊列會識別出來,對該任務優先進行出隊。
深入理解PriorityBlockingQueue可以閱讀《阻塞隊列 —PriorityBlockingQueue源碼分析》

DelayQueue
DelayQueue 由優先級支持的、基于時間的調度隊列,內部使用非線程安全的優先隊列(PriorityQueue)實現,而無界隊列基于數組的擴容實現。在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。
深入理解DelayQueue可以閱讀《阻塞隊列 — DelayQueue源碼分析》
SynchronousQueue
SynchronousQueue 一個不存儲元素的阻塞隊列,每一個 put 操作必須等待 take 操作,否則不能繼續添加元素。支持公平鎖和非公平鎖2種策略來訪問隊列。默認是采用非公平性策略訪問隊列。公平性策略底層使用了類似隊列的數據結構,而非公平策略底層使用了類似棧的數據結構。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
深入理解SynchronousQueue可以閱讀《阻塞隊列 — SynchronousQueue源碼分析》

LinkedTransferQueue
LinkedTransferQueue 是一個由鏈表結構組成的無界阻塞傳輸隊列,它是一個很多隊列的結合體(ConcurrentLinkedQueue,LinkedBlockingQueue,SynchronousQueue),在除了有基本阻塞隊列的功能(但是這個阻塞隊列沒有使用鎖)之外;隊列實現了TransferQueue接口重寫了transfer 和 tryTransfer 方法,這組方法和SynchronousQueue公平模式的隊列類似,具有匹配的功能。
深入理解LinkedTransferQueue可以閱讀《阻塞隊列 — LinkedTransferQueue源碼分析》

LinkedBlockingDeque
LinkedBlockingDeque 一個由于鏈表結構組成的雙向阻塞隊列,隊列頭部和尾部都可以添加和移除元素,多線程并發時,可以將鎖的競爭對多降到一半。
深入理解LinkedBlockingDeque可以閱讀《阻塞隊列 — LinkedBlockingDeque源碼分析》

DelayedWorkQueue
DelayedWorkQueue 也是一種設計為定時任務的延遲隊列,其實現原理和DelayQueue 基本一樣,核心數據結構是二叉最小堆的優先隊列,隊列滿時會自動擴容,不過是將優先級隊列和DelayQueue的實現過程遷移到本身方法體中,從而可以在該過程當中靈活的加入定時任務特有的方法調用。
深入理解DelayedWorkQueue可以閱讀《阻塞隊列 — DelayedWorkQueue源碼分析》

對比分析
LinkedBlockingQueue與ArrayBlockingQueue區別
- 隊列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對于后者而言,當添加速度大于移除速度時,在無界的情況下,可能會造成內存溢出等問題。
- 數據存儲容器不同,ArrayBlockingQueue采用的是數組作為數據存儲容器,而LinkedBlockingQueue采用的則是以Node節點作為連接對象的鏈表。
- 由于ArrayBlockingQueue采用的是數組的存儲容器,因此在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間內需要高效并發地處理大批量數據的時,對于GC可能存在較大影響。
- 兩者的實現隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
LinkedTransferQueue和SynchronousQueue(公平模式)區別
- LinkedTransferQueue 和SynchronousQueue 其實基本是差不多的,兩者都是無鎖帶阻塞功能的隊列,都是使用的雙重隊列;
- SynchronousQueue 通過內部類Transferer 來實現公平和非公平隊列,在LinkedTransferQueue 中沒有公平與非公平的區分;
- LinkedTransferQueue 實現了TransferQueue接口,該接口定義的是帶阻塞操作的操作,相比SynchronousQueue 中的Transferer 功能更豐富。
- SynchronousQueue 中放數據操作和取數據操作都是阻塞的,當隊列中的操作和本次操作不匹配時,線程會阻塞,直到匹配的操作到來。LinkedTransferQueue 是無界隊列,放數據操作不會阻塞,取數據操作如果沒有匹配操作可能會阻塞,通過參數決定是否阻塞(ASYNC,SYNC,NOW,TIMED)。
LinkedBlockingDeque與LinkedList區別
- package com.niuh.deque;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.Queue;
- import java.util.concurrent.LinkedBlockingDeque;
- /*
- * LinkedBlockingDeque是“線程安全”的隊列,而LinkedList是非線程安全的。
- *
- * 下面是“多個線程同時操作并且遍歷queue”的示例
- * (1) 當queue是LinkedBlockingDeque對象時,程序能正常運行。
- * (2) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
- *
- */
- public class LinkedBlockingDequeRunner {
- // TODO: queue是LinkedList對象時,程序會出錯。
- // private static Queue<String> queue = new LinkedList<String>();
- private static Queue<String> queue = new LinkedBlockingDeque<String>();
- public static void main(String[] args) {
- // 同時啟動兩個線程對queue進行操作!
- new MyThread("A").start();
- new MyThread("B").start();
- }
- private static void printAll() {
- String value;
- Iterator iter = queue.iterator();
- while (iter.hasNext()) {
- value = (String) iter.next();
- System.out.print(value + ", ");
- }
- System.out.println();
- }
- private static class MyThread extends Thread {
- MyThread(String name) {
- super(name);
- }
- @Override
- public void run() {
- int i = 0;
- while (i++ < 6) {
- // “線程名” + "-" + "序號"
- String val = Thread.currentThread().getName() + i;
- queue.add(val);
- // 通過“Iterator”遍歷queue。
- printAll();
- }
- }
- }
- }
輸出結果:
- A1,
- A1, A2,
- A1, A2, A3,
- A1, A2, A3, A4,
- A1, A2, A3, A4, A5,
- A1, A2, A3, A4, A5, A6,
- A1, A2, A3, A4, A5, A6, B1,
- A1, A2, A3, A4, A5, A6, B1, B2,
- A1, A2, A3, A4, A5, A6, B1, B2, B3,
- A1, A2, A3, A4, A5, A6, B1, B2, B3, B4,
- A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, B5,
- A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, B5, B6,
結果說明:示例程序中,啟動兩個線程(線程A和線程B)分別對LinkedBlockingDeque進行操作:
以線程A而言,它會先獲取“線程名”+“序號”,然后將該字符串添加到LinkedBlockingDeque中;
接著,遍歷并輸出LinkedBlockingDeque中的全部元素。
線程B的操作和線程A一樣,只不過線程B的名字和線程A的名字不同。
當queue是LinkedBlockingDeque對象時,程序能正常運行。
如果將queue改為LinkedList時,程序會產生ConcurrentModificationException異常。
BlockingQueue應用
多線程生產者-消費者示例
接下來我們創建一個由兩部分組成的程序: 生產者 ( Producer ) 和消費者 ( Consumer ) 。
生產者(Producer)
生產者將生成一個 0 到 100 的隨機數(十全大補丸的編號),并將該數字放在 BlockingQueue 中。我們將創建 16 個線程(潘金蓮)用于生成隨機數并使用 put() 方法阻塞,直到隊列中有可用空間。
需要記住的重要一點是,我們需要阻止我們的消費者線程無限期地等待元素出現在隊列中。
從生產者(潘金蓮)向消費者(武大郎)發出信號的好方法是,不需要處理消息,而是發送稱為毒 ( poison ) 丸 ( pill ) 的特殊消息。 我們需要發送盡可能多的毒 ( poison ) 丸 ( pill ) ,因為我們有消費者(武大郎)。然后當消費者從隊列中獲取特殊的毒 ( poison ) 丸 ( pill )消息時,它將優雅地完成執行。
以下生產者的代碼:
- package com.niuh.queue;
- import lombok.extern.slf4j.Slf4j;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ThreadLocalRandom;
- /**
- * 生產者(Producer)
- **/
- @Slf4j
- public class NumbersProducer implements Runnable {
- private BlockingQueue<Integer> numbersQueue;
- private final int poisonPill;
- private final int poisonPillPerProducer;
- public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
- this.numbersQueue = numbersQueue;
- this.poisonPill = poisonPill;
- this.poisonPillPerProducer = poisonPillPerProducer;
- }
- public void run() {
- try {
- generateNumbers();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- private void generateNumbers() throws InterruptedException {
- for (int i = 0; i < 100; i++) {
- numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
- log.info("潘金蓮-{}號,給武大郎的泡藥!", Thread.currentThread().getId());
- }
- /*while (true) {
- numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
- if (false) {
- break;
- }
- }*/
- for (int j = 0; j < poisonPillPerProducer; j++) {
- numbersQueue.put(poisonPill);
- log.info("潘金蓮-{}號,往武大郎的藥里放入第{}顆毒丸!", Thread.currentThread().getId(), j + 1);
- }
- }
- }
我們的生成器構造函數將 BlockingQueue 作為參數,用于協調生產者和使用者之間的處理,我們看到方法generateNumbers() 將 100 個元素(生產100副藥給武大郎吃)放入隊列中。它還需要有毒 ( poison ) 丸 ( pill ) (潘金蓮給武大郎下毒)消息,以便知道在執行完成時放入隊列的消息類型。該消息需要將 poisonPillPerProducer 次放入隊列中。
消費者(Consumer)
每個消費者將使用 take() 方法從 BlockingQueue 獲取一個元素,因此它將阻塞,直到隊列中有一個元素。從隊列中取出一個 Integer 后,它會檢查該消息是否是毒 ( poison ) 丸 ( pill )(武大郎看潘金蓮有沒有下毒) ,如果是,則完成一個線程的執行。否則,它將在標準輸出上打印出結果以及當前線程的名稱。
- package com.niuh.queue;
- import lombok.extern.slf4j.Slf4j;
- import java.util.concurrent.BlockingQueue;
- /**
- * 消費者(Consumer)
- **/
- @Slf4j
- public class NumbersConsumer implements Runnable {
- private BlockingQueue<Integer> queue;
- private final int poisonPill;
- public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
- this.queue = queue;
- this.poisonPill = poisonPill;
- }
- public void run() {
- try {
- while (true) {
- Integer number = queue.take();
- if (number.equals(poisonPill)) {
- return;
- }
- log.info("武大郎-{}號,喝藥-編號:{}", Thread.currentThread().getId(), number);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
需要注意的重要事項是隊列的使用。與生成器構造函數中的相同,隊列作為參數傳遞。我們可以這樣做,是因為 BlockingQueue 可以在線程之間共享而無需任何顯示同步。
驗證測試
既然我們有生產者和消費者,我們就可以開始我們的計劃。我們需要定義隊列的容量,并將其設置為 10個元素。 我們創建4 個生產者線程,并且創建等于可用處理器數量的消費者線程:
- package com.niuh.queue;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- /**
- * 多線程生產者-消費者示例
- **/
- public class Main {
- public static void main(String[] args) {
- int BOUND = 10;
- int N_PRODUCERS = 16;
- int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); //=8
- int poisonPill = Integer.MAX_VALUE;
- int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; // =0
- int mod = N_CONSUMERS % N_PRODUCERS;//0+8=8
- BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);
- //潘金蓮給武大郎熬藥
- for (int i = 1; i < N_PRODUCERS; i++) {
- new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
- }
- //武大郎開始喝藥
- for (int j = 0; j < N_CONSUMERS; j++) {
- new Thread(new NumbersConsumer(queue, poisonPill)).start();
- }
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //潘金蓮開始投毒,武大郎喝完毒藥GG
- new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
- }
- }
BlockingQueue 是使用具有容量的構造創建的。我們正在創造 4 個生產者和 N 個消費者(武大郎)。我們將我們的毒 ( poison ) 丸 ( pill )消息指定為 Integer.MAX_VALUE,因為我們的生產者在正常工作條件下永遠不會發送這樣的值。這里要注意的最重要的事情是 BlockingQueue 用于協調它們之間的工作。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git