阻塞隊(duì)列—DelayedWorkQueue源碼分析
前言
線程池運(yùn)行時(shí),會(huì)不斷從任務(wù)隊(duì)列中獲取任務(wù),然后執(zhí)行任務(wù)。如果我們想實(shí)現(xiàn)延時(shí)或者定時(shí)執(zhí)行任務(wù),重要一點(diǎn)就是任務(wù)隊(duì)列會(huì)根據(jù)任務(wù)延時(shí)時(shí)間的不同進(jìn)行排序,延時(shí)時(shí)間越短地就排在隊(duì)列的前面,先被獲取執(zhí)行。
隊(duì)列是先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),就是先進(jìn)入隊(duì)列的數(shù)據(jù),先被獲取。但是有一種特殊的隊(duì)列叫做優(yōu)先級(jí)隊(duì)列,它會(huì)對(duì)插入的數(shù)據(jù)進(jìn)行優(yōu)先級(jí)排序,保證優(yōu)先級(jí)越高的數(shù)據(jù)首先被獲取,與數(shù)據(jù)的插入順序無關(guān)。
實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列高效常用的一種方式就是使用堆。關(guān)于堆的實(shí)現(xiàn)可以查看《堆和二叉堆的實(shí)現(xiàn)和特性》
ScheduledThreadPoolExecutor線程池
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以其內(nèi)部的數(shù)據(jù)結(jié)構(gòu)和ThreadPoolExecutor基本一樣,并在其基礎(chǔ)上增加了按時(shí)間調(diào)度執(zhí)行任務(wù)的功能,分為延遲執(zhí)行任務(wù)和周期性執(zhí)行任務(wù)。
ScheduledThreadPoolExecutor的構(gòu)造函數(shù)只能傳3個(gè)參數(shù)corePoolSize、ThreadFactory、RejectedExecutionHandler,默認(rèn)maximumPoolSize為Integer.MAX_VALUE。
工作隊(duì)列是高度定制化的延遲阻塞隊(duì)列DelayedWorkQueue,其實(shí)現(xiàn)原理和DelayQueue基本一樣,核心數(shù)據(jù)結(jié)構(gòu)是二叉最小堆的優(yōu)先隊(duì)列,隊(duì)列滿時(shí)會(huì)自動(dòng)擴(kuò)容,所以offer操作永遠(yuǎn)不會(huì)阻塞,maximumPoolSize也就用不上了,所以線程池中永遠(yuǎn)會(huì)保持至多有corePoolSize個(gè)工作線程正在運(yùn)行。
- public ScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue(), threadFactory, handler);
- }
DelayedWorkQueue延遲阻塞隊(duì)列

DelayedWorkQueue 也是一種設(shè)計(jì)為定時(shí)任務(wù)的延遲隊(duì)列,它的實(shí)現(xiàn)和DelayQueue一樣,不過是將優(yōu)先級(jí)隊(duì)列和DelayQueue的實(shí)現(xiàn)過程遷移到本身方法體中,從而可以在該過程當(dāng)中靈活的加入定時(shí)任務(wù)特有的方法調(diào)用。
工作原理
DelayedWorkQueue的實(shí)現(xiàn)原理中規(guī)中矩,內(nèi)部維護(hù)了一個(gè)以RunnableScheduledFuture類型數(shù)組實(shí)現(xiàn)的最小二叉堆,初始容量是16,使用ReentrantLock和Condition實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模式。
源碼分析
定義
DelayedWorkQueue 的類繼承關(guān)系如下:

其包含的方法定義如下:

