背會了常見的幾個線程池用法,結果被問翻
背景
這是張小帥失業之后的第三場面試。
面試官:“實際開發中用過多線程吧,那聊聊線程池吧”。
“有CachedThreadPool:可緩存線程池,FixedThreadPool:定長線程池.......balabala”。小帥暗暗竊喜,還好把這幾種線程池背下來了,看來這次可以上岸了。
面試官點點頭,繼續問到“那線程池底層是如何實現復用的?”
“額,這個....”
寒風中,那個男人的背影在暮色中顯得孤寂而凄涼,仿佛與世隔絕,獨自面對著無盡的寂寞......
概要
如果問到線程池的話,不好好剖析過底層代碼,恐怕真的會像小帥那樣被問翻吧。
那么在此我們就來好好剖析一下線程池的底層吧。我們大概從如下幾個方面著手:
概覽圖
什么是線程池
說到線程池,其實我們要先聊到池化技術。
池化技術:我們將資源或者任務放入池子,使用時從池中取,用完之后交給池子管理。通過優化資源分配的效率,達到性能的調優。
池化技術優點:
- 資源被重復使用,減少了資源在分配銷毀過程中的系統的調度消耗。比如,在IO密集型的服務器上,并發處理過程中的子線程或子進程的創建和銷毀過程,帶來的系統開銷將是難以接受的。所以在業務實現上,通常把一些資源預先分配好,如線程池,數據庫連接池,Redis連接池,HTTP連接池等,來減少系統消耗,提升系統性能。
- 池化技術分配資源,會集中分配,這樣有效避免了碎片化的問題。
- 可以對資源的整體使用做限制,相關資源預分配且只在預分配后生成,后續不再動態添加,從而限制了整個系統對資源的使用上限。
所以我們說線程池是提升線程可重復利用率、可控性的池化技術的一種。
線程池的使用
1.多線程發送郵件案例
現在我們有這樣一個場景,上層有業務系統批量調用底層進行發送郵件,廢話不多,直接上代碼:
demo
最終運行輸出結果為:
由線程:pool-1-thread-1 發送第:0封郵件
由線程:pool-1-thread-2 發送第:1封郵件
由線程:pool-1-thread-1 發送第:2封郵件
由線程:pool-1-thread-2 發送第:3封郵件
由線程:pool-1-thread-1 發送第:4封郵件
由線程:pool-1-thread-1 發送第:6封郵件
由線程:pool-1-thread-2 發送第:5封郵件
由線程:pool-1-thread-1 發送第:7封郵件
由線程:pool-1-thread-2 發送第:8封郵件
由線程:pool-1-thread-1 發送第:9封郵件
上面的例子中從結果來看是10封郵件分別由兩條線程發送出去了,上圖可見,我們給ThreadPoolExecutor這個執行器分別指定了七個參數。那么參數的含義到底是什么呢?接下來咱們層層抽絲剝繭。
2.構造函數說明
大家估計會有疑問,線程池的種類那么多,案例中為什么要用TheadPoolExecutor類呢,其他的種類是由TheadPoolExecutor通過不同的入參定義出來的,所以我們直接拿ThreadPoolExecutor來看。
我們先來看一下ThreadPoolExecutor的繼承關系,有個宏觀印象:
宏觀繼承
我們再來看一下ThreadPoolExecutor的構造方法:
構造方法
下面我們來解釋一下幾個參數的含義:
- corePoolSize:核心線程數。
- maximumPoolSize:最大線程數。
- keepAliveTime:線程池中線程的最大閑置生命周期。
- unit:針對keepAliveTime的時間單位。
- workQueue:阻塞隊列。
- threadFactory:創建線程的線程工廠。
- handler:拒絕策略。
大家對上述的含義初步有個概念。
3.工作流程概述
看了上面的構造函數字段大家估計也還是優點懵的,尤其是從來沒有接觸過商品池的小伙伴。所以老貓又擼了一張商品池的大概的工作流程圖,方便大家把這些概念串起來。
大概流程
上圖中老貓標記了四條線,簡單介紹一下(當然上圖若有問題,也希望大家能夠指出來)。
- 當發起任務時候,會計算線程池中存在的線程數量與核心線程數量(corePoolSize)進行比較,如果小于,則在線程池中創建線程,否則,進行下一步判斷。
- 如果不滿足條件1,則會將任務添加到阻塞隊列中。等待線程池中的線程空閑下來后,獲取隊列中的任務進行執行。
- 但是條件2中如果阻塞隊列滿了之后,此時又會重新獲取當前線程的數量和最大線程數(maximumPoolSize)進行比較,如果發現小于最大線程數,那么繼續添加到線程池中即可。
- 如果都不滿足上述條件,那么此時會放到拒絕策略中。
4.execute核心流程剖析
接下來我們來看一下執行theadPoolExecutor.execute()的時候到底發生了什么。先來看一下源碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
(1) ctl變量
進入執行源碼之后我們首先看到的是ctl,只知道ctl中拿到了一個int數據至于這個數值有什么用,目前不知道,接著看涉及的相關代碼,老貓將相關的代碼解讀放到源碼中進行注釋。
//通過ctl獲取線程池的狀態以及包含的線程數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // COUNT_BITS = 32-3 = 29
/**001左移29位
* 00100000 00000000 00000000 00000000
* 操作減1
* 00011111 11111111 11111111 11111111(表示初始化的時候線程情況,1表示均有空閑線程)
* 換成十進制:COUNT_MASK = 536870911
*/
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
/**
* 運行中狀態
* 1的原碼
* 00000000 00000000 00000000 00000001
* 取反+1
* 11111111 11111111 11111111 11111111
* 左移29位
* 11100000 00000000 00000000 00000000
**/
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //運行中狀態 11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS; //終止狀態 00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS; //停止 00100000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000
//取高3位表示獲取運行狀態
private static int runStateOf(int c) { return c & ~COUNT_MASK; } //~COUNT_MASK表示取反碼:11100000 00000000 00000000 00000000
//取出低位29位的值,當前活躍的線程數
private static int workerCountOf(int c) { return c & COUNT_MASK; } //COUNT_MASK:00011111 11111111 11111111 11111111
//計算ctl的值,ctl=[3位]線程池狀態 + [29位]線程池中線程數量。
private static int ctlOf(int rs, int wc) { return rs | wc; } //進行或運算
上面我們針對各個狀態以及那么多的二進制表示符有點懵,當然如果不會二進制運算的,大家可以先自己去了解一下二進制的運算邏輯。通過源碼中的英文,我們知道CTL的值其實分成兩部分組成,高三位是狀態,其余均為當前線程數。如下的圖:
線程池狀態
上面的圖的描述解釋,其實也都是英文注釋版的翻譯,我們再來看一下有了這些狀態,這些狀態是怎么流轉的,英文注釋是這樣的:
/*** RUNNING -> SHUTDOWN
* On invocation of shutdown()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
* /
上面的描述不太直觀,老貓將流程串了起來,得到了下面的狀態機流轉圖。如下圖:
狀態機流程
寫到這里,其實ctl已經很清楚了,ctl說白了就是狀態位和活躍線程數的表示方式。通過ctl咱們可以知道當前是什么狀態以及活躍線程數量是多少 (設計很巧妙,如果此處還有問題,歡迎大家私聊老貓)。
(3) 線程池中的線程數小于核心線程數
讀完ctl之后,我們來看一下接下來的代碼。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; //添加新的線程
c = ctl.get(); //重新獲取當前的狀態以及線程數量
}
繼上述的workerCountOf,我們知道這個方法可以獲取當前活躍的線程數。如果當前線程數小于配置的核心線程數,則會調用addWorker進行添加新的線程。如果添加失敗了,則重新獲取ctl的值。
(4) 任務添加到隊列的相關邏輯
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次check一下,當前線程池是否是運行狀態,如果不是運行時狀態,則把剛剛添加到workQueue中的command移除掉
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
上述我們知道當添加線程池失敗的時候,我們會重新獲取ctl的值。此時咱們的第一步就很清楚了:
- 通過isRunning方法來判斷線程池狀態是不是運行中狀態,如果是,則將command任務放到阻塞隊列workQueue中。
- 再次check一下,當前線程池是否是運行狀態,如果不是運行時狀態,則把剛剛添加到workQueue中的command移除掉,并調用拒絕策略。否則,判斷如果當前活動的線程數如果為0,則表明只去創建線程,而此處,并不執行任務(因為,任務已經在上面的offer方法中被添加到了workQueue中了,等待線程池中的線程去消費隊列中的任務)
(5) 線程池中的線程數量小于最大線程數代碼邏輯以及拒絕策略的代碼邏輯
接下來,我們看一下最后的一個步驟
/**
* 進入第三步驟前提:
* 1.線程池不是運行狀態,所以isRunning(c)為false
* 2.workCount >= corePoolSize的時候于此同時并且添加到queue失敗的時候執行
*/
else if (!addWorker(command, false))
reject(command);
}
由于調用addWorker的第二個參數是false,則表示對比的是最大線程數,那么如果往線程池中創建線程依然失敗,即addWorker返回false,那么則進入if語句中,直接調用reject方法調用拒絕策略了。
寫到這里大家估計會對這個第二個參數是false為什么比較的是最大線程數有疑問。其實這個是addWorker中的方法。我們可以大概看一下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
我們很明顯地看到當core為flase的時候咱們獲取的是maximumPoolSize,也就是最大線程數。
寫到這里,其實咱們的核心主流程大概就已經結束了。這里其實老貓也只是寫了一個算是比較入門的開頭。當然我們還可以再深入去理addWorker的源碼。這個其實就交給大家去細看了,篇幅過長,相信大家也會失去閱讀的興趣了,感興趣的可以自己研究一下,如果說還是有問題的,可以找老貓一起探討,老貓的公眾號:"程序員老貓"。老貓覺得在上述的源碼中比較重要的其實就是ctl值的流轉順序以及計算方式,讀懂這個的話,后面一切的源碼只要順藤摸瓜即可理解。
5.Executors線程池模板
我們上述主要和大家分享了比較核心的theadPoolExecutor。除此之外,線程池Executors里面包含了很多其他的線程池模板。當然這也是小貓直接面試的時候說的那些,其實小貓也就僅僅只是背了線程池模板而已,并不知曉其工作原理。如下幾種:
- newCachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
- newFixedThreadPool 創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。
- newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
- newSingleThreadScheduleExecutor 創建一個單線程執行程序,它可安排在給定延遲后運行命令或者定期地執行。(注意,如果因為在關閉前的執行期間出現失敗而終止了此單個線程,那么如果需要,一個新線程會代替它執行后續的任務)。可保證順序地執行各個任務,并且在任意給定的時間不會有多個線程是活動的。與其他等效的 newScheduledThreadPool(1) 不同,可保證無需重新配置此方法所返回的執行程序即可使用其他的線程。
- newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
6.多樣化的blockingQueue
- PriorityBlockingQueue 它是一個無界的并發隊列。無法向這個隊列中插入null值。所有插入到這個隊列中的元素必須實現Comparable接口。因此該隊列中元素的排序就取決于你自己的Comparable實現。
- SynchronousQueue 它是一個特殊的隊列,它的內部同時只能夠容納單個元素。如果該隊列已有一個元素的話,那么試圖向隊列中插入一個新元素的線程將會阻塞,直到另一個新線程將該元素從隊列中抽走。同樣的,如果隊列為空,試圖向隊列中抽取一個元素的線程將會被阻塞,直到另一個線程向隊列中插入了一條新的元素。因此,它其實不太像是一個隊列,而更像是一個匯合點。
- ArrayBlockingQueue 它是一個有界的阻塞隊列,其內部實現是將對象放到一個數組里。一但初始化,大小就無法修改
- LinkedBlockingQueue 它內部以一個鏈式結構(鏈接節點)對其元素進行存儲。可以指定元素上限,否則,上限則為Integer.MAX_VALUE。
- DelayQueue 它對元素進行持有直到一個特定的延遲到期。注意:進入其中的元素必須實現Delayed接口。
上述針對這些羅列了一下,其實很多官網上也有相關的介紹,當然感興趣的小伙伴也可以再去刨一刨里面的源碼實現。
7.拒絕策略
- AbortPolicy 丟棄任務并拋出RejectedExecutionException異常。
- DiscardPolicy 丟棄任務,但是不拋出異常。
- DiscardOldestPolicy 丟棄隊列中最前面的任務,然后重新嘗試執行任務。
- CallerRunsPolicy 由調用線程處理該任務。
總結
很多小伙伴在用一些線程池或者第三方中間件的時候可能只停留在如何使用上,一旦出了問題或者被人深入問到其實現原理的時候就比較頭大。所以在日常開發的過程中,我們不僅僅需要知道如何去用,其實更應該知道底層的原理是什么。這樣才能長立于不敗之地。