Java 線程池知識(shí)點(diǎn)小結(jié)
在當(dāng)今軟件開發(fā)領(lǐng)域,高性能和高并發(fā)處理能力成為了衡量應(yīng)用程序質(zhì)量的重要標(biāo)準(zhǔn)之一。Java 作為一門廣泛使用的編程語言,在支持多線程和并發(fā)編程方面提供了強(qiáng)大的工具——線程池。本文將深入探討 Java 線程池的原理、實(shí)現(xiàn)方式及其優(yōu)化技巧,旨在幫助開發(fā)者更好地理解和利用這一重要機(jī)制,從而構(gòu)建更加高效穩(wěn)定的并發(fā)系統(tǒng)。
一、詳解線程池核心知識(shí)點(diǎn)
1. 為什么需要線程池
我們可以從性能、資源、安全等角度來回答這個(gè)問題:
- 提高響應(yīng)速度:從性能角度來說,通過線程池進(jìn)行池化統(tǒng)一管理線程,使用時(shí)直接通過線程池獲取,不再需要手動(dòng)創(chuàng)建線程,響應(yīng)速度大大提高。
- 降低資源消耗:從資源消耗的角度,由于線程池被池化管理了,我們無需為了某些功能去手動(dòng)創(chuàng)建和銷毀線程,資源消耗自然降低。
- 便于管理和監(jiān)控:因?yàn)槲覀兊墓ぷ骶€程都來自于線程池中所以對(duì)于線程的監(jiān)控和管理自然方便了許多。
2. 線程池使用示例
接下來我們展示了一個(gè)非常簡(jiǎn)單的demo,創(chuàng)建一個(gè)含有3個(gè)線程的線程,提交3個(gè)任務(wù)到線程池中,讓線程池中的線程池執(zhí)行。 完成后通過shutdown停止線程池,線程池收到通知后會(huì)將手頭的任務(wù)都執(zhí)行完,再將線程池停止,所以筆者這里使用isTerminated判斷線程池是否完全停止了。只有狀態(tài)為terminated才能說明線程池關(guān)閉了,結(jié)束循環(huán),退出方法。
//創(chuàng)建含有3個(gè)線程的線程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//提交3個(gè)任務(wù)到線程池中
for (int i = 0; i < 3; i++) {
finalint taskNo = i;
threadPool.execute(() -> {
log.info("執(zhí)行任務(wù){(diào)}", taskNo);
});
}
//關(guān)閉線程池
threadPool.shutdown();
//如果線程池還沒達(dá)到Terminated狀態(tài),說明線程池中還有任務(wù)沒有執(zhí)行完,則繼續(xù)循環(huán)等待線程池執(zhí)行完任務(wù)
while (!threadPool.isTerminated()) {
}
對(duì)應(yīng)輸出結(jié)果如下:
10:38:51.993 [pool-1-thread-3] INFO com.sharkChili.Main - 執(zhí)行任務(wù)2
10:38:51.993 [pool-1-thread-2] INFO com.sharkChili.Main - 執(zhí)行任務(wù)1
10:38:51.993 [pool-1-thread-1] INFO com.sharkChili.Main - 執(zhí)行任務(wù)0
3. 詳解線程池核心參數(shù)
我們上文通過Executors框架創(chuàng)建了線程池,它底層是通過ThreadPoolExecutor完成線程池的創(chuàng)建:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到ThreadPoolExecutor的構(gòu)造方法包含下面幾個(gè)參數(shù),它們分別是:
- corePoolSize:核心線程數(shù),即時(shí)空閑也會(huì)保留在線程池中的線程。
- maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù),例如配置為10,那么線程池中最大的線程數(shù)就為10。
- keepAliveTime:核:心線程數(shù)以外的線程的生存時(shí)間,例如corePoolSize為2,maximumPoolSize為5,假如我們線程池中有5個(gè)線程,核心線程以外有3個(gè),這3個(gè)線程如果在keepAliveTime的時(shí)間內(nèi)沒有被用到就會(huì)被回收。
- unit:keepAliveTime的時(shí)間單位。
- workQueue:當(dāng)核心線程都在忙碌時(shí),任務(wù)都會(huì)先放到隊(duì)列中。
- threadFactory:線程工廠,用戶可以通過這個(gè)參數(shù)指定創(chuàng)建線程的線程工廠。
- handler:當(dāng)線程池?zé)o法接受新的任務(wù)時(shí),就會(huì)根據(jù)這個(gè)參數(shù)做出拒絕策略,默認(rèn)拒絕策略是直接拋異常。
對(duì)應(yīng)的構(gòu)造方法如下所示:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//......
}
4. 線程池的工作流程
從ThreadPoolExecutor的execute方法我們大體可以窺探到其內(nèi)部核心邏輯:
- 如果工作的線程小于核心線程數(shù),則調(diào)用addWorker創(chuàng)建線程并執(zhí)行我們傳入的任務(wù)。
- 如果核心線程都在工作,則調(diào)用workQueue.offer(command)將我們提交的任務(wù)放到隊(duì)列中。
- 如果隊(duì)列也無法容納任務(wù)時(shí),則繼續(xù)創(chuàng)建線程并用這些線程處理新進(jìn)來的任務(wù)。
- 如果還有新的任務(wù)接入且當(dāng)線程數(shù)達(dá)到maximumPoolSize時(shí),說明已經(jīng)無法容納任務(wù)了,則調(diào)用reject(command)按照拒絕策略處理任務(wù)。
對(duì)應(yīng)的我們給出execute的源碼核心邏輯,讀者可自行參閱:
public void execute(Runnable command) {
//......
//工作線程數(shù)小于核心線程數(shù),則創(chuàng)建線程線程處理傳入的任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//核心線程都在工作則將任務(wù)存入阻塞隊(duì)列
//......
}
elseif (!addWorker(command, false))//隊(duì)列無法容納則繼續(xù)創(chuàng)建線程應(yīng)急處理,如果創(chuàng)建失敗說明當(dāng)前線程超過maximumPoolSize,則調(diào)用reject按照拒絕策略處理任務(wù)
reject(command);
}
5. 線程池的幾種狀態(tài)
ThreadPoolExecutor通過2的次冪并結(jié)合位運(yùn)算標(biāo)識(shí)了幾種線程的狀態(tài),這種做法也是計(jì)算機(jī)程序設(shè)計(jì)中常見的實(shí)用技巧,即通過位運(yùn)算標(biāo)識(shí)替代常規(guī)數(shù)值比對(duì),保證狀態(tài)唯一性的同時(shí)還能保證程序的執(zhí)行效率,對(duì)應(yīng)的線程池狀態(tài)和注釋如下:
//RUNNING 說明線程正處于運(yùn)行狀態(tài),正在處理任務(wù)和接受新的任務(wù)進(jìn)來
privatestaticfinalint RUNNING = -1 << COUNT_BITS;
//說明線程收到關(guān)閉的通知了,繼續(xù)處理手頭任務(wù),但不接受新任務(wù)
privatestaticfinalint SHUTDOWN = 0 << COUNT_BITS;
//STOP說明線程停止了不處理任務(wù)也不接受任務(wù),即時(shí)隊(duì)列中有任務(wù),我們也會(huì)將其打斷。
privatestaticfinalint STOP = 1 << COUNT_BITS;
//表明所有任務(wù)都已經(jīng)停止,記錄的任務(wù)數(shù)量為0
privatestaticfinalint TIDYING = 2 << COUNT_BITS;
//線程池完全停止了
privatestaticfinalint TERMINATED = 3 << COUNT_BITS;
6. 線程池的幾種拒絕策略
- AbortPolicy:這個(gè)拒絕策略在無法容納新任務(wù)的時(shí)候直接拋出異常,這種策略是線程池默認(rèn)的拒絕策略。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy:從源碼中可以看出,當(dāng)線程池?zé)o法容納新任務(wù)的時(shí),會(huì)直接將當(dāng)前任務(wù)交給調(diào)用者執(zhí)行。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//......
//讓當(dāng)前提交任務(wù)的線程運(yùn)行
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardOldestPolicy:顧名思義,當(dāng)線程池?zé)o法最新任務(wù)時(shí),會(huì)將隊(duì)首的任務(wù)丟棄,將新任務(wù)存入。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//將隊(duì)首元素poll掉,并將當(dāng)前任務(wù)提交
e.getQueue().poll();
e.execute(r);
}
}
}
DiscardPolicy:從源碼中可以看出這個(gè)策略什么也不做,相當(dāng)于直接將當(dāng)前任務(wù)丟棄。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
//什么都不做直接即丟棄當(dāng)前任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
6. 線程兩種任務(wù)提交方式
首先是execute,當(dāng)任務(wù)提交到線程池中時(shí)直接按照流程執(zhí)行即可,處理完成后是沒有返回值的源碼上文已經(jīng)給出這里就不多贅述。而submit它會(huì)將傳進(jìn)來的任務(wù)封裝成RunnableFuture,然后將Future返回出去,調(diào)用者可以通過get方法獲取返回結(jié)果:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
//提交任務(wù)
execute(ftask);
//返回Future,后續(xù)我們可以通過get獲取結(jié)果
return ftask;
}
對(duì)應(yīng)的我們也給出使用示例:
@Test
void baseUse() throws ExecutionException, InterruptedException {
//創(chuàng)建含有3個(gè)線程的線程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//提交3個(gè)任務(wù)到線程池中
for (int i = 0; i < 3; i++) {
finalint taskNo = i;
Future<Integer> future = threadPool.submit(() -> {
logger.info("執(zhí)行任務(wù){(diào)}", taskNo);
return1;
});
logger.info("處理結(jié)果:{}", future.get());
}
//關(guān)閉線程池
threadPool.shutdown();
//如果線程池還沒達(dá)到Terminated狀態(tài),說明線程池中還有任務(wù)沒有執(zhí)行完,則繼續(xù)循環(huán)等待線程池執(zhí)行完任務(wù)
while (!threadPool.isTerminated()) {
}
}
輸出結(jié)果:
00:24:41.204 [pool-1-thread-1] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務(wù)0
00:24:41.208 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結(jié)果:1
00:24:41.209 [pool-1-thread-2] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務(wù)1
00:24:41.209 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結(jié)果:1
00:24:41.209 [pool-1-thread-3] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務(wù)2
00:24:41.209 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結(jié)果:1
7. 線程池的關(guān)閉方式
線程池的停止方式有兩種:
- shutdown:筆者上述代碼示例用的都是這種方式,使用這個(gè)方法之后,我們無法提交新的任務(wù)進(jìn)來,線程池會(huì)繼續(xù)工作,將手頭的任務(wù)執(zhí)行完再停止:
public void shutdown() {
//上鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查并設(shè)置狀態(tài),不再接受新任務(wù)
checkShutdownAccess();
advanceRunState(SHUTDOWN);
//打斷空閑的線程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//停止線程池
tryTerminate();
}
- shutdownNow:這種停止方式就比較粗暴了,線程池會(huì)直接將手頭的任務(wù)都強(qiáng)行停止,且不接受新任務(wù)進(jìn)來,線程停止立即生效:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
//上鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//設(shè)置狀態(tài)為stop強(qiáng)行停止
checkShutdownAccess();
advanceRunState(STOP);
//打斷空閑線程
interruptWorkers();
//移除隊(duì)列中的任務(wù)
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//停止線程池
tryTerminate();
return tasks;
}
8. 非核心線程創(chuàng)建與調(diào)度饑餓問題
了解了線程池整體工作原理后,讀者是否想過,為什么先要用corePoolSize核心線程,然后當(dāng)核心線程處理不過來時(shí)將異步任務(wù)先放到workQueue中,而不是直接開maximumPoolSize的線程數(shù)繼續(xù)處理應(yīng)急任務(wù)呢?如果按照當(dāng)前的線程池執(zhí)行流程不就存在一個(gè)饑餓問題?即后來的任務(wù)可能會(huì)比存在于隊(duì)列中的任務(wù)先執(zhí)行:
其實(shí)回答這個(gè)問題,其實(shí)我們可以通過反證法來解釋,假設(shè)任務(wù)處理不過來之后,直接創(chuàng)建maximumPoolSize個(gè)線程處理任務(wù),那么就會(huì)存在以下幾個(gè)問題:
- 僅僅因?yàn)楹诵木€程數(shù)處理不來任務(wù)就認(rèn)為是應(yīng)急情況,這會(huì)導(dǎo)致應(yīng)急線程被提前創(chuàng)建,這就可能存在頻繁創(chuàng)建和銷毀線程的性能損耗,例如核心線程為4,剛剛好某個(gè)時(shí)間段剛剛好來個(gè)5個(gè)異步任務(wù),僅僅因?yàn)槎嗔艘粋€(gè)任務(wù),在沒有任何緩沖的情況下,直接創(chuàng)建應(yīng)急線程然后被銷毀,這就會(huì)導(dǎo)致這種不合理的性能損耗。
- 資源消耗:創(chuàng)建完最大線程之后,線程有可能處于空閑中,這也不能意味著線程沒有任何開銷,一旦線程被啟動(dòng)對(duì)于CPU、內(nèi)存而言都是存在一定的資源開銷的,如果maximumPoolSize線程數(shù)過大,對(duì)于系統(tǒng)資源占用也是非常不劃算的。
總的來說,設(shè)計(jì)者們認(rèn)為只有緩沖區(qū)處理不來(隊(duì)列容納不下)的情況下才能開啟應(yīng)急線程是一種對(duì)于應(yīng)急情況的判斷依據(jù),由此避免了非應(yīng)急情況創(chuàng)建應(yīng)急線程的開銷:
//存入阻塞隊(duì)列失敗后,才會(huì)嘗試調(diào)用addWorker開啟非核心線程,即通過阻塞隊(duì)列的閾值來作為應(yīng)急情況判斷的依據(jù)
if (isRunning(c) && workQueue.offer(command)) {
//......
}
else if (!addWorker(command, false))//如果非核心線程開啟失敗,則執(zhí)行拒絕策略
reject(command);
再來回答另外一個(gè)問題即任務(wù)饑餓問題,,這確實(shí)會(huì)存在一定情況下存在饑餓問題,但筆者認(rèn)為該問題只要線程池參數(shù)設(shè)定得當(dāng),在非核心線程啟動(dòng)之后,這些堆積在阻塞隊(duì)列的任務(wù)在一定時(shí)間后就會(huì)被任意抽身出來的線程從隊(duì)列中取出并處理,對(duì)應(yīng)的我們給出每一個(gè)worker線程的執(zhí)行邏輯即ThreadPoolExecutor的runWorker方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//......
try {
//從阻塞隊(duì)列中獲取任務(wù)task
while (task != null || (task = getTask()) != null) {
//......
try {
//......
try {
//執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
//......
} finally {
afterExecute(task, thrown);
}
} finally {
//......
}
}
//......
} finally {
processWorkerExit(w, completedAbruptly);
}
}
當(dāng)然如果讀者對(duì)于任務(wù)公平有著嚴(yán)格要求同時(shí)系統(tǒng)資源也足夠充分,完全可以考慮通過Executors.newSingleThreadExecutor()這種只有一個(gè)線程的線程輪詢處理阻塞隊(duì)列的任務(wù)模式,來保證異步任務(wù)順序性和公平性:
對(duì)應(yīng)的我們也給出singleThreadExecutor的核心實(shí)現(xiàn),讀者也可以參考源碼了解一下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
二、線程池使用注意事項(xiàng)
1. 避免使用Executors的newFixedThreadPool
接下來我們來看看日常使用線程池時(shí)一些錯(cuò)誤示例,為了更好的看到線程池的變化,我們編寫這樣一個(gè)定時(shí)任務(wù)去監(jiān)控線程池的變化。
/**
* 打印線程池情況
*
* @param threadPool
*/
private void printStats(ThreadPoolExecutor threadPool) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("=========================");
log.info("Pool Size:{}", threadPool.getPoolSize());
log.info("Active Threads:{}", threadPool.getActiveCount());
log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
log.info("Number of Tasks in Queue:{}", threadPool.getQueue().size());
log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
}
先來看看這樣一段代碼,我們循環(huán)1e次,每次創(chuàng)建這樣一個(gè)任務(wù):生成一串大字符串,休眠一小時(shí)后打印輸出。
@GetMapping("oom1")
public void oom1() {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
printStats(threadPool);
for (int i = 0; i < 1_0000_0000; i++) {
threadPool.submit(() -> {
String payload = IntStream.rangeClosed(1, 100_0000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(payload);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
項(xiàng)目啟動(dòng)后使用jvisualvm監(jiān)控項(xiàng)目的變化:
可以看到此時(shí)CPU使用情況,堆區(qū)、還有線程數(shù)使用情況都是正常的。
然后我們對(duì)剛剛的接口發(fā)起請(qǐng)求:
curl http://localhost:8080/threadpooloom/oom1
我們先來看看控制臺(tái)輸出,可以看到線程數(shù)沒有增加,而隊(duì)列的任務(wù)卻不斷累積。
看看jvisualvm,此時(shí)堆區(qū)內(nèi)存不斷增加,盡管發(fā)生了幾次GC,還是沒有回收到足夠的空間。最終引發(fā)OOM問題。
我們通過源碼來觀察一下newFixedThreadPool的特征,可以看到它的核心線程數(shù)和最大線程數(shù)都是傳進(jìn)來的值,這意味著無論多少個(gè)任務(wù)進(jìn)來,線程數(shù)都是nThreads。如果我們沒有足夠的線程去執(zhí)行的任務(wù)的話,任務(wù)就會(huì)堆到LinkedBlockingQueue中,從源碼中我們也能看出,LinkedBlockingQueue是無界隊(duì)列(底層是通過鏈表實(shí)現(xiàn)的):
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2. 避免使用Executors的newCachedThreadPool
再來看看第二段代碼,同樣的任務(wù)提交到newCachedThreadPool中,我們看看會(huì)發(fā)生什么。
@GetMapping("oom2")
public void oom2() {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
printStats(threadPool);
for (int i = 0; i < 1_0000_0000; i++) {
threadPool.submit(() -> {
String payload = IntStream.rangeClosed(1, 100_0000)
.mapToObj(__ -> "b")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(payload);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
先來看看控制臺(tái),可以看到線程數(shù)正在不斷的飆升。
從jvisualvm也能看出堆區(qū)和線程數(shù)也在不斷飆升,最終導(dǎo)致OOM。
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 32744 bytes for ChunkPool::allocate
# An error report file with more information is saved as:
# F:\github\java-common-mistakes-100\hs_err_pid147400.log
我們來看看newCachedThreadPool源碼,可以看到這個(gè)線程池核心線程數(shù)初始為0,最大線程數(shù)為Integer.MAX_VALUE,而隊(duì)列使用的是SynchronousQueue,所以這個(gè)隊(duì)列等于不會(huì)存儲(chǔ)任何任務(wù)。
這就意味著我們每次提交一個(gè)任務(wù)沒有線程處理的話,線程池就會(huì)創(chuàng)建一個(gè)新的線程去處理這個(gè)任務(wù),1s內(nèi)沒有線程使用就將其銷毀。
我們的連續(xù)1e次循環(huán)提交任務(wù)就會(huì)導(dǎo)致創(chuàng)建1e個(gè)線程,最終導(dǎo)致線程數(shù)飆升,進(jìn)而引發(fā)OOM問題。
public static ExecutorService newCachedThreadPool() {
//隊(duì)列不可容納元素,最大線程數(shù)設(shè)置為Integer.MAX_VALUE
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3. 確保你創(chuàng)建線程池的方式線程可以被復(fù)用
我們監(jiān)控發(fā)現(xiàn)某段時(shí)間線程會(huì)不斷飆升,然后急速下降,然后急速上升:
然后我們?cè)诰€程的棧幀中看到SynchronousQueue,大概率有人使用newCachedThreadPool。
最終通過全局搜索看到這樣一段代碼,可以看到這個(gè)工具類每次請(qǐng)求就會(huì)創(chuàng)建一個(gè)newCachedThreadPool給用戶使用。
static class ThreadPoolHelper {
public static ThreadPoolExecutor getThreadPool() {
return (ThreadPoolExecutor) Executors.newCachedThreadPool();
}
}
我們?cè)诙ㄎ坏秸{(diào)用出,真想明了了,原來每一次請(qǐng)求都會(huì)創(chuàng)建一個(gè)newCachedThreadPool處理大量的任務(wù),由于newCachedThreadPool回收時(shí)間為1s,所以線程使用完之后立刻就被回收了。
@GetMapping("wrong")
public String wrong() {
ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();
IntStream.rangeClosed(1, 20).forEach(i -> {
threadPool.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
log.debug(payload);
});
});
return"ok";
}
解決方式也很簡(jiǎn)單,我們按需調(diào)整線程池參數(shù),將線程池作為靜態(tài)變量全局復(fù)用即可。
static class ThreadPoolHelper {
privatestatic ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,
50,
2,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());
public static ThreadPoolExecutor getRightThreadPool() {
return threadPool;
}
}
從監(jiān)控來看線程數(shù)正常多了。
4. 仔細(xì)斟酌線程混用策略
我們使用線程池來處理一些異步任務(wù),每個(gè)任務(wù)耗時(shí)10ms左右。
@GetMapping("wrong")
public int wrong() throws ExecutionException, InterruptedException {
return threadPool.submit(calcTask()).get();
}
private Callable<Integer> calcTask() {
return () -> {
log.info("執(zhí)行異步任務(wù)");
TimeUnit.MILLISECONDS.sleep(10);
return1;
};
}
壓測(cè)的時(shí)候發(fā)現(xiàn)性能很差,處理時(shí)間最長(zhǎng)要283ms。
步入線程池發(fā)現(xiàn),線程池的配置如下,只有2個(gè)線程和50個(gè)隊(duì)列。
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
2,
1,
TimeUnit.HOURS,
new ArrayBlockingQueue<>(50),
new ThreadFactoryBuilder().setNameFormat("batchfileprocess-threadpool-%d").get(),
new ThreadPoolExecutor.CallerRunsPolicy());
查看調(diào)用發(fā)現(xiàn),原來后臺(tái)有一個(gè)處理字符串并將內(nèi)容寫入到文本文件的操作,綜合來看相當(dāng)于一個(gè)計(jì)算型任務(wù),由于這個(gè)任務(wù)不是很經(jīng)常出現(xiàn),所以開發(fā)者就設(shè)置兩個(gè)線程,并且為了讓任務(wù)能夠正確完成,拒絕策略也是使用CallerRunsPolicy,讓多出來的任務(wù)用調(diào)用者線程來執(zhí)行。
@PostConstruct
public void init() {
printStats(threadPool);
new Thread(() -> {
String payload = IntStream.rangeClosed(1, 100_0000)
.mapToObj(__ -> "a")
.collect(Collectors.joining(""));
while (true) {
threadPool.execute(() -> {
try {
Files.write(Paths.get("demo.txt"), Collections.singletonList(LocalTime.now().toString() + ":" + payload), UTF_8, CREATE, TRUNCATE_EXISTING);
} catch (IOException e) {
e.printStackTrace();
}
// log.info("batch file processing done");
});
}
}, "T1").start();
}
解決方式也很簡(jiǎn)單,上述線程池并不是為我們這種IO密集型任務(wù)準(zhǔn)備的,所以我們單獨(dú)為其劃分一個(gè)線程池出來處理這些任務(wù)。
private ThreadPoolExecutor asyncCalcThreadPool = new ThreadPoolExecutor(200,
200,
1,
TimeUnit.HOURS,
new ArrayBlockingQueue<>(50),
new ThreadFactoryBuilder().setNameFormat("asynccalc-threadpool-%d").get(),
new ThreadPoolExecutor.CallerRunsPolicy());
@GetMapping("wrong")
public int wrong() throws ExecutionException, InterruptedException {
return asyncCalcThreadPool.submit(calcTask()).get();
}
經(jīng)過壓測(cè)可以發(fā)現(xiàn)性能明顯上來了:
5. 使用正確的方式提交任務(wù)
假如我們提交給線程池的任務(wù)沒有返回值,我們建議使用execute。
這一點(diǎn)我們不妨看一下這樣一段代碼,該代碼會(huì)循環(huán)提交10個(gè)算術(shù)異常的任務(wù)給線程池??梢钥吹轿覀兲峤坏娜蝿?wù)是沒有返回值的,而我們提交任務(wù)時(shí)卻用到了submit。使用submit提交任務(wù)時(shí),會(huì)返回一個(gè)Future對(duì)象,通過Future對(duì)象我們可以使用get方法阻塞獲取任務(wù)返回結(jié)果。
因?yàn)槲覀兊娜蝿?wù)是沒有返回值的,所以我們提交過程中并沒有通過get方法獲取返回結(jié)果,這就導(dǎo)致了一個(gè)隱患——吞異常。
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個(gè)算術(shù)異常的任務(wù)
threadPool.submit(() -> {
log.info("開始執(zhí)行運(yùn)算");
int r = 1 / 0;
log.info("結(jié)束執(zhí)行運(yùn)算");
});
}
//等待線程池關(guān)閉
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
可以看到這段代碼的輸出結(jié)果如下,控制臺(tái)僅僅輸出線程開始工作,卻沒有輸出結(jié)果。
09:15:36.940 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
這一點(diǎn),我們通過查看FutureTask的run源碼可以得知,F(xiàn)utureTask的run方法執(zhí)行步驟如下:
- 調(diào)用call方法,執(zhí)行任務(wù)。
- 得到result后將ran設(shè)置為true。
- 如果執(zhí)行過程中報(bào)錯(cuò),直接進(jìn)入catch模塊,將result設(shè)置為null,并將ran設(shè)置為false。
- 調(diào)用setException處理異常。
try { //執(zhí)行任務(wù),返回一個(gè)結(jié)果賦值給result
result = c.call();
ran = true;
} catch (Throwable ex) {
//任務(wù)拋出異常后,將result設(shè)置為null,ran狀態(tài)設(shè)置為false,并調(diào)用setException處理異常
result = null;
ran = false;
setException(ex);
}
步入代碼查看setException我們可以發(fā)現(xiàn),它會(huì)將異常結(jié)果賦值給outcome然后調(diào)用finishCompletion結(jié)束任務(wù),所以如果我們沒有主動(dòng)獲取任務(wù)結(jié)果,那么這個(gè)錯(cuò)誤就永遠(yuǎn)不會(huì)被感知。
protected void setException(Throwable t) {
//通過cas將結(jié)果設(shè)置為完成(COMPLETING)值為1
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//將異常賦值給outcome直接將任務(wù)結(jié)束
outcome = t;
//通過cas將結(jié)果設(shè)置為異常(EXCEPTIONAL)值為3
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
針對(duì)上述問題,要想獲取異常也很簡(jiǎn)單,主動(dòng)調(diào)用get獲取結(jié)果即可:
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個(gè)算術(shù)異常的任務(wù)
Future<?> future = threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + " do working");
int r = 1 / 0;
System.out.println(r);
});
try {
//通過get阻塞獲取任務(wù)結(jié)果
Object o = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
threadPool.shutdown();
while (!threadPool.isTerminated()){
}
}
從輸出結(jié)果可以看到出現(xiàn)異常后,錯(cuò)誤直接拋出,我們就可以及時(shí)調(diào)試處理了。
pool-1-thread-1do working
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
pool-1-thread-1do working
pool-1-thread-1do working
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
pool-1-thread-1do working
pool-1-thread-1do working
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
pool-1-thread-1do working
at com.sharkChili.threadpool.Main.main(Main.java:23)
Caused by: java.lang.ArithmeticException: / by zero
at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
為什么調(diào)用get才能捕獲到異常呢?通過查看get源碼可以了解到get方法的執(zhí)行步驟:
- 獲取任務(wù)執(zhí)行狀態(tài)state。
- 如果state小于COMPLETING(COMPLETING值為1)說明任務(wù)未完成,則調(diào)用awaitDone等待任務(wù)完成。
- 如果大于1則說明任務(wù)已完成(),通過上文源碼可知我們的任務(wù)已經(jīng)被CAS設(shè)置為EXCEPTIONAL(值為3),所以直接調(diào)用report。
public V get() throws InterruptedException, ExecutionException {
int s = state;
//如果s小于1則說命未完成,調(diào)用awaitDone等待完成,在調(diào)用report
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
查看report代碼我們終于知道原因了,我們?nèi)蝿?wù)執(zhí)行報(bào)錯(cuò)所以s的值為3,小于CANCELLED,所以調(diào)用了最后一段代碼將異常拋出了。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
通過上述我們知道的submit使用不當(dāng)可能存在吞異常的情況以及應(yīng)對(duì)辦法,實(shí)際上對(duì)于沒有返回值的任務(wù),我們建議直接使用execute,execute感知異常時(shí)會(huì)直接將任務(wù)拋出:
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個(gè)算術(shù)異常的任務(wù)
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " do working");
int r = 1 / 0;
System.out.println(r);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
從輸出結(jié)果來看,算術(shù)異常直接拋出,被主線程感知了。
pool-1-thread-1do working
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2" java.lang.ArithmeticException: / by zero
pool-1-thread-2do working
pool-1-thread-3do working
at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
pool-1-thread-4do working
pool-1-thread-5do working
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
pool-1-thread-6do working
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
pool-1-thread-7do working
通過查看sumbit執(zhí)行源碼,我們可以看到代碼調(diào)用棧會(huì)來到ThreadPoolExecutor的runWorker下面這個(gè)代碼段的邏輯:
- 調(diào)用run執(zhí)行任務(wù)。
- afterExecute收尾任務(wù)。
- 如果感知異常則拋出異常throw x。
- 所以我們的任務(wù)會(huì)因?yàn)樗阈g(shù)異常而拋出任務(wù)。
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行算數(shù)任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; thrownew Error(x);
} finally {
afterExecute(task, thrown);
}
最終代碼被JVM感知直接將異常拋到控制臺(tái),所以對(duì)于沒有返回值的任務(wù),我們建議使用execute執(zhí)行任務(wù)。
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}
6. 避免任務(wù)頻繁拋出異常
上文提到使用execute提交無返回值的任務(wù),這樣異常就會(huì)被感知,但還需要注意的是頻繁的拋出異常會(huì)讓線程消亡,導(dǎo)致線程池每次執(zhí)行新任務(wù)時(shí)回去創(chuàng)建新的線程。
還是以這段代碼為例,我們對(duì)于算術(shù)異常沒有任務(wù)處理。
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個(gè)算術(shù)異常的任務(wù)
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " do working");
int r = 1 / 0;
System.out.println(r);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
可以看到我們明明只有一個(gè)線程的線程池,每次拋出異常后,都會(huì)創(chuàng)建一個(gè)新的線程處理任務(wù)。
pool-1-thread-1 do working
pool-1-thread-2 do working
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-2" pool-1-thread-3 do working
這一點(diǎn)我們從源碼中可知,拋出的異常被JVM感知并調(diào)用dispatchUncaughtException方法,該方法會(huì)通過getUncaughtExceptionHandler得到線程組,然后調(diào)用uncaughtException處理異常。
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}
最終代碼會(huì)走到e.printStackTrace打印異常堆棧信息并終止任務(wù),銷毀線程。
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} elseif (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}
所以我們建議,對(duì)于線程池的中的任務(wù)盡可能不要用異常來處理邏輯,對(duì)于可以預(yù)見的異常,我們建議手動(dòng)處理返回,避免線程銷毀再創(chuàng)建的開銷。
以我們的算術(shù)異常為例,我們可以提前判斷一下被處數(shù)提前用業(yè)務(wù)手段處理掉異常。
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個(gè)算術(shù)異常的任務(wù)
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " do working");
//手動(dòng)處理業(yè)務(wù)代碼的異常
int num= RandomUtil.randomInt(0,10);
if (num==0){
System.out.println("The dividend cannot be zero. ");
return;
}
int r = 1 / num;
System.out.println(r);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
三、小結(jié)
總結(jié)一下上述線程池的使用經(jīng)驗(yàn):
- 避免使用Executors創(chuàng)建線程池。
- 確保線程確實(shí)被服用到。
- 使用合適的方式提交任務(wù)以及及時(shí)處理任務(wù)中的異常。
- 確保在合適的場(chǎng)景使用合適的線程池:
CPU密集型:若是CPU密集型,我們希望多利用CPU資源來處理任務(wù),因?yàn)闆]有任何IO,理想情況線程數(shù)=CPU核心數(shù)即可,但是考慮到可能回出現(xiàn)某個(gè)意外情況導(dǎo)致線程阻塞,所以我們建議線程數(shù)=CPU核心數(shù)+1
IO密集型:IO密集型由于每個(gè)任務(wù)可能回出現(xiàn)IO導(dǎo)致任務(wù)阻塞,在單核情況下,我們建議:
線程數(shù)=IO時(shí)長(zhǎng)/CPU計(jì)算耗時(shí)+1
若在多核的情況下,我們建議
線程數(shù)=CPU核心數(shù) * (IO時(shí)長(zhǎng)/CPU計(jì)算耗時(shí)+1)
但是具體情況還要具體結(jié)合壓測(cè)結(jié)果進(jìn)行響應(yīng)調(diào)整。