成員屬性
- // 初始時(shí),數(shù)組長度大小。
- private static final int INITIAL_CAPACITY = 16;
- // 使用數(shù)組來儲(chǔ)存隊(duì)列中的元素。
- private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
- // 使用lock來保證多線程并發(fā)安全問題。
- private final ReentrantLock lock = new ReentrantLock();
- // 隊(duì)列中儲(chǔ)存元素的大小
- private int size = 0;
- //特指隊(duì)列頭任務(wù)所在線程
- private Thread leader = null;
- // 當(dāng)隊(duì)列頭的任務(wù)延時(shí)時(shí)間到了,或者有新的任務(wù)變成隊(duì)列頭時(shí),用來喚醒等待線程
- private final Condition available = lock.newCondition();
DelayedWorkQueue是用數(shù)組來儲(chǔ)存隊(duì)列中的元素,核心數(shù)據(jù)結(jié)構(gòu)是二叉最小堆的優(yōu)先隊(duì)列,隊(duì)列滿時(shí)會(huì)自動(dòng)擴(kuò)容。
構(gòu)造函數(shù)
DelayedWorkQueue 是 ScheduledThreadPoolExecutor 的靜態(tài)類部類,默認(rèn)只有一個(gè)無參構(gòu)造方法。
- static class DelayedWorkQueue extends AbstractQueue<Runnable>
- implements BlockingQueue<Runnable> {
- // ...
- }
入隊(duì)方法
DelayedWorkQueue 提供了 put/add/offer(帶時(shí)間) 三個(gè)插入元素方法。我們發(fā)現(xiàn)與普通阻塞隊(duì)列相比,這三個(gè)添加方法都是調(diào)用offer方法。那是因?yàn)樗鼪]有隊(duì)列已滿的條件,也就是說可以不斷地向DelayedWorkQueue添加元素,當(dāng)元素個(gè)數(shù)超過數(shù)組長度時(shí),會(huì)進(jìn)行數(shù)組擴(kuò)容。
- public void put(Runnable e) {
- offer(e);
- }
- public boolean add(Runnable e) {
- return offer(e);
- }
- public boolean offer(Runnable e, long timeout, TimeUnit unit) {
- return offer(e);
- }
offer添加元素
ScheduledThreadPoolExecutor提交任務(wù)時(shí)調(diào)用的是DelayedWorkQueue.add,而add、put等一些對(duì)外提供的添加元素的方法都調(diào)用了offer。
- public boolean offer(Runnable x) {
- if (x == null)
- throw new NullPointerException();
- RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
- // 使用lock保證并發(fā)操作安全
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int i = size;
- // 如果要超過數(shù)組長度,就要進(jìn)行數(shù)組擴(kuò)容
- if (i >= queue.length)
- // 數(shù)組擴(kuò)容
- grow();
- // 將隊(duì)列中元素個(gè)數(shù)加一
- size = i + 1;
- // 如果是第一個(gè)元素,那么就不需要排序,直接賦值就行了
- if (i == 0) {
- queue[0] = e;
- setIndex(e, 0);
- } else {
- // 調(diào)用siftUp方法,使插入的元素變得有序。
- siftUp(i, e);
- }
- // 表示新插入的元素是隊(duì)列頭,更換了隊(duì)列頭,
- // 那么就要喚醒正在等待獲取任務(wù)的線程。
- if (queue[0] == e) {
- leader = null;
- // 喚醒正在等待等待獲取任務(wù)的線程
- available.signal();
- }
- } finally {
- lock.unlock();
- }
- return true;
- }
其基本流程如下:
- 其作為生產(chǎn)者的入口,首先獲取鎖。
- 判斷隊(duì)列是否要滿了(size >= queue.length),滿了就擴(kuò)容grow()。
- 隊(duì)列未滿,size+1。
- 判斷添加的元素是否是第一個(gè),是則不需要堆化。
- 添加的元素不是第一個(gè),則需要堆化siftUp。
- 如果堆頂元素剛好是此時(shí)被添加的元素,則喚醒take線程消費(fèi)。
- 最終釋放鎖。
offer基本流程圖如下:

