成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Java阻塞隊列實現原理分析

開發 后端
本文以ArrayBlockingQueue和LinkedBlockingQueue為例,分析它們的實現原理。

Java 阻塞隊列實現原理分析

Java中的阻塞隊列接口BlockingQueue繼承自Queue接口。

BlockingQueue接口提供了3個添加元素方法:

  • add:添加元素到隊列里,添加成功返回true,由于容量滿了添加失敗會拋出IllegalStateException異常;
  • offer:添加元素到隊列里,添加成功返回true,添加失敗返回false;
  • put:添加元素到隊列里,如果容量滿了會阻塞直到容量不滿。

3個刪除方法:

  • poll:刪除隊列頭部元素,如果隊列為空,返回null。否則返回元素;
  • remove:基于對象找到對應的元素,并刪除。刪除成功返回true,否則返回false;
  • take:刪除隊列頭部元素,如果隊列為空,一直阻塞到隊列有元素并刪除。

常用的阻塞隊列具體類有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

本文以ArrayBlockingQueue和LinkedBlockingQueue為例,分析它們的實現原理。

ArrayBlockingQueue

ArrayBlockingQueue的原理就是使用一個可重入鎖和這個鎖生成的兩個條件對象進行并發控制(classic two-condition algorithm)。

ArrayBlockingQueue是一個帶有長度的阻塞隊列,初始化的時候必須要指定隊列長度,且指定長度之后不允許進行修改。

它帶有的屬性如下:

  1. // 存儲隊列元素的數組,是個循環數組 
  2.  
  3. final Object[] items; 
  4.  
  5.   
  6.  
  7. // 拿數據的索引,用于take,poll,peek,remove方法 
  8.  
  9. int takeIndex; 
  10.  
  11.   
  12.  
  13. // 放數據的索引,用于put,offer,add方法 
  14.  
  15. int putIndex; 
  16.  
  17.   
  18.  
  19. // 元素個數 
  20.  
  21. int count
  22.  
  23.   
  24.  
  25. // 可重入鎖 
  26.  
  27. final ReentrantLock lock; 
  28.  
  29. // notEmpty條件對象,由lock創建 
  30.  
  31. private final Condition notEmpty; 
  32.  
  33. // notFull條件對象,由lock創建 
  34.  
  35. private final Condition notFull;  

數據的添加

ArrayBlockingQueue有不同的幾個數據添加方法,add、offer、put方法。

