成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

1.1w字,10圖徹底掌握阻塞隊列(并發必備)

開發 前端
隊列是一種 先進先出的特殊線性表,簡稱 FIFO。特殊之處在于只允許在一端插入,在另一端刪除,進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。隊列中沒有元素時,稱為空隊列。

[[374753]]

本文轉載自微信公眾號「源碼興趣圈」,作者malt  。轉載本文請聯系源碼興趣圈公眾號。

什么是隊列

隊列是一種 先進先出的特殊線性表,簡稱 FIFO。特殊之處在于只允許在一端插入,在另一端刪除

進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。隊列中沒有元素時,稱為空隊列

隊列在程序設計中使用非常的多,包括一些中間件底層數據結構就是隊列(基礎內容沒有過多講解)

 

什么是阻塞隊列

隊列就隊列唄,阻塞隊列又是什么鬼

阻塞隊列是在隊列的基礎上額外添加兩個操作的隊列,分別是:

 

  1. 支持阻塞的插入方法:隊列容量滿時,插入元素線程會被阻塞,直到隊列有多余容量為止
  2. 支持阻塞的移除方法:當隊列中無元素時,移除元素的線程會被阻塞,直到隊列有元素可被移除

文章以 LinkedBlockingQueue 為例,講述隊列之間實現的不同點,為方便小伙伴閱讀,LinkedBlockingQueue 取別名 LBQ

因為是源碼解析文章,建議小伙伴們在 PC 端觀看。當然,如果屏足夠大當我沒說~

阻塞隊列繼承關系

阻塞隊列是一個抽象的叫法,阻塞隊列底層數據結構 可以是數組,可以是單向鏈表,亦或者是雙向鏈表...

LBQ 是一個以 單向鏈表組成的隊列,下圖為 LBQ 上下繼承關系圖

從圖中得知,LBQ 實現了 BlockingQueue 接口,BlockingQueue 實現了 Queue 接口

 

Queue 接口分析

我們以自上而下的方式,先分析一波 Queue 接口里都定義了哪些方法

  1. // 如果隊列容量允許,立即將元素插入隊列,成功后返回 
  2. // 🌟如果隊列容量已滿,則拋出異常 
  3. boolean add(E e); 
  4.  
  5. //  如果隊列容量允許,立即將元素插入隊列,成功后返回 
  6. // 🌟如果隊列容量已滿,則返回 false 
  7. // 當使用有界隊列時,offer 比 add 方法更何時 
  8. boolean offer(E e); 
  9.  
  10. // 檢索并刪除隊列的頭節點,返回值為刪除的隊列頭節點 
  11. // 🌟如果隊列為空則拋出異常 
  12. E remove(); 
  13.  
  14. // 檢索并刪除隊列的頭節點,返回值為刪除的隊列頭節點 
  15. // 🌟如果隊列為空則返回 null 
  16. E poll(); 
  17.  
  18. // 檢查但不刪除隊列頭節點 
  19. // 🌟如果隊列為空則拋出異常 
  20. E element(); 
  21.  
  22. // 檢查但不刪除隊列頭節點 
  23. // 🌟如果隊列為空則返回 null 
  24. E peek(); 

總結一下 Queue 接口的方法,分為三個大類:

  1. 新增元素到隊列容器中:add、offer
  2. 從隊列容器中移除元素:remove、poll
  3. 查詢隊列頭節點是否為空:element、peek

從接口 API 的程序健壯性考慮,可以分為兩大類:

  1. 健壯 API:offer、poll、peek
  2. 非健壯 API:add、remove、element

接口 API 并無健壯可言,這里說的健壯界限指得是,使用了非健壯性的 API 接口,程序會出錯的幾率大了點,所以我們 更應該關注的是如何捕獲可能出現的異常,以及對應異常處理

BlockingQueue 接口分析

