沒看過ArrayBlockingQueue源碼,就別說精通線程池
引言
在日常開發中,我們好像很少用到BlockingQueue(阻塞隊列),BlockingQueue到底有什么作用?應用場景是什么樣的?
如果使用過線程池或者閱讀過線程池源碼,就會知道線程池的核心功能都是基于BlockingQueue實現的。
大家用過消息隊列(MessageQueue),就知道消息隊列作用是解耦、異步、削峰。同樣BlockingQueue的作用也是這三種,區別是BlockingQueue只作用于本機器,而消息隊列相當于分布式BlockingQueue。
BlockingQueue作為阻塞隊列,主要應用于生產者-消費者模式的場景,在并發多線程中尤其常用。
- 比如像線程池中的任務調度場景,提交任務和拉取并執行任務。
- 生產者與消費者解耦的場景,生產者把數據放到隊列中,消費者從隊列中取數據進行消費。兩者進行解耦,不用感知對方的存在。
- 應對突發流量的場景,業務高峰期突然來了很多請求,可以放到隊列中緩存起來,消費者以正常的頻率從隊列中拉取并消費數據,起到削峰的作用。
BlockingQueue是個接口,定義了幾組放數據和取數據的方法,來滿足不同的場景。
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
放數據 | add() | offer() | put() | offer(e, time, unit) |
取數據(同時刪除數據) | remove() | poll() | take() | poll(time, unit) |
取數據(不刪除) | element() | peek() | 不支持 | 不支持 |
BlockingQueue有5個常見的實現類,應用場景不同。
- ArrayBlockingQueue
基于數組實現的阻塞隊列,創建隊列時需指定容量大小,是有界隊列。
- LinkedBlockingQueue
基于鏈表實現的阻塞隊列,默認是無界隊列,創建可以指定容量大小
- SynchronousQueue
一種沒有緩沖的阻塞隊列,生產出的數據需要立刻被消費
- PriorityBlockingQueue
實現了優先級的阻塞隊列,基于數據顯示,是無界隊列
- DelayQueue
實現了延遲功能的阻塞隊列,基于PriorityQueue實現的,是無界隊列
今天重點講一下ArrayBlockingQueue的底層實現原理,在接下來的文章中再講一下其他隊列實現。
ArrayBlockingQueue類結構
先看一下ArrayBlockingQueue類里面有哪些屬性:
public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 用來存放數據的數組
*/
final Object[] items;
/**
* 下次取數據的數組下標位置
*/
int takeIndex;
/**
* 下次放數據的數組下標位置
*/
int putIndex;
/**
* 元素個數
*/
int count;
/**
* 獨占鎖,用來保證存取數據安全
*/
final ReentrantLock lock;
/**
* 取數據的條件
*/
private final Condition notEmpty;
/**
* 放數據的條件
*/
private final Condition notFull;
}
可以看出ArrayBlockingQueue底層是基于數組實現的,使用對象數組items存儲元素。為了實現隊列特性(一端插入,另一端刪除),定義了兩個指針,takeIndex表示下次取數據的位置,putIndex表示下次放數據的位置。 另外ArrayBlockingQueue還使用ReentrantLock保證線程安全,并且定義了兩個條件,當條件滿足的時候才允許放數據或者取數據,下面會詳細講。
初始化
ArrayBlockingQueue常用的初始化方法有兩個:
- 指定容量大小
- 指定容量大小和是否是公平鎖
/**
* 指定容量大小的構造方法
*/
BlockingQueue<Integer> blockingDeque1 = new ArrayBlockingQueue<>(1);
/**
* 指定容量大小、公平鎖的構造方法
*/
BlockingQueue<Integer> blockingDeque1 = new ArrayBlockingQueue<>(1, true);
再看一下對應的源碼實現:
/**
* 指定容量大小的構造方法(默認是非公平鎖)
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 指定容量大小、公平鎖的構造方法
*
* @param capacity 數組容量
* @param fair 是否是公平鎖
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
放數據源碼
放數據的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
放數據 | add() | offer() | put() | offer(e, time, unit) |
offer方法源碼
先看一下offer()方法源碼,其他方法邏輯也是大同小異。 無論是放數據還是取數據,都是從隊頭開始,向隊尾移動。
圖片
/**
* offer方法入口
*
* @param e 元素
* @return 是否插入成功
*/
public boolean offer(E e) {
// 1. 判空,傳參不允許為null
checkNotNull(e);
// 2. 加鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 3. 判斷數組是否已滿,如果滿了就直接返回false結束
if (count == items.length) {
return false;
} else {
// 4. 否則就插入
enqueue(e);
return true;
}
} finally {
// 5. 釋放鎖
lock.unlock();
}
}
/**
* 入隊
*
* @param x 元素
*/
private void enqueue(E x) {
// 1. 獲取數組
final Object[] items = this.items;
// 2. 直接放入數組
items[putIndex] = x;
// 3. 移動putIndex位置,如果到達數組的末尾就從頭開始
if (++putIndex == items.length) {
putIndex = 0;
}
// 4. 計數
count++;
// 5. 喚醒因為隊列為空,等待取數據的線程
notEmpty.signal();
}
offer()在數組滿的時候,會返回false,表示添加失敗。 為了循環利用數組,添加元素的時候如果已經到了隊尾,就從隊頭重新開始,相當于一個循環隊列,像下面這樣:
圖片
add方法源碼
再看一下另外三個添加元素方法源碼: add()方法在數組滿的時候,會拋出異常,底層基于offer()實現。
/**
* add方法入口
*
* @param e 元素
* @return 是否添加成功
*/
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}
put方法源碼
put()方法在數組滿的時候,會一直阻塞,直到有其他線程取走數據,空出位置,才能添加成功。
/**
* put方法入口
*
* @param e 元素
*/
public void put(E e) throws InterruptedException {
// 1. 判空,傳參不允許為null
checkNotNull(e);
// 2. 加可中斷的鎖,防止一直阻塞
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 3. 如果隊列已滿,就一直阻塞,直到被喚醒
while (count == items.length) {
notFull.await();
}
// 4. 如果隊列未滿,直接入隊
enqueue(e);
} finally {
// 5. 釋放鎖
lock.unlock();
}
}
offer(e, time, unit)源碼
再看一下offer(e, time, unit)方法源碼,在數組滿的時候, offer(e, time, unit)方法會阻塞一段時間。
/**
* offer方法入口
*
* @param e 元素
* @param timeout 超時時間
* @param unit 時間單位
* @return 是否添加成功
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 1. 判空,傳參不允許為null
checkNotNull(e);
// 2. 把超時時間轉換為納秒
long nanos = unit.toNanos(timeout);
// 3. 加可中斷的鎖,防止一直阻塞
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 4. 循環判斷隊列是否已滿
while (count == items.length) {
if (nanos <= 0) {
// 6. 如果隊列已滿,且超時時間已過,則返回false
return false;
}
// 5. 如果隊列已滿,則等待指定時間
nanos = notFull.awaitNanos(nanos);
}
// 7. 如果隊列未滿,則入隊
enqueue(e);
return true;
} finally {
// 8. 釋放鎖
lock.unlock();
}
}
彈出數據源碼
彈出數據(取出數據并刪除)的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數據(同時刪除數據) | remove() | poll() | take() | poll(time, unit) |
poll方法源碼
看一下poll()方法源碼,其他方法邏輯大同小異。 poll()方法在彈出元素的時候,如果數組為空,則返回null,表示彈出失敗。
/**
* poll方法入口
*/
public E poll() {
// 1. 加鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 2. 如果數組為空,則返回null,否則返回隊列頭部元素
return (count == 0) ? null : dequeue();
} finally {
// 3. 釋放鎖
lock.unlock();
}
}
/**
* 出列
*/
private E dequeue() {
// 1. 取出隊列頭部元素
final Object[] items = this.items;
E x = (E) items[takeIndex];
// 2. 取出元素后,把該位置置空
items[takeIndex] = null;
// 3. 移動takeIndex位置,如果到達數組的末尾就從頭開始
if (++takeIndex == items.length) {
takeIndex = 0;
}
// 4. 元素個數減一
count--;
if (itrs != null) {
itrs.elementDequeued();
}
// 5. 喚醒因為隊列已滿,等待放數據的線程
notFull.signal();
return x;
}
可見取數據跟放數據一樣,都是循環遍歷數組。
remove方法源碼
再看一下remove()方法源碼,如果數組為空,remove()會拋出異常。
/**
* remove方法入口
*/
public E remove() {
// 1. 直接調用poll方法
E x = poll();
// 2. 如果取到數據,直接返回,否則拋出異常
if (x != null) {
return x;
} else {
throw new NoSuchElementException();
}
}
take方法源碼
再看一下take()方法源碼,如果數組為空,take()方法就一直阻塞,直到被喚醒。
/**
* take方法入口
*/
public E take() throws InterruptedException {
// 1. 加可中斷的鎖,防止一直阻塞
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 2. 如果數組為空,就一直阻塞,直到被喚醒
while (count == 0) {
notEmpty.await();
}
// 3. 如果數組不為空,就從數組中取數據
return dequeue();
} finally {
// 4. 釋放鎖
lock.unlock();
}
}
poll(time, unit)源碼
再看一下poll(time, unit)方法源碼,在數組滿的時候, poll(time, unit)方法會阻塞一段時間。
/**
* poll方法入口
*
* @param timeout 超時時間
* @param unit 時間單位
* @return 元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 1. 把超時時間轉換成納秒
long nanos = unit.toNanos(timeout);
// 2. 加可中斷的鎖,防止一直阻塞
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 3. 如果數組為空,就開始阻塞
while (count == 0) {
if (nanos <= 0) {
// 5. 如果數組為空,且超時時間已過,則返回null
return null;
}
// 4. 阻塞到到指定時間
nanos = notEmpty.awaitNanos(nanos);
}
// 6. 如果數組不為空,則出列
return dequeue();
} finally {
// 7. 釋放鎖
lock.unlock();
}
}
查看數據源碼
再看一下查看數據源碼,查看數據,并不刪除數據。
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數據(不刪除) | element() | peek() | 不支持 | 不支持 |
peek方法源碼
先看一下peek()方法源碼,如果數組為空,就返回null。
/**
* peek方法入口
*/
public E peek() {
// 1. 加鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 2. 返回數組頭部元素,如果數組為空,則返回null
return itemAt(takeIndex);
} finally {
// 3. 釋放鎖
lock.unlock();
}
}
/**
* 返回當前位置元素
*/
final E itemAt(int i) {
return (E) items[i];
}
element方法源碼
再看一下element()方法源碼,如果數組為空,則拋出異常。
/**
* element方法入口
*/
public E element() {
// 1. 調用peek方法查詢數據
E x = peek();
// 2. 如果查到數據,直接返回
if (x != null) {
return x;
} else {
// 3. 如果沒找到,則拋出異常
throw new NoSuchElementException();
}
}
總結
這篇文章講解了ArrayBlockingQueue隊列的核心源碼,了解到ArrayBlockingQueue隊列具有以下特點:
- ArrayBlockingQueue實現了BlockingQueue接口,提供了四組放數據和讀數據的方法,來滿足不同的場景。
- ArrayBlockingQueue底層基于數組實現,采用循環數組,提升了數組的空間利用率。
- ArrayBlockingQueue初始化的時候,必須指定隊列長度,是有界的阻塞隊列,所以要預估好隊列長度,保證生產者和消費者速率相匹配。
- ArrayBlockingQueue的方法是線程安全的,使用ReentrantLock在操作前后加鎖來保證線程安全。
今天一起分析了ArrayBlockingQueue隊列的源碼,可以看到ArrayBlockingQueue的源碼非常簡單,沒有什么神秘復雜的東西,下篇文章再一起接著分析其他的阻塞隊列源碼。