擴(kuò)容grow()
可以看到,當(dāng)隊(duì)列滿時(shí),不會(huì)阻塞等待,而是繼續(xù)擴(kuò)容。新容量newCapacity在舊容量oldCapacity的基礎(chǔ)上擴(kuò)容50%(oldCapacity >> 1相當(dāng)于oldCapacity /2)。最后Arrays.copyOf,先根據(jù)newCapacity創(chuàng)建一個(gè)新的空數(shù)組,然后將舊數(shù)組的數(shù)據(jù)復(fù)制到新數(shù)組中。
- private void grow() {
- int oldCapacity = queue.length;
- // 每次擴(kuò)容增加原來數(shù)組的一半數(shù)量。
- // grow 50%
- int newCapacity = oldCapacity + (oldCapacity >> 1);
- if (newCapacity < 0) // overflow
- newCapacity = Integer.MAX_VALUE;
- // 使用Arrays.copyOf來復(fù)制一個(gè)新數(shù)組
- queue = Arrays.copyOf(queue, newCapacity);
- }
向上堆化siftUp
新添加的元素先會(huì)加到堆底,然后一步步和上面的父親節(jié)點(diǎn)比較,若小于父親節(jié)點(diǎn)則和父親節(jié)點(diǎn)互換位置,循環(huán)比較直至大于父親節(jié)點(diǎn)才結(jié)束循環(huán)。通過循環(huán),來查找元素key應(yīng)該插入在堆二叉樹那個(gè)節(jié)點(diǎn)位置,并交互父節(jié)點(diǎn)的位置。
向上堆化siftUp的詳細(xì)過程可以查看《堆和二叉堆的實(shí)現(xiàn)和特性》
- private void siftUp(int k, RunnableScheduledFuture<?> key) {
- // 當(dāng)k==0時(shí),就到了堆二叉樹的根節(jié)點(diǎn)了,跳出循環(huán)
- while (k > 0) {
- // 父節(jié)點(diǎn)位置坐標(biāo), 相當(dāng)于(k - 1) / 2
- int parent = (k - 1) >>> 1;
- // 獲取父節(jié)點(diǎn)位置元素
- RunnableScheduledFuture<?> e = queue[parent];
- // 如果key元素大于父節(jié)點(diǎn)位置元素,滿足條件,那么跳出循環(huán)
- // 因?yàn)槭菑男〉酱笈判虻摹?nbsp;
- if (key.compareTo(e) >= 0)
- break;
- // 否則就將父節(jié)點(diǎn)元素存放到k位置
- queue[k] = e;
- // 這個(gè)只有當(dāng)元素是ScheduledFutureTask對(duì)象實(shí)例才有用,用來快速取消任務(wù)。
- setIndex(e, k);
- // 重新賦值k,尋找元素key應(yīng)該插入到堆二叉樹的那個(gè)節(jié)點(diǎn)
- k = parent;
- }
- // 循環(huán)結(jié)束,k就是元素key應(yīng)該插入的節(jié)點(diǎn)位置
- queue[k] = key;
- setIndex(key, k);
- }
出隊(duì)方法
DelayedWorkQueue 提供了以下幾個(gè)出隊(duì)方法
- take(),等待獲取隊(duì)列頭元素
- poll() ,立即獲取隊(duì)列頭元素
- poll(long timeout, TimeUnit unit) ,超時(shí)等待獲取隊(duì)列頭元素
take消費(fèi)元素
Worker工作線程啟動(dòng)后就會(huì)循環(huán)消費(fèi)工作隊(duì)列中的元素,因?yàn)镾cheduledThreadPoolExecutor的keepAliveTime=0,所以消費(fèi)任務(wù)其只調(diào)用了DelayedWorkQueue.take。take基本流程如下:
- 首先獲取可中斷鎖,判斷堆頂元素是否是空,空的則阻塞等待available.await()。
- 堆頂元素不為空,則獲取其延遲執(zhí)行時(shí)間delay,delay <= 0說明到了執(zhí)行時(shí)間,出隊(duì)列finishPoll。
- delay > 0還沒到執(zhí)行時(shí)間,判斷l(xiāng)eader線程是否為空,不為空則說明有其他take線程也在等待,當(dāng)前take將無限期阻塞等待。
- leader線程為空,當(dāng)前take線程設(shè)置為leader,并阻塞等待delay時(shí)長。
- 當(dāng)前l(fā)eader線程等待delay時(shí)長自動(dòng)喚醒或者被其他take線程喚醒,則最終將leader設(shè)置為null。
- 再循環(huán)一次判斷delay <= 0出隊(duì)列。
- 跳出循環(huán)后判斷l(xiāng)eader為空并且堆頂元素不為空,則喚醒其他take線程,最后是否鎖。
- public RunnableScheduledFuture<?> take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- RunnableScheduledFuture<?> first = queue[0];
- // 如果沒有任務(wù),就讓線程在available條件下等待。
- if (first == null)
- available.await();
- else {
- // 獲取任務(wù)的剩余延時(shí)時(shí)間
- long delay = first.getDelay(NANOSECONDS);
- // 如果延時(shí)時(shí)間到了,就返回這個(gè)任務(wù),用來執(zhí)行。
- if (delay <= 0)
- return finishPoll(first);
- // 將first設(shè)置為null,當(dāng)線程等待時(shí),不持有first的引用
- first = null; // don't retain ref while waiting
- // 如果還是原來那個(gè)等待隊(duì)列頭任務(wù)的線程,
- // 說明隊(duì)列頭任務(wù)的延時(shí)時(shí)間還沒有到,繼續(xù)等待。
- if (leader != null)
- available.await();
- else {
- // 記錄一下當(dāng)前等待隊(duì)列頭任務(wù)的線程
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- // 當(dāng)任務(wù)的延時(shí)時(shí)間到了時(shí),能夠自動(dòng)超時(shí)喚醒。
- available.awaitNanos(delay);
- } finally {
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if (leader == null && queue[0] != null) // 喚醒等待任務(wù)的線程
- available.signal();
- ock.unlock();
- }
- }
take基本流程圖如下:

