小米面試:什么是線程池?工作原理是什么?線程池可以動態修改嗎?
大家好,我是碼哥,《Redis 高手心法》暢銷書作者。
有讀者分享小米 Java 后端面試,其中有一個問題,當時沒有回答好:什么是線程池、工作原理是什么、線程池可以動態修改嗎?
回答這個問題之前,首先我們來了解下什么是線程池,它的工作原理是什么。
什么是線程池
線程池(Thread Pool)是一種基于池化思想管理線程的工具,它維護多個線程。在線程池中,總有幾個活躍線程。當需要使用線程來執行任務時,可以從池子中隨便拿一個空閑線程來用,當完成工作時,該線程并不會死亡,而是再次返回線程池中成為空閑狀態,等待執行下一個任務。
這種做法,一方面避免了處理任務時創建銷毀線程開銷的代價,另一方面避免了線程數量膨脹導致的過分調度問題,保證了對內核的充分利用。
線程池狀態
然后,我們來看下線程池有哪些狀態呢?
線程池有五種狀態:這五種狀態并不能任意轉換,只會有以下幾種轉換情況:線程池的五種狀態是如何流轉的?
- RUNNING:會接收新任務并且會處理隊列中的任務
- SHUTDOWN:不會接收新任務并且會處理隊列中的任務
- STOP:不會接收新任務并且不會處理隊列中的任務,并且會中斷在處理的任務(注意:一個任務能不能被中斷得看任務本身)
- TIDYING:所有任務都終止了,線程池中也沒有線程了,這樣線程池的狀態就會轉為 TIDYING,一旦達到此狀態,就會調用線程池的 terminated()
- TERMINATED:terminated()執行完之后就會轉變為 TERMINATED
線程池工作原理
如何自定義一個線程池?
public ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
// 核心線程池大小,表示線程池常駐線程數量
30,
// 最大線程數,表示線程池最多創建的線程數量
100,
// 保活時間,表示一個非核心線程多久沒有使用,會被回收
10,
TimeUnit.MINUTES,
// 阻塞隊列,表示隊列最多緩存多少任務,如果隊列滿了,將觸發 RejectedExecutionHandler
new ArrayBlockingQueue<>(1000),
// 線程工廠,創建線程時候用的,可以給線程命名等
new NamedThreadFactory("cust-task")
);
// 拒絕策略,當阻塞隊列滿了之后,會觸發這里的handler
// 默認是丟棄新任務
executor.setRejectedExecutionHandler((r, executor1) -> {
log.warn("thread pool is full");
});
}
線程池執行流程圖
- 首先檢測線程池運行狀態,如果不是 RUNNING,則直接拒絕,線程池要保證在 RUNNING 的狀態下執行任務。
- 如果當前線程數未超過核心線程數,則創建并啟動一個線程來執行新提交的任務。
- 如果當前線程數超過核心線程數,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中。
- 如果當前線程數超過核心線程數且 線程池內的阻塞隊列已滿,且未超過最大線程數,則創建并啟動一個線程來執行新提交的任務。
- 如果已超過最大線程數,并且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
注意:提交一個 Runnable 時,不管當前線程池中的線程是否空閑,只要數量小于核心線程數就會創建新線程。
線程池的拒絕策略
ThreadPoolExecutor 內部有實現 4 個拒絕策略:
- CallerRunsPolicy,由調用 execute 方法提交任務的線程來執行這個任務。
- AbortPolicy,拋出異常 RejectedExecutionException 拒絕提交任務。
- DiscardPolicy,直接拋棄任務,不做任何處理。
- DiscardOldestPolicy,去除任務隊列中的第一個任務(最舊的),重新提。
如何監控線程池?
好了,言歸正傳,再回歸到這個題目本身,在修改線程池之前,我們要如何監控線程池的信息呢?
比如線程池的執行任務前后總時間,當前任務數等信息。
- 統計任務執行時間可以通過實現 beforeExecute 和 afterExecute 方法,計算出任務總耗時。
- 統計線程池的任務數,線程數等信息,可定時上報到 kafka,展示到可視化的界面上比如 Grafana。
監控核心代碼
@Slf4j
public class ThreadPoolMonitor {
private final ThreadPoolExecutor customThreadPool;
private final String poolName;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public ThreadPoolMonitor(ThreadPoolExecutor customThreadPool, String poolName) {
this.customThreadPool = customThreadPool;
this.poolName = poolName;
}
public void startMonitoring(long period, TimeUnit unit) {
scheduler.scheduleAtFixedRate(this::monitor, 0, period, unit);
}
private void monitor() {
//核心線程數
int corePoolSize = customThreadPool.getCorePoolSize();
//最大線程數
int maximumPoolSize = customThreadPool.getMaximumPoolSize();
//活躍線程數
int activeCount = customThreadPool.getActiveCount();
//隊列任務數
int queueSize = customThreadPool.getQueue().size();
//已執行完成任務數
long completedTaskCount = customThreadPool.getCompletedTaskCount();
//隊列任務數峰值
int largestPoolSize = customThreadPool.getLargestPoolSize();
//上報監控數據
sendToKafka(corePoolSize,maximumPoolSize, activeCount, queueSize, completedTaskCount, largestPoolSize);
}
private void sendToKafka(int corePoolSize,int maximumPoolSize, int activeCount, int queueSize, long completedTaskCount, int largestPoolSize) {
// 自定義實現發送kafka邏輯或上報到prometheus邏輯
}
}
如何動態調整線程池?
一般我們在設置線程池的線程數時,會參考實際業務場景。比較通用的公式是
- IO 密集型場景:線程數=CPU 核心數*2+1
- CPU 密集型場景線程數=CPU 核心數+1
但這只是比較簡單粗暴的計算方式,在實際使用過程中,我們還是不可避免的需要調整線程池的一些參數,以達到最佳性能。
那么我們通過會比較關注線程池以下的幾個參數
線程池參數 | 說明 |
corePoolSize | 核心線程數 |
maximumPoolSize | 最大線程數 |
queueCapacity | 等待隊列大小 |
keepAliveTime | 空閑時間 |
- corePoolSize、maximumPoolSize 和 keepAliveTime 可以通過調用 setCorePoolSize、setMaximumPoolSize、setKeepAliveTime 方法修改。
- queueCapacity 雖然不能直接修改,我們可以通過實現自定義一個阻塞隊列的方式去實現 setQueueCapacity 方法來修改隊列大小的屬性。
最后可以通過 Apollo、Nacos 配置中心實現動態監聽的方法,達到實時更新線程池的效果。
擴展 1:線程池核心線程數會被銷毀嗎?
擴展 2:線程發生異常,會被移出線程池嗎?