線程池中線程是如何保活和回收的
面試時經常被問的一個問題,線程池中線程是如何保活的?
這個問題對于看過線程池源碼的同學應該已經知道答案了,沒有看過源碼的也不要慌,現在我們就一起看一下線程池線程的保活策略。
一、線程池中在哪執行任務
首先進入ThreadPoolExecutor的execute方法。
- 首先檢查當前工作線程數量,workerCountOf(c) 是否小于核心線程數量corePoolSize,如果小于就創建一個新線程addWorker()執行這個任務。
- 如果線程池在運行且任務加入隊列成功,但是當前工作線程數量為0,也會創建一個新線程addWorker()。
- 如果任務不能被加入任務隊列(workQueue.offer(command)返回false), 也會創建一個新線程addWorker()。
在execute方法內部有三處調用addWorker()方法的位置,進入到addWorker() 方法中,可以看到有這樣一行代碼new Worker(firstTask) 。
而 Worker類又實現了 Runnable,所以線程池中的線程也就是Worker,而執行就在run()方法內部,我們只需要找到Worker類中的run方法即可。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
run 方法內部調用了 runWorker,秘密就在runWorker方法內部。
所以線程池中線程最終就在runWorker方法中執行的。
二、getTask 獲取任務方法
在上面ThreadPoolExecutor的execute方法中,有一步是往阻塞隊列中放入任務(workQueue.offer(command))。
上面我們找到了線程池中線程執行任務的地方,那么我們看看是從哪讀取的任務?
runWorker方法中有一行代碼task = getTask(),此處就是從阻塞隊列中獲取任務的代碼,讓我們看一下 getTask() 的內部實現。
核心代碼就是
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
根據timed 的狀態判斷,當工作線程數量大于核心線程數量時調用poll方法獲取任務,當工作線程數量小于和核心線程數量時調用take方法。
對于阻塞隊列的介紹如下:
- poll 方法如果隊列不為空,返回頭部元素。如果隊列為空會將線程阻塞在此處,阻塞時間是keepAliveTime。當時間到了獲取不到任務時返回null。
- take方法如果隊列不為空返回頭部元素。如果隊列為空會將線程阻塞在此處,直到隊列中有元素可供使用。
對于非核心線程,當線程池中的線程數量超過核心線程數量且空閑時間超過keepAliveTime時,非核心線程會被回收。
通過這種方式,線程在沒有任務時就阻塞在隊列上,從而實現保活。
上面我們知道了非核心線程的保活策略,那么對于核心線程又是如何保活的呢?
三、核心線程如何實現的保活
在線程池中核心線程默認是不會被回收的。
不過我們可以通過設置allowCoreThreadTimeOut來實現,getTask方法中timed的校驗。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
所以當allowCoreThreadTimeOut 設置為true時,核心線程在沒有任務可以執行時會使用take方法進行阻塞,直到獲取到任務位置,而不是因為沒有任務就被銷毀,從而實現的線程保活。
任務執行的過程是無法保證不出問題的,不管是核心線程還是非核心線程,當線程中出現異常之后,線程池是如何處理的呢?
四、線程異常之后如何保活
繼續回到runWorker方法中,其中task.run()是真正執行任務的地方,當此處發生異常之后,有try catch。
所以當任務執行異常之后,會依次執三個finally塊中的代碼。
再看processWorkerExit() 方法之前,我們先看一個參數completedAbruptly。
completedAbruptly 代表是否是異常退出的,默認是true代表異常退出。
在runWorker方法中,如果線程任務是正常執行完成的,會在最后修改該值。
由于我們是異常線程,所以代碼肯定不會走到這,直接走到finally中的processWorkerExit方法。
在processWorkerExit方法中,它會根據completedAbruptly的值來調整線程池中的工作線程數量,從工作線程集合中移除該線程,并根據線程池的狀態和工作線程數量決定是否需要添加新線程。
!completedAbruptly 判斷工作線程是不是異常退出的,如果不是異常退出的計算最小線程數量。
如果允許核心線程回收allowCoreThreadTimeOut=true,min為0。
如果min為0 且工作隊列不為空! workQueue.isEmpty(),min為1。
如果當前工作線程workerCountOf(c)大于等于這個最小的線程數量min,直接返回。
如果小于這個最小的工作線程數量min,調用addWorker。
此處addWorker 的觸發條件就是當線程池的狀態小于STOP 也就是線程池還在運行runStateLessThan(c, STOP)時且不滿足上述不需要添加新線程的判斷。
當上述條件滿足的時候,則調用addWorker(null,false)添加一個新的工作線程,因為傳入的參數Runnable為null,所以這個新線程會從任務隊列中繼續讀取任務來執行。
最后總結一下,當線程異常之后,按照正常情況來說線程就直接消失了,但是通過processWorkerExit方法的補救,增加了一個新的線程,保證線程池的運行。
五、總結
線程池中的線程分為核心線程與非核心線程。
核心線程默認不回收,可以通過設置allowCoreThreadTimeOut為true 來回收。
非核心線程在獲取任務為空且空閑時間超過一定時間之后進行回收。
線程池的保活策略通過阻塞隊列的阻塞特性實現,poll 方法實現可以指定超時時間的阻塞,take 方法實現阻塞直到獲取到任務。
當線程異常之后,通過新增線程的方式實現線程的補救,保證線程池的運行。