add方法:

  1. public boolean add(E e) { 
  2.  
  3.     if (offer(e)) 
  4.  
  5.         return true
  6.  
  7.     else 
  8.  
  9.         throw new IllegalStateException("Queue full"); 
  10.  
  11.  

add方法內部調用offer方法如下:

  1. public boolean offer(E e) { 
  2.  
  3.     checkNotNull(e); // 不允許元素為空 
  4.  
  5.     final ReentrantLock lock = this.lock; 
  6.  
  7.     lock.lock(); // 加鎖,保證調用offer方法的時候只有1個線程 
  8.  
  9.     try { 
  10.  
  11.         if (count == items.length) // 如果隊列已滿 
  12.  
  13.             return false; // 直接返回false,添加失敗 
  14.  
  15.         else { 
  16.  
  17.             insert(e); // 數組沒滿的話調用insert方法 
  18.  
  19.             return true; // 返回true,添加成功 
  20.  
  21.         } 
  22.  
  23.     } finally { 
  24.  
  25.         lock.unlock(); // 釋放鎖,讓其他線程可以調用offer方法 
  26.  
  27.     } 
  28.  
  29.  

insert方法如下:

  1. private void insert(E x) { 
  2.  
  3.     items[putIndex] = x; // 元素添加到數組里 
  4.  
  5.     putIndex = inc(putIndex); // 放數據索引+1,當索引滿了變成0 
  6.  
  7.     ++count; // 元素個數+1 
  8.  
  9.     notEmpty.signal(); // 使用條件對象notEmpty通知,比如使用take方法的時候隊列里沒有數據,被阻塞。這個時候隊列insert了一條數據,需要調用signal進行通知 
  10.  
  11.  

put方法:

  1. public void put(E e) throws InterruptedException { 
  2.  
  3.     checkNotNull(e); // 不允許元素為空 
  4.  
  5.     final ReentrantLock lock = this.lock; 
  6.  
  7.     lock.lockInterruptibly(); // 加鎖,保證調用put方法的時候只有1個線程 
  8.  
  9.     try { 
  10.  
  11.         while (count == items.length) // 如果隊列滿了,阻塞當前線程,并加入到條件對象notFull的等待隊列里 
  12.  
  13.             notFull.await(); // 線程阻塞并被掛起,同時釋放鎖 
  14.  
  15.         insert(e); // 調用insert方法 
  16.  
  17.     } finally { 
  18.  
  19.         lock.unlock(); // 釋放鎖,讓其他線程可以調用put方法 
  20.  
  21.     } 
  22.  
  23.  

ArrayBlockingQueue的添加數據方法有add,put,offer這3個方法,總結如下:

add方法內部調用offer方法,如果隊列滿了,拋出IllegalStateException異常,否則返回true

offer方法如果隊列滿了,返回false,否則返回true

add方法和offer方法不會阻塞線程,put方法如果隊列滿了會阻塞線程,直到有線程消費了隊列里的數據才有可能被喚醒。

這3個方法內部都會使用可重入鎖保證原子性。

數據的刪除

ArrayBlockingQueue有不同的幾個數據刪除方法,poll、take、remove方法。

poll方法:

  1. public E poll() { 
  2.  
  3.     final ReentrantLock lock = this.lock; 
  4.  
  5.     lock.lock(); // 加鎖,保證調用poll方法的時候只有1個線程 
  6.  
  7.     try { 
  8.  
  9.         return (count == 0) ? null : extract(); // 如果隊列里沒元素了,返回null,否則調用extract方法 
  10.  
  11.     } finally { 
  12.  
  13.         lock.unlock(); // 釋放鎖,讓其他線程可以調用poll方法 
  14.  
  15.     } 
  16.  
  17.  

poll方法內部調用extract方法:

  1. private E extract() { 
  2.  
  3.     final Object[] items = this.items; 
  4.  
  5.     E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素 
  6.  
  7.     items[takeIndex] = null; // 對應取索引上的數據清空 
  8.  
  9.     takeIndex = inc(takeIndex); // 取數據索引+1,當索引滿了變成0 
  10.  
  11.     --count; // 元素個數-1 
  12.  
  13.     notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數據的時候隊列已滿,被阻塞。這個時候消費了一條數據,隊列沒滿了,就需要調用signal進行通知 
  14.  
  15.     return x; // 返回元素 
  16.  
  17.  

take方法:

  1. public E take() throws InterruptedException { 
  2.  
  3.     final ReentrantLock lock = this.lock; 
  4.  
  5.     lock.lockInterruptibly(); // 加鎖,保證調用take方法的時候只有1個線程 
  6.  
  7.     try { 
  8.  
  9.         while (count == 0) // 如果隊列空,阻塞當前線程,并加入到條件對象notEmpty的等待隊列里 
  10.  
  11.             notEmpty.await(); // 線程阻塞并被掛起,同時釋放鎖 
  12.  
  13.         return extract(); // 調用extract方法 
  14.  
  15.     } finally { 
  16.  
  17.         lock.unlock(); // 釋放鎖,讓其他線程可以調用take方法 
  18.  
  19.     } 
  20.  
  21.  

remove方法:

  1. public boolean remove(Object o) { 
  2.  
  3.     if (o == nullreturn false
  4.  
  5.     final Object[] items = this.items; 
  6.  
  7.     final ReentrantLock lock = this.lock; 
  8.  
  9.     lock.lock(); // 加鎖,保證調用remove方法的時候只有1個線程 
  10.  
  11.     try { 
  12.  
  13.         for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素 
  14.  
  15.             if (o.equals(items[i])) { // 兩個對象相等的話 
  16.  
  17.                 removeAt(i); // 調用removeAt方法 
  18.  
  19.                 return true; // 刪除成功,返回true 
  20.  
  21.             } 
  22.  
  23.         } 
  24.  
  25.         return false; // 刪除成功,返回false 
  26.  
  27.     } finally { 
  28.  
  29.         lock.unlock(); // 釋放鎖,讓其他線程可以調用remove方法 
  30.  
  31.     } 
  32.  
  33.  

removeAt方法:

  1. void removeAt(int i) { 
  2.  
  3.     final Object[] items = this.items; 
  4.  
  5.     if (i == takeIndex) { // 如果要刪除數據的索引是取索引位置,直接刪除取索引位置上的數據,然后取索引+1即可 
  6.  
  7.         items[takeIndex] = null
  8.  
  9.         takeIndex = inc(takeIndex); 
  10.  
  11.     } else { // 如果要刪除數據的索引不是取索引位置,移動元素元素,更新取索引和放索引的值 
  12.  
  13.         for (;;) { 
  14.  
  15.             int nexti = inc(i); 
  16.  
  17.             if (nexti != putIndex) { 
  18.  
  19.                 items[i] = items[nexti]; 
  20.  
  21.                 i = nexti; 
  22.  
  23.             } else { 
  24.  
  25.                 items[i] = null
  26.  
  27.                 putIndex = i; 
  28.  
  29.                 break; 
  30.  
  31.             } 
  32.  
  33.         } 
  34.  
  35.     } 
  36.  
  37.     --count; // 元素個數-1 
  38.  
  39.     notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數據的時候隊列已滿,被阻塞。這個時候消費了一條數據,隊列沒滿了,就需要調用signal進行通知  
  40.  
  41.  

ArrayBlockingQueue的刪除數據方法有poll,take,remove這3個方法,總結如下:

poll方法對于隊列為空的情況,返回null,否則返回隊列頭部元素。

remove方法取的元素是基于對象的下標值,刪除成功返回true,否則返回false。

poll方法和remove方法不會阻塞線程。

take方法對于隊列為空的情況,會阻塞并掛起當前線程,直到有數據加入到隊列中。

這3個方法內部都會調用notFull.signal方法通知正在等待隊列滿情況下的阻塞線程。

LinkedBlockingQueue

LinkedBlockingQueue是一個使用鏈表完成隊列操作的阻塞隊列。鏈表是單向鏈表,而不是雙向鏈表。

內部使用放鎖和拿鎖,這兩個鎖實現阻塞(“two lock queue” algorithm)。

它帶有的屬性如下:

  1. // 容量大小 
  2.  
  3. private final int capacity; 
  4.  
  5.   
  6.  
  7. // 元素個數,因為有2個鎖,存在競態條件,使用AtomicInteger 
  8.  
  9. private final AtomicInteger count = new AtomicInteger(0); 
  10.  
  11.   
  12.  
  13. // 頭結點 
  14.  
  15. private transient Node<E> head; 
  16.  
  17.   
  18.  
  19. // 尾節點 
  20.  
  21. private transient Node<E> last
  22.  
  23.   
  24.  
  25. // 拿鎖 
  26.  
  27. private final ReentrantLock takeLock = new ReentrantLock(); 
  28.  
  29.   
  30.  
  31. // 拿鎖的條件對象 
  32.  
  33. private final Condition notEmpty = takeLock.newCondition(); 
  34.  
  35.   
  36.  
  37. // 放鎖 
  38.  
  39. private final ReentrantLock putLock = new ReentrantLock(); 
  40.  
  41.   
  42.  
  43. // 放鎖的條件對象 
  44.  
  45. private final Condition notFull = putLock.newCondition();  

ArrayBlockingQueue只有1個鎖,添加數據和刪除數據的時候只能有1個被執行,不允許并行執行。

而LinkedBlockingQueue有2個鎖,放鎖和拿鎖,添加數據和刪除數據是可以并行進行的,當然添加數據和刪除數據的時候只能有1個線程各自執行。

數據的添加

LinkedBlockingQueue有不同的幾個數據添加方法,add、offer、put方法。

add方法內部調用offer方法:

  1. public boolean offer(E e) { 
  2.  
  3.     if (e == null) throw new NullPointerException(); // 不允許空元素 
  4.  
  5.     final AtomicInteger count = this.count
  6.  
  7.     if (count.get() == capacity) // 如果容量滿了,返回false 
  8.  
  9.         return false
  10.  
  11.     int c = -1; 
  12.  
  13.     Node<E> node = new Node(e); // 容量沒滿,以新元素構造節點 
  14.  
  15.     final ReentrantLock putLock = this.putLock; 
  16.  
  17.     putLock.lock(); // 放鎖加鎖,保證調用offer方法的時候只有1個線程 
  18.  
  19.     try { 
  20.  
  21.         if (count.get() < capacity) { // 再次判斷容量是否已滿,因為可能拿鎖在進行消費數據,沒滿的話繼續執行 
  22.  
  23.             enqueue(node); // 節點添加到鏈表尾部 
  24.  
  25.             c = count.getAndIncrement(); // 元素個數+1 
  26.  
  27.             if (c + 1 < capacity) // 如果容量還沒滿 
  28.  
  29.                 notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列里面加數據了,隊列還沒滿 
  30.  
  31.         } 
  32.  
  33.     } finally { 
  34.  
  35.         putLock.unlock(); // 釋放放鎖,讓其他線程可以調用offer方法 
  36.  
  37.     } 
  38.  
  39.     if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費數據,count會變化。這里的if條件表示如果隊列中還有1條數據 
  40.  
  41.         signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列里還有1條數據,可以進行消費 
  42.  
  43.     return c >= 0; // 添加成功返回true,否則返回false 
  44.  
  45.  

put方法:

  1. public void put(E e) throws InterruptedException { 
  2.  
  3.     if (e == null) throw new NullPointerException(); // 不允許空元素 
  4.  
  5.     int c = -1; 
  6.  
  7.     Node<E> node = new Node(e); // 以新元素構造節點 
  8.  
  9.     final ReentrantLock putLock = this.putLock; 
  10.  
  11.     final AtomicInteger count = this.count
  12.  
  13.     putLock.lockInterruptibly(); // 放鎖加鎖,保證調用put方法的時候只有1個線程 
  14.  
  15.     try { 
  16.  
  17.         while (count.get() == capacity) { // 如果容量滿了 
  18.  
  19.             notFull.await(); // 阻塞并掛起當前線程 
  20.  
  21.         } 
  22.  
  23.         enqueue(node); // 節點添加到鏈表尾部 
  24.  
  25.         c = count.getAndIncrement(); // 元素個數+1 
  26.  
  27.         if (c + 1 < capacity) // 如果容量還沒滿 
  28.  
  29.             notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列里面加數據了,隊列還沒滿 
  30.  
  31.     } finally { 
  32.  
  33.         putLock.unlock(); // 釋放放鎖,讓其他線程可以調用put方法 
  34.  
  35.     } 
  36.  
  37.     if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費數據,count會變化。這里的if條件表示如果隊列中還有1條數據 
  38.  
  39.         signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列里還有1條數據,可以進行消費 
  40.  
  41.  

LinkedBlockingQueue的添加數據方法add,put,offer跟ArrayBlockingQueue一樣,不同的是它們的底層實現不一樣。

ArrayBlockingQueue中放入數據阻塞的時候,需要消費數據才能喚醒。

而LinkedBlockingQueue中放入數據阻塞的時候,因為它內部有2個鎖,可以并行執行放入數據和消費數據,不僅在消費數據的時候進行喚醒插入阻塞的線程,同時在插入的時候如果容量還沒滿,也會喚醒插入阻塞的線程。

數據的刪除

LinkedBlockingQueue有不同的幾個數據刪除方法,poll、take、remove方法。

poll方法:

  1. public E poll() { 
  2.  
  3.     final AtomicInteger count = this.count
  4.  
  5.     if (count.get() == 0) // 如果元素個數為0 
  6.  
  7.         return null; // 返回null 
  8.  
  9.     E x = null
  10.  
  11.     int c = -1; 
  12.  
  13.     final ReentrantLock takeLock = this.takeLock; 
  14.  
  15.     takeLock.lock(); // 拿鎖加鎖,保證調用poll方法的時候只有1個線程 
  16.  
  17.     try { 
  18.  
  19.         if (count.get() > 0) { // 判斷隊列里是否還有數據 
  20.  
  21.             x = dequeue(); // 刪除頭結點 
  22.  
  23.             c = count.getAndDecrement(); // 元素個數-1 
  24.  
  25.             if (c > 1) // 如果隊列里還有元素 
  26.  
  27.                 notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列里還有數據,可以再次消費 
  28.  
  29.         } 
  30.  
  31.     } finally { 
  32.  
  33.         takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調用poll方法 
  34.  
  35.     } 
  36.  
  37.     if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數據,count會變化。這里的if條件表示如果隊列中還可以再插入數據 
  38.  
  39.         signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列里還能再次添加數據 
  40.  
  41.                 return x; 
  42.  

 

take方法:

  1. public E take() throws InterruptedException { 
  2.  
  3.     E x; 
  4.  
  5.     int c = -1; 
  6.  
  7.     final AtomicInteger count = this.count
  8.  
  9.     final ReentrantLock takeLock = this.takeLock; 
  10.  
  11.     takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調用take方法的時候只有1個線程 
  12.  
  13.     try { 
  14.  
  15.         while (count.get() == 0) { // 如果隊列里已經沒有元素了 
  16.  
  17.             notEmpty.await(); // 阻塞并掛起當前線程 
  18.  
  19.         } 
  20.  
  21.         x = dequeue(); // 刪除頭結點 
  22.  
  23.         c = count.getAndDecrement(); // 元素個數-1 
  24.  
  25.         if (c > 1) // 如果隊列里還有元素 
  26.  
  27.             notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列里還有數據,可以再次消費 
  28.  
  29.     } finally { 
  30.  
  31.         takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調用take方法 
  32.  
  33.     } 
  34.  
  35.     if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數據,count會變化。這里的if條件表示如果隊列中還可以再插入數據 
  36.  
  37.         signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列里還能再次添加數據 
  38.  
  39.     return x; 
  40.  

 

remove方法:

  1. public boolean remove(Object o) { 
  2.  
  3.     if (o == nullreturn false
  4.  
  5.     fullyLock(); // remove操作要移動的位置不固定,2個鎖都需要加鎖 
  6.  
  7.     try { 
  8.  
  9.         for (Node<E> trail = head, p = trail.next; // 從鏈表頭結點開始遍歷 
  10.  
  11.              p != null
  12.  
  13.              trail = p, p = p.next) { 
  14.  
  15.             if (o.equals(p.item)) { // 判斷是否找到對象 
  16.  
  17.                 unlink(p, trail); // 修改節點的鏈接信息,同時調用notFull的signal方法 
  18.  
  19.                 return true
  20.  
  21.             } 
  22.  
  23.         } 
  24.  
  25.         return false
  26.  
  27.     } finally { 
  28.  
  29.         fullyUnlock(); // 2個鎖解鎖 
  30.  
  31.     } 
  32.  

 

LinkedBlockingQueue的take方法對于沒數據的情況下會阻塞,poll方法刪除鏈表頭結點,remove方法刪除指定的對象。

需要注意的是remove方法由于要刪除的數據的位置不確定,需要2個鎖同時加鎖。

責任編輯:龐桂玉 來源: 程序源
相關推薦

2020-11-20 06:22:02

LinkedBlock

2020-11-24 09:04:55

PriorityBlo

2020-11-19 07:41:51

ArrayBlocki

2020-11-25 14:28:56

DelayedWork

2012-06-14 10:34:40

Java阻塞搜索實例

2024-07-16 18:05:19

延遲隊列MQRabbitMQ

2025-01-14 00:00:00

Blocking隊列元素

2023-12-05 13:46:09

解密協程線程隊列

2021-06-04 18:14:15

阻塞非阻塞tcp

2022-06-30 14:31:57

Java阻塞隊列

2012-04-11 15:41:48

JavaNIO

2018-10-31 15:54:47

Java線程池源碼

2021-07-12 09:17:54

Memory Comp系統內存

2009-03-26 13:43:59

實現Order ByMySQL

2021-03-01 23:31:48

隊列實現棧存儲

2023-12-15 09:45:21

阻塞接口

2009-04-02 10:23:13

實現JoinMySQL

2015-06-15 10:12:36

Java原理分析

2023-02-07 09:17:19

Java注解原理

2023-10-08 00:02:07

Java排序算法
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 一级国产精品一级国产精品片 | 久久亚洲精品视频 | 成人在线视频一区 | 黄色一级大片在线免费看产 | 欧美成人精品 | 午夜爱爱毛片xxxx视频免费看 | 欧美日韩国产高清视频 | 中文字幕国产第一页 | 欧美va大片 | 亚洲欧美在线视频 | 中文字幕高清 | 国产激情在线观看 | 精品国产伦一区二区三区观看体验 | 国产清纯白嫩初高生在线播放视频 | 日韩精品视频一区二区三区 | 欧美在线视频一区 | 91视频观看| 美日韩视频| 中文在线一区二区 | 成人高清视频在线观看 | 黄网站涩免费蜜桃网站 | 久久久一区二区三区四区 | www久久99 | 伊人伊人 | 人人人干 | 久草新在线 | 亚洲精品一区二区 | 久久久久久一区 | 精品网| 最新91在线 | 精品视频一区二区 | 欧美 日韩 国产 成人 在线 | 永久免费视频 | 亚洲人成人一区二区在线观看 | 欧美理论 | 亚洲www啪成人一区二区麻豆 | 国产精品日产欧美久久久久 | 国精品一区二区 | 国产高清免费在线 | 91成人午夜性a一级毛片 | 中文字幕视频在线看5 |