take線程阻塞等待
可以看出這個(gè)生產(chǎn)者take線程會(huì)在兩種情況下阻塞等待:
- 堆頂元素為空。
- 堆頂元素的delay > 0 。
finishPoll出隊(duì)列
堆頂元素delay<=0,執(zhí)行時(shí)間到,出隊(duì)列就是一個(gè)向下堆化的過程siftDown。
- // 移除隊(duì)列頭元素
- private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
- // 將隊(duì)列中元素個(gè)數(shù)減一
- int s = --size;
- // 獲取隊(duì)列末尾元素x
- RunnableScheduledFuture<?> x = queue[s];
- // 原隊(duì)列末尾元素設(shè)置為null
- queue[s] = null;
- if (s != 0)
- // 因?yàn)橐瞥岁?duì)列頭元素,所以進(jìn)行重新排序。
- siftDown(0, x);
- setIndex(f, -1);
- return f;
- }
堆的刪除方法主要分為三步:
- 先將隊(duì)列中元素個(gè)數(shù)減一;
- 將原隊(duì)列末尾元素設(shè)置成為隊(duì)列頭元素,再將隊(duì)列末尾元素設(shè)置為null;
- 調(diào)用setDown(O,x)方法,保證按照元素的優(yōu)先級(jí)排序。
向下堆化siftDown
由于堆頂元素出隊(duì)列后,就破壞了堆的結(jié)構(gòu),需要組織整理下,將堆尾元素移到堆頂,然后向下堆化:
- 從堆頂開始,父親節(jié)點(diǎn)與左右子節(jié)點(diǎn)中較小的孩子節(jié)點(diǎn)比較(左孩子不一定小于右孩子)。
- 父親節(jié)點(diǎn)小于等于較小孩子節(jié)點(diǎn),則結(jié)束循環(huán),不需要交換位置。
- 若父親節(jié)點(diǎn)大于較小孩子節(jié)點(diǎn),則交換位置。
- 繼續(xù)向下循環(huán)判斷父親節(jié)點(diǎn)和孩子節(jié)點(diǎn)的關(guān)系,直到父親節(jié)點(diǎn)小于等于較小孩子節(jié)點(diǎn)才結(jié)束循環(huán)。
向下堆化siftDown的詳細(xì)過程可以查看《堆和二叉堆的實(shí)現(xiàn)和特性》
- private void siftDown(int k, RunnableScheduledFuture<?> key) {
- // 無符號(hào)右移,相當(dāng)于size/2
- int half = size >>> 1;
- // 通過循環(huán),保證父節(jié)點(diǎn)的值不能大于子節(jié)點(diǎn)。
- while (k < half) {
- // 左子節(jié)點(diǎn), 相當(dāng)于 (k * 2) + 1
- int child = (k << 1) + 1;
- // 左子節(jié)點(diǎn)位置元素
- RunnableScheduledFuture<?> c = queue[child];
- // 右子節(jié)點(diǎn), 相當(dāng)于 (k * 2) + 2
- int right = child + 1;
- // 如果左子節(jié)點(diǎn)元素值大于右子節(jié)點(diǎn)元素值,那么右子節(jié)點(diǎn)才是較小值的子節(jié)點(diǎn)。
- // 就要將c與child值重新賦值
- if (right < size && c.compareTo(queue[right]) > 0)
- c = queue[child = right];
- // 如果父節(jié)點(diǎn)元素值小于較小的子節(jié)點(diǎn)元素值,那么就跳出循環(huán)
- if (key.compareTo(c) <= 0)
- break;
- // 否則,父節(jié)點(diǎn)元素就要和子節(jié)點(diǎn)進(jìn)行交換
- queue[k] = c;
- setIndex(c, k);
- k = child;
- }
- queue[k] = key;
- setIndex(key, k);
- }
leader線程
leader線程的設(shè)計(jì),是Leader-Follower模式的變種,旨在于為了不必要的時(shí)間等待。當(dāng)一個(gè)take線程變成leader線程時(shí),只需要等待下一次的延遲時(shí)間,而不是leader線程的其他take線程則需要等leader線程出隊(duì)列了才喚醒其他take線程。
poll()
立即獲取隊(duì)列頭元素,當(dāng)隊(duì)列頭任務(wù)是null,或者任務(wù)延時(shí)時(shí)間沒有到,表示這個(gè)任務(wù)還不能返回,因此直接返回null。否則調(diào)用finishPoll方法,移除隊(duì)列頭元素并返回。
- public RunnableScheduledFuture<?> poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- RunnableScheduledFuture<?> first = queue[0];
- // 隊(duì)列頭任務(wù)是null,或者任務(wù)延時(shí)時(shí)間沒有到,都返回null
- if (first == null || first.getDelay(NANOSECONDS) > 0)
- return null;
- else
- // 移除隊(duì)列頭元素
- return finishPoll(first);
- } finally {
- lock.unlock();
- }
- }
poll(long timeout, TimeUnit unit)
超時(shí)等待獲取隊(duì)列頭元素,與take方法相比較,就要考慮設(shè)置的超時(shí)時(shí)間,如果超時(shí)時(shí)間到了,還沒有獲取到有用任務(wù),那么就返回null。其他的與take方法中邏輯一樣。
- public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
- throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- RunnableScheduledFuture<?> first = queue[0];
- // 如果沒有任務(wù)。
- if (first == null) {
- // 超時(shí)時(shí)間已到,那么就直接返回null
- if (nanos <= 0)
- return null;
- else
- // 否則就讓線程在available條件下等待nanos時(shí)間
- nanos = available.awaitNanos(nanos);
- } else {
- // 獲取任務(wù)的剩余延時(shí)時(shí)間
- long delay = first.getDelay(NANOSECONDS);
- // 如果延時(shí)時(shí)間到了,就返回這個(gè)任務(wù),用來執(zhí)行。
- if (delay <= 0)
- return finishPoll(first);
- // 如果超時(shí)時(shí)間已到,那么就直接返回null
- if (nanos <= 0)
- return null;
- // 將first設(shè)置為null,當(dāng)線程等待時(shí),不持有first的引用
- first = null; // don't retain ref while waiting
- // 如果超時(shí)時(shí)間小于任務(wù)的剩余延時(shí)時(shí)間,那么就有可能獲取不到任務(wù)。
- // 在這里讓線程等待超時(shí)時(shí)間nanos
- if (nanos < delay || leader != null)
- nanos = available.awaitNanos(nanos);
- else {
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- // 當(dāng)任務(wù)的延時(shí)時(shí)間到了時(shí),能夠自動(dòng)超時(shí)喚醒。
- long timeLeft = available.awaitNanos(delay);
- // 計(jì)算剩余的超時(shí)時(shí)間
- nanos -= delay - timeLeft;
- } finally {
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if (leader == null && queue[0] != null) // 喚醒等待任務(wù)的線程
- available.signal();
- lock.unlock();
- }
- }
remove刪除指定元素
刪除指定元素一般用于取消任務(wù)時(shí),任務(wù)還在阻塞隊(duì)列中,則需要將其刪除。當(dāng)刪除的元素不是堆尾元素時(shí),需要做堆化處理。
- public boolean remove(Object x) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int i = indexOf(x);
- if (i < 0)
- return false;
- //維護(hù)heapIndex
- setIndex(queue[i], -1);
- int s = --size;
- RunnableScheduledFuture<?> replacement = queue[s];
- queue[s] = null;
- if (s != i) {
- //刪除的不是堆尾元素,則需要堆化處理
- //先向下堆化
- siftDown(i, replacement);
- if (queue[i] == replacement)
- //若向下堆化后,i位置的元素還是replacement,說明四無需向下堆化的,
- //則需要向上堆化
- siftUp(i, replacement);
- }
- return true;
- } finally {
- lock.unlock();
- }
- }
總結(jié)
使用優(yōu)先級(jí)隊(duì)列DelayedWorkQueue,保證添加到隊(duì)列中的任務(wù),會(huì)按照任務(wù)的延時(shí)時(shí)間進(jìn)行排序,延時(shí)時(shí)間少的任務(wù)首先被獲取。
- DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu)是基于堆實(shí)現(xiàn)的;
- DelayedWorkQueue采用數(shù)組實(shí)現(xiàn)堆,根節(jié)點(diǎn)出隊(duì),用最后葉子節(jié)點(diǎn)替換,然后下推至滿足堆成立條件;最后葉子節(jié)點(diǎn)入隊(duì),然后向上推至滿足堆成立條件;
- DelayedWorkQueue添加元素滿了之后會(huì)自動(dòng)擴(kuò)容原來容量的1/2,即永遠(yuǎn)不會(huì)阻塞,最大擴(kuò)容可達(dá)Integer.MAX_VALUE,所以線程池中至多有corePoolSize個(gè)工作線程正在運(yùn)行;
- DelayedWorkQueue 消費(fèi)元素take,在堆頂元素為空和delay >0 時(shí),阻塞等待;
- DelayedWorkQueue 是一個(gè)生產(chǎn)永遠(yuǎn)不會(huì)阻塞,消費(fèi)可以阻塞的生產(chǎn)者消費(fèi)者模式;
- DelayedWorkQueue 有一個(gè)leader線程的變量,是Leader-Follower模式的變種。當(dāng)一個(gè)take線程變成leader線程時(shí),只需要等待下一次的延遲時(shí)間,而不是leader線程的其他take線程則需要等leader線程出隊(duì)列了才喚醒其他take線程。