BlockingQueue 接口繼承自 Queue 接口,所以有些語義相同的 API 接口就沒有放上來解讀

  1. // 將指定元素插入隊列,如果隊列已滿,等待直到有空間可用;通過 throws 異常得知,可在等待時打斷 
  2. // 🌟相對于 Queue 接口而言,是一個全新的方法 
  3. void put(E e) throws InterruptedException; 
  4.  
  5. // 將指定元素插入隊列,如果隊列已滿,在等待指定的時間內等待騰出空間;通過 throws 異常得知,可在等待時打斷 
  6. // 🌟相當于是 offer(E e) 的擴展方法 
  7. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 
  8.  
  9. // 檢索并除去此隊列的頭節點,如有必要,等待直到元素可用;通過 throws 異常得知,可在等待時打斷 
  10. E take() throws InterruptedException; 
  11.  
  12. // 檢索并刪除此隊列的頭,如果有必要使元素可用,則等待指定的等待時間;通過 throws 異常得知,可在等待時打斷 
  13. // 🌟相當于是 poll() 的擴展方法 
  14. E poll(long timeout, TimeUnit unit) throws InterruptedException; 
  15.  
  16. // 返回隊列剩余容量,如果為無界隊列,返回 Integer.MAX_VALUE 
  17. int remainingCapacity(); 
  18.  
  19. // 如果此隊列包含指定的元素,則返回 true 
  20. public boolean contains(Object o); 
  21.  
  22. // 從此隊列中刪除所有可用元素,并將它們添加到給定的集合中 
  23. int drainTo(Collection<? super E> c); 
  24.  
  25. // 從此隊列中最多移除給定數量的可用元素,并將它們添加到給定的集合中 
  26. int drainTo(Collection<? super E> c, int maxElements); 

可以看到 BlockingQueue 接口中個性化的方法還是挺多的。本文的豬腳 LBQ 就是實現自 BlockingQueue 接口

源碼解析

變量分析LBQ 為了保證并發添加、移除等操作,使用了 JUC 包下的 ReentrantLock、Condition 控制

  1. // take, poll 等移除操作需要持有的鎖 
  2. private final ReentrantLock takeLock = new ReentrantLock(); 
  3. // 當隊列沒有數據時,刪除元素線程被掛起 
  4. private final Condition notEmpty = takeLock.newCondition(); 
  5. // put, offer 等新增操作需要持有的鎖 
  6. private final ReentrantLock putLock = new ReentrantLock(); 
  7. // 當隊列為空時,添加元素線程被掛起 
  8. private final Condition notFull = putLock.newCondition(); 

ArrayBlockingQueue(ABQ)內部元素個數字段為什么使用的是 int 類型的 count 變量?不擔心并發么

  1. 因為 ABQ 內部使用的一把鎖控制入隊、出隊操作,同一時刻只會有單線程執行 count 變量修改
  2. LBQ 使用的兩把鎖,所以會出現兩個線程同時修改 count 數值,如果像 ABQ 使用 int 類型,兩個流程同時執行修改 count 個數,會造成數據不準確,所以需要使用并發原子類修飾

如果不太明白為什么要用原子類統計數量,猛戳這里

接下來從結構體入手,知道它是由什么元素組成,每個元素是做啥使的。如果數據結構還不錯的小伙伴,應該可以猜出來

  1. // 綁定的容量,如果無界,則為 Integer.MAX_VALUE 
  2. private final int capacity; 
  3. // 當前隊列中元素個數 
  4. private final AtomicInteger count = new AtomicInteger(); 
  5. // 當前隊列的頭節點 
  6. transient Node<E> head; 
  7. // 當前隊列的尾節點 
  8. private transient Node<E> last

看到 head 和 last 元素,是不是對 LBQ 就有個大致的雛形了,這個時候還差一個結構體 Node

  1. static class Node<E> { 
  2.     // 節點存儲的元素 
  3.     E item; 
  4.     // 當前節點的后繼節點 
  5.     LinkedBlockingQueue.Node<E> next
  6.  
  7.     Node(E x) { item = x; } 

構造器分析這里畫一張圖來理解下 LBQ 默認構造方法是如何初始化隊列的

  1. public LinkedBlockingQueue() { 
  2.     this(Integer.MAX_VALUE); 
  3.  
  4. public LinkedBlockingQueue(int capacity) { 
  5.     if (capacity <= 0) throw new IllegalArgumentException(); 
  6.     this.capacity = capacity; 
  7.     last = head = new Node<E>(null); 

可以看出,默認構造方法會將容量設置為 Integer.MAX_VALUE,也就是大家常說的無界隊列

內部其實調用的是重載的有參構造,方法內部設置了容量大小,以及初始化了 item 為空的 Node 節點,把 head last 兩節點進行一個關聯

初始化的隊列 head last 節點指向的 Node 中 item、next 都為空,此時添加一條記錄,隊列會發生什么樣的變化

 

節點入隊

需要添加的元素會被封裝為 Node 添加到隊列中, put 入隊方法語義,如果隊列元素已滿,阻塞當前插入線程,直到隊列中有空缺位置被喚醒

  1. public void put(E e) throws InterruptedException { 
  2.     if (e == null) throw new NullPointerException(); 
  3.     int c = -1; 
  4.     Node<E> node = new Node<E>(e);  // 將需要添加的數據封裝為 Node 
  5.     final ReentrantLock putLock = this.putLock;  // 獲取添加操作的鎖 
  6.     final AtomicInteger count = this.count;  // 獲取隊列實際元素數量 
  7.     putLock.lockInterruptibly();  // 運行可被中斷加鎖 API 
  8.     try { 
  9.         while (count.get() == capacity) {  // 如果隊列元素數量 == 隊列最大值,則將線程放入條件隊列阻塞 
  10.             notFull.await(); 
  11.         } 
  12.         enqueue(node);  // 執行入隊流程 
  13.         c = count.getAndIncrement();  // 獲取值并且自增,舉例:count = 0,執行后結果值 count+1 = 2,返回 0 
  14.         if (c + 1 < capacity)  // 如果自增過的隊列元素 +1 小于隊列容器最大數量,喚醒一條被阻塞在插入等待隊列的線程 
  15.             notFull.signal(); 
  16.     } finally { 
  17.         putLock.unlock();  // 解鎖操作 
  18.     } 
  19.     if (c == 0)  // 當隊列中有一條數據,則喚醒消費組線程進行消費 
  20.         signalNotEmpty(); 

入隊方法整體流程比較清晰,做了以下幾件事:

  1. 隊列已滿,則將當前線程阻塞
  2. 隊列中如果有空缺位置,將數據封裝的 Node 執行入隊操作
  3. 如果 Node 執行入隊操作后,隊列還有空余位置,則喚醒等待隊列中的添加線程
  4. 如果數據入隊前隊列沒有元素,入隊成功后喚醒消費阻塞隊列中的線程

繼續看一下入隊方法 LBQ#enqueue 都做了什么操作

  1. private void enqueue(Node<E> node) { 
  2.     last = last.next = node; 

代碼比較簡單,先把 node 賦值為當前 last 節點的 next 屬性,然后再把 last 節點指向 node,就完成了節點入隊操作

假設 LBQ 的范型是 String 字符串,首先插入元素 a,隊列如下圖所示:

什么?一條數據不過癮?沒有什么是再來一條解決不了的,元素 b 入隊如下:

隊列入隊如上圖所示,head 中 item 永為空,last 中 next 永為空

 

LBQ#offer 也是入隊方法,不同的是:如果隊列元素已滿,則直接返回 false,不阻塞線程

節點出隊

LBQ#take 出隊方法,如果隊列中元素為空,阻塞當前出隊線程,直到隊列中有元素為止

  1. public E take() throws InterruptedException { 
  2.     E x; 
  3.     int c = -1; 
  4.     final AtomicInteger count = this.count;  // 獲取當前隊列實際元素個數 
  5.     final ReentrantLock takeLock = this.takeLtakeLocock;  // 獲取 takeLock 鎖實例 
  6.     takeLock.lockInterruptibly();  // 獲取 takeLock 鎖,獲取不到阻塞過程中,可被中斷 
  7.     try { 
  8.         while (count.get() == 0) {  // 如果當前隊列元素 == 0,當前獲取節點線程加入等待隊列 
  9.             notEmpty.await(); 
  10.         } 
  11.         x = dequeue();  // 當前隊列元素 > 0,執行頭節點出隊操作 
  12.         c = count.getAndDecrement();  // 獲取當前隊列元素個數,并將數量 - 1 
  13.         if (c > 1)  // 當隊列中還有還有元素時,喚醒下一個消費線程進行消費 
  14.             notEmpty.signal(); 
  15.     } finally { 
  16.         takeLock.unlock();  // 釋放鎖 
  17.     } 
  18.     if (c == capacity)  // 移除元素之前隊列是滿的,喚醒生產者線程添加元素 
  19.         signalNotFull(); 
  20.     return x;  // 返回頭節點 

出隊操作整體流程清晰明了,和入隊操作執行流程相似

隊列已滿,則將當前出隊線程阻塞

隊列中如果有元素可消費,執行節點出隊操作

如果節點出隊后,隊列中還有可出隊元素,則喚醒等待隊列中的出隊線程

如果移除元素之前隊列是滿的,喚醒生產者線程添加元素

LBQ#dequeue 出隊操作相對于入隊操作稍顯復雜一些

  1. private E dequeue() { 
  2.     Node<E> h = head;  // 獲取隊列頭節點 
  3.     Node<E> first = h.next;  // 獲取頭節點的后繼節點 
  4.     h.next = h; // help GC 
  5.     head = first;  // 相當于把頭節點的后繼節點,設置為新的頭節點 
  6.     E x = first.item;  // 獲取到新的頭節點 item 
  7.     first.item = null;  // 因為頭節點 item 為空,所以 item 賦值為 null 
  8.     return x; 

出隊流程中,會將原頭節點自己指向自己本身,這么做是為了幫助 GC 回收當前節點,接著將原 head 的 next 節點設置為新的 head,下圖為一個完整的出隊流程

出隊流程圖如上,流程中沒有特別注意的點。另外一個 LBQ#poll 出隊方法,如果隊列中元素為空,返回 null,不會像 take 一樣阻塞

 

節點查詢

因為 element 查找方法在父類 AbstractQueue 里實現的,LBQ 里只對 peek 方法進行了實現,節點查詢就用 peek 做代表了

peek 和 element 都是獲取隊列頭節點數據,兩者的區別是,前者如果隊列為空返回 null,后者拋出相關異常

  1. public E peek() { 
  2.     if (count.get() == 0)  // 隊列為空返回 null 
  3.         return null
  4.     final ReentrantLock takeLock = this.takeLock; 
  5.     takeLock.lock();  // 獲取鎖 
  6.     try { 
  7.         LinkedBlockingQueue.Node<E> first = head.next;  // 獲取頭節點的 next 后繼節點 
  8.         if (first == null)  // 如果后繼節點為空,返回 null,否則返回后繼節點的 item 
  9.             return null
  10.         else 
  11.             return first.item; 
  12.     } finally { 
  13.         takeLock.unlock();  // 解鎖 
  14.     } 

看到這里,能夠得到結論,雖然 head 節點 item 永遠為 null,但是 peek 方法獲取的是 head.next 節點 item

節點刪除

刪除操作需要獲得兩把鎖,所以關于獲取節點、節點出隊、節點入隊等操作都會被阻塞

  1. public boolean remove(Object o) { 
  2.     if (o == nullreturn false
  3.     fullyLock();  // 獲取兩把鎖 
  4.     try { 
  5.         // 從頭節點開始,循環遍歷隊列 
  6.         for (Node<E> trail = head, p = trail.next
  7.              p != null
  8.              trail = p, p = p.next) { 
  9.             if (o.equals(p.item)) {  // item == o 執行刪除操作 
  10.                 unlink(p, trail);  // 刪除操作 
  11.                 return true
  12.             } 
  13.         } 
  14.         return false
  15.     } finally { 
  16.         fullyUnlock();  // 釋放兩把鎖 
  17.     } 

鏈表刪除操作,一般而言都是循環逐條遍歷,而這種的 遍歷時間復雜度為 O(n),最壞情況就是遍歷了鏈表全部節點

看一下 LBQ#remove 中 unlink 是如何取消節點關聯的

  1. void unlink(Node<E> p, Node<E> trail) { 
  2.     p.item = null;  // 以第一次遍歷而言,trail 是頭節點,p 為頭節點的后繼節點 
  3.     trail.next = p.next;  // 把頭節點的后繼指針,設置為 p 節點的后繼指針 
  4.     if (last == p)  // 如果 p == last 設置 last == trail 
  5.         last = trail; 
  6.     // 如果刪除元素前隊列是滿的,刪除后就有了空余位置,喚醒生產線程 
  7.     if (count.getAndDecrement() == capacity) 
  8.         notFull.signal(); 

remove 方法和 take 方法是有相似之處,如果 remove 方法的元素是頭節點,效果和 take 一致,頭節點元素出隊

為了更好的理解,我們刪除中間元素。畫兩張圖理解下其中原委,代碼如下:

  1. public static void main(String[] args) { 
  2.     BlockingQueue<String> blockingQueue = new LinkedBlockingQueue(); 
  3.     blockingQueue.offer("a"); 
  4.     blockingQueue.offer("b"); 
  5.     blockingQueue.offer("c"); 
  6.     // 刪除隊列中間元素 
  7.     blockingQueue.remove("b"); 

執行完上述代碼中三個 offer 操作,隊列結構圖如下:

執行刪除元素 b 操作后隊列結構如下圖:

如果 p 節點就是 last 尾節點,則把 p 的前驅節點設置為新的尾節點。刪除操作大致如此

 

應用場景

上文說了阻塞隊列被大量業務場景所應用,這里例舉兩個實際工作中的例子幫助大家理解

生產者-消費者模式

生產者-消費者模式是一個典型的多線程并發寫作模式,生產者和消費者中間需要一個容器來解決強耦合關系,生產者向容器放數據,消費者消費容器數據

生產者-消費者實現有多種方式

Object 類中的 wait、notify、notifyAll

Lock 中 Condition 的 await、signal、signalAll

BlockingQueue

阻塞隊列實現生產者-消費者模型代碼如下:

  1. @Slf4j 
  2. public class BlockingQueueTest { 
  3.  
  4.     private static final int MAX_NUM = 10; 
  5.     private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(MAX_NUM); 
  6.  
  7.     public void produce(String str) { 
  8.         try { 
  9.             QUEUE.put(str); 
  10.             log.info("  🔥🔥🔥 隊列放入元素 :: {}, 隊列元素數量 :: {}", str, QUEUE.size()); 
  11.         } catch (InterruptedException ie) { 
  12.             // ignore 
  13.         } 
  14.     } 
  15.  
  16.     public String consume() { 
  17.         String str = null
  18.         try { 
  19.             str = QUEUE.take(); 
  20.             log.info("  🔥🔥🔥 隊列移出元素 :: {}, 隊列元素數量 :: {}", str, QUEUE.size()); 
  21.         } catch (InterruptedException ie) { 
  22.             // ignore 
  23.         } 
  24.         return str; 
  25.     } 
  26.  
  27.     public static void main(String[] args) { 
  28.         BlockingQueueTest queueTest = new BlockingQueueTest(); 
  29.         for (int i = 0; i < 5; i++) { 
  30.             int finalI = i; 
  31.             new Thread(() -> { 
  32.                 String str = "元素-"
  33.                 while (true) { 
  34.                     queueTest.produce(str + finalI); 
  35.                 } 
  36.             }).start(); 
  37.         } 
  38.         for (int i = 0; i < 5; i++) { 
  39.             new Thread(() -> { 
  40.                 while (true) { 
  41.                     queueTest.consume(); 
  42.                 } 
  43.             }).start(); 
  44.         } 
  45.     } 

線程池應用

阻塞隊列在線程池中的具體應用屬于是生產者-消費者的實際場景

  1. 線程池在 Java 應用里的重要性不言而喻,這里簡要說下線程池的運行原理
  2. 線程池線程數量小于核心線程數執行新增核心線程操作
  3. 線程池線程數量大于或等于核心線程數時,將任務存放阻塞隊列

滿足線程池中線程數大于或等于核心線程數并且阻塞隊列已滿, 線程池創建非核心線程

重點在于第二點,當線程池核心線程都在運行任務時,會把任務存放阻塞隊列中。線程池源碼如下:

  1. if (isRunning(c) && workQueue.offer(command)) {} 

看到使用的 offer 方法,通過上面講述,如果阻塞隊列已滿返回 false。那何時進行消費隊列中的元素呢。涉及線程池中線程執行過程原理,這里簡單說明

線程池內線程執行任務有兩種方式,一種是創建核心線程時 自帶 的任務,另一種就是從阻塞隊列獲取

當核心線程執行一次任務后,其實和非核心線程就沒什么區別了

線程池獲取阻塞隊列任務使用了兩種 API,分別是 poll 和 take

  1. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); 

Q:為啥要用兩個 API?一個不香么?

A:take 是為了要維護線程池內核心線程的重要手段,如果獲取不到任務,線程被掛起,等待下一次任務添加

至于帶時間的 pool 則是為了回收非核心線程準備的

結言LBQ 阻塞隊列到這里就講解完成了,總結下文章所講述的 LBQ 基本特征

LBQ 是基于鏈表實現的阻塞隊列,可以進行讀寫并發執行

LBQ 隊列容量可以自己設置,如果不設置默認 Integer 最大值,也可以稱為無界隊列

文章結合源碼,針對 LBQ 的入隊、出隊、查詢、刪除等操作進行了詳細講解

LBQ 只是一個引子,更希望大家能夠通過文章 掌握阻塞隊列核心思想,繼而查看其它實現類的代碼,鞏固知識

 

小伙伴現在已經知道 LBQ 是通過鎖的機制來實現并發安全控制,思考一下 不使用鎖,能否實現以及如何實現?

 

責任編輯:武曉燕 來源: 源碼興趣圈
相關推薦

2020-10-16 08:26:38

AQS通信協作

2023-12-15 09:45:21

阻塞接口

2025-04-22 08:32:50

2025-05-20 08:31:19

2021-01-22 17:57:31

SQL數據庫函數

2024-10-14 12:34:08

2021-08-11 22:17:48

負載均衡LVS機制

2024-08-06 08:13:26

2020-10-14 11:31:41

Docker

2019-07-23 11:01:57

Python同步異步

2024-06-21 09:27:05

2021-01-13 14:42:36

GitHub代碼Java

2020-11-20 06:22:02

LinkedBlock

2020-11-24 09:04:55

PriorityBlo

2021-07-24 11:15:19

開發技能代碼

2020-11-19 07:41:51

ArrayBlocki

2020-11-25 14:28:56

DelayedWork

2020-07-08 08:07:23

高并發系統消息隊列

2020-11-03 10:32:48

回調函數模塊

2023-11-03 18:03:54

Web應用Python
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日韩在线欧美 | 亚洲精品久久久久国产 | 精品www | 日日摸日日碰夜夜爽亚洲精品蜜乳 | 91精品久久久久久久久久入口 | 国产欧美精品区一区二区三区 | 国产成人a亚洲精品 | 成人激情免费视频 | 国产 日韩 欧美 中文 在线播放 | 日日摸夜夜添夜夜添精品视频 | 国产精品区二区三区日本 | 国产精品激情在线 | 精品免费国产视频 | av中文字幕在线 | 精品日本久久久久久久久久 | 一二区成人影院电影网 | 狠狠干2020| 日韩欧美国产一区二区三区 | 久久综合一区 | 欧美xxxx性 | 国产精品国产a级 | 国产一级视频免费播放 | 黄色一级免费 | 成人午夜在线视频 | 五月天国产在线 | 成人在线视频免费播放 | 欧美在线综合 | 国产精品一区久久久 | 亚洲欧美综合网 | 一区二区视频免费观看 | 午夜性色a√在线视频观看9 | 全免一级毛片 | 国产在线看片 | 日日骚av | 国产综合久久 | 亚洲网站在线观看 | 日韩在线中文字幕 | 中文字幕在线一区 | 嫩草视频在线 | 亚洲欧美日韩网站 | 亚洲精品1区 |