90%的人(包括我)都以為會用ThreadPoolExecutor了,看了這十張圖再說吧!
在阿里巴巴手冊中有一條建議:
【強制】線程池不允許使用 Executors 去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
如果經常基于Executors提供的工廠方法創建線程池,很容易忽略線程池內部的實現。特別是拒絕策略,因使用Executors創建線程池時不會傳入這個參數,直接采用默認值,所以常常被忽略。
下面我們就來了解一下線程池相關的實現原理、API以及實例。
線程池的作用
在實踐應用中創建線程池主要是為了:
- 減少資源開銷:減少每次創建、銷毀線程的開銷;
- 提高響應速度:請求到來時,線程已創建好,可直接執行,提高響應速度;
- 提高線程的可管理性:線程是稀缺資源,需根據情況加以限制,確保系統穩定運行;
ThreadPoolExecutor
ThreadPoolExecutor可以實現線程池的創建。ThreadPoolExecutor相關類圖如下:
類圖
從類圖可以看出,ThreadPoolExecutor最終實現了Executor接口,是線程池創建的真正實現者。
Executor兩級調度模型
Executor模型
在HotSpot虛擬機中,Java中的線程將會被一一映射為操作系統的線程。在Java虛擬機層面,用戶將多個任務提交給Executor框架,Executor負責分配線程執行它們;在操作系統層面,操作系統再將這些線程分配給處理器執行。
ThreadPoolExecutor的三個角色
任務
ThreadPoolExecutor接受兩種類型的任務:Callable和Runnable。
- Callable:該類任務有返回結果,可以拋出異常。通過submit方法提交,返回Future對象。通過get獲取執行結果。
- Runnable:該類任務只執行,無法獲取返回結果,在執行過程中無法拋異常。通過execute或submit方法提交。
任務執行器
Executor框架最核心的接口是Executor,它表示任務的執行器。
通過上面類圖可以看出,Executor的子接口為ExecutorService。再往底層有兩大實現類:ThreadPoolExecutor和ScheduledThreadPoolExecutor(集成自ThreadPoolExecutor)。
執行結果
Future接口表示異步的執行結果,它的實現類為FutureTask。
三個角色之間的處理邏輯圖如下:
FutureTask邏輯
線程池處理流程
線程池處理流程
一個線程從被提交(submit)到執行共經歷以下流程:
- 線程池判斷核心線程池里是的線程是否都在執行任務,如果不是,則創建一個新的工作線程來執行任務。如果核心線程池里的線程都在執行任務,則進入下一個流程;
- 線程池判斷工作隊列是否已滿。如果工作隊列沒有滿,則將新提交的任務儲存在這個工作隊列里。如果工作隊列滿了,則進入下一個流程;
- 線程池判斷其內部線程是否都處于工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已滿了,則交給飽和策略來處理這個任務。
線程池在執行execute方法時,主要有以下四種情況:
線程池執行excute方法
- 如果當前運行的線程少于corePoolSize,則創建新線程來執行任務(需要獲得全局鎖);
- 如果運行的線程等于或多于corePoolSize,則將任務加入BlockingQueue;
- 如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務(需要獲得全局鎖);
- 如果創建新線程將使當前運行的線程超出maxiumPoolSize,任務將被拒絕,并調用RejectedExecutionHandler.rejectedExecution()方法。
線程池采取上述的流程進行設計是為了減少獲取全局鎖的次數。在線程池完成預熱(當前運行的線程數大于或等于corePoolSize)之后,幾乎所有的excute方法調用都執行步驟二。
線程的狀態流轉
順便再回顧一下線程的狀態的轉換,在JDK中Thread類中提供了一個枚舉類,例舉了線程的各個狀態:
- public enum State {
- NEW,
- RUNNABLE,
- BLOCKED,
- WAITING,
- TIMED_WAITING,
- TERMINATED;
- }
一共定義了6個枚舉值,其實代表的是5種類型的線程狀態:
- NEW:新建;
- RUNNABLE:運行狀態;
- BLOCKED:阻塞狀態;
- WAITING:等待狀態,WAITING和TIMED_WAITING可以歸為一類,都屬于等待狀態,只是后者可以設置等待時間,即等待多久;
- TERMINATED:終止狀態;
線程關系轉換圖:
線程狀態轉換
當new Thread()說明這個線程處于NEW(新建狀態);調用Thread.start()方法表示這個線程處于RUNNABLE(運行狀態);但是RUNNABLE狀態中又包含了兩種狀態:READY(就緒狀態)和RUNNING(運行中)。調用start()方法,線程不一定獲得了CPU時間片,這時就處于READY,等待CPU時間片,當獲得了CPU時間片,就處于RUNNING狀態。
在運行中調用synchronized同步的代碼塊,沒有獲取到鎖,這時會處于BLOCKED(阻塞狀態),當重新獲取到鎖時,又會變為RUNNING狀態。在代碼執行的過程中可能會碰到Object.wait()等一些等待方法,線程的狀態又會轉變為WAITING(等待狀態),等待被喚醒,當調用了Object.notifyAll()喚醒了之后線程執行完就會變為TERMINATED(終止狀態)。
線程池的狀態
線程池中狀態通過2個二進制位(bit)來表示線程池的5個狀態:RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED:
- RUNNING:線程池正常工作的狀態,在 RUNNING 狀態下線程池接受新的任務并處理任務隊列中的任務;
- SHUTDOWN:調用shutdown()方法會進入 SHUTDOWN 狀態。在 SHUTDOWN 狀態下,線程池不接受新的任務,但是會繼續執行任務隊列中已有的任務;
- STOP:調用shutdownNow()會進入 STOP 狀態。在 STOP 狀態下線程池既不接受新的任務,也不處理已經在隊列中的任務。對于還在執行任務的工作線程,線程池會發起中斷請求來中斷正在執行的任務,同時會清空任務隊列中還未被執行的任務;
- TIDYING:當線程池中的所有執行任務的工作線程都已經終止,并且工作線程集合為空的時候,進入 TIDYING 狀態;
- TERMINATED:當線程池執行完terminated()鉤子方法以后,線程池進入終態 TERMINATED;
ThreadPoolExecutor API
ThreadPoolExecutor創建線程池API:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
參數解釋:
- corePoolSize :線程池常駐核心線程數。創建線程池時,線程池中并沒有任何線程,當有任務來時才去創建線程,執行任務。提交一個任務,創建一個線程,直到需要執行的任務數大于線程池基本大小,則不再創建。當創建的線程數等于corePoolSize 時,會加入設置的阻塞隊列。
- maximumPoolSize :線程池允許創建的最大線程數。當隊列滿時,會創建線程執行任務直到線程池中的數量等于maximumPoolSize。
- keepAliveTime :當線程數大于核心時,此為終止前多余的空閑線程等待新任務的最長時間。
- unit :keepAliveTime的時間單位,可選項:天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微妙)。
- workQueue :用來儲存等待執行任務的隊列。
- threadFactory :線程工廠,用來生產一組相同任務的線程。主要用于設置生成的線程名詞前綴、是否為守護線程以及優先級等。設置有意義的名稱前綴有利于在進行虛擬機分析時,知道線程是由哪個線程工廠創建的。
- handler :執行拒絕策略對象。當達到任務緩存上限時(即超過workQueue參數能存儲的任務數),執行拒接策略。也就是當任務處理不過來的時候,線程池開始執行拒絕策略。JDK 1.5提供了四種飽和策略:
- AbortPolicy:默認,直接拋異常;
- 只用調用者所在的線程執行任務,重試添加當前的任務,它會自動重復調用execute()方法;
- DiscardOldestPolicy:丟棄任務隊列中最久的任務;
- DiscardPolicy:丟棄當前任務;
適當的阻塞隊列
當創建的線程數等于corePoolSize,會將任務加入阻塞隊列(BlockingQueue),維護著等待執行的Runnable對象。
阻塞隊列通常有如下類型:
- ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。可以限定隊列的長度,接收到任務時,如果沒有達到corePoolSize的值,則新建線程(核心線程)執行任務,如果達到了,則入隊等候,如果隊列已滿,則新建線程(非核心線程)執行任務,又如果總線程數到了maximumPoolSize,并且隊列也滿了,則發生錯誤。
- LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。這個隊列在接收到任務時,如果當前線程數小于核心線程數,則新建線程(核心線程)處理任務;如果當前線程數等于核心線程數,則進入隊列等待。由于這個隊列沒有最大值限制,即所有超過核心線程數的任務都將被添加到隊列中,這也就導致了maximumPoolSize的設定失效,因為總線程數永遠不會超過corePoolSize。
- PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。隊列內元素必須實現Delayed接口,這就意味著傳入的任務必須先實現Delayed接口。這個隊列在接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務。
- SynchronousQueue:一個不存儲元素的阻塞隊列。這個隊列在接收到任務時,會直接提交給線程處理,而不保留它,如果所有線程都在工作就新建一個線程來處理這個任務。所以為了保證不出現【線程數達到了maximumPoolSize而不能新建線程】的錯誤,使用這個類型隊列時,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大。
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
明確的拒絕策略
當任務處理不過來時,線程池開始執行拒絕策略。
支持的拒絕策略:
- ThreadPoolExecutor.AbortPolicy: 丟棄任務并拋出RejectedExecutionException異常。(默認)
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務。(重復此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務。
線程池關閉
- shutdown:將線程池狀態置為SHUTDOWN,并不會立即停止。停止接收外部submit的任務,內部正在跑的任務和隊列里等待的任務,會執行完后,才真正停止。
- shutdownNow:將線程池狀態置為STOP。企圖立即停止,事實上不一定,跟shutdown()一樣,先停止接收外部提交的任務,忽略隊列里等待的任務,嘗試將正在跑的任務interrupt中斷(如果線程未處于sleep、wait、condition、定時鎖狀態,interrupt無法中斷當前線程),返回未執行的任務列表。
- awaitTermination(long timeOut, TimeUnit unit)當前線程阻塞,直到等所有已提交的任務(包括正在跑的和隊列中等待的)執行完或者等超時時間到或者線程被中斷,拋出InterruptedException,然后返回true(shutdown請求后所有任務執行完畢)或false(已超時)。
Executors
Executors是一個幫助類,提供了創建幾種預配置線程池實例的方法:newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool等。
如果查看源碼就會發現,Executors本質上就是實現了幾類默認的ThreadPoolExecutor。而阿里巴巴開發手冊,不建議采用Executors默認的,讓使用者直接通過ThreadPoolExecutor來創建。
Executors.newSingleThreadExecutor()
創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
- new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
該類型線程池的結構圖:
newSingleThreadExecutor
該線程池的特點:
- 只會創建一條工作線程處理任務;
- 采用的阻塞隊列為LinkedBlockingQueue;
Executors.newFixedThreadPool()
創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
- new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
該類型線程池的結構圖:
newFixedThreadPool
該線程池的特點:
- 固定大小;
- corePoolSize和maximunPoolSize都為用戶設定的線程數量nThreads;
- keepAliveTime為0,意味著一旦有多余的空閑線程,就會被立即停止掉;但這里keepAliveTime無效;
- 阻塞隊列采用了LinkedBlockingQueue,一個無界隊列;
- 由于阻塞隊列是一個無界隊列,因此永遠不可能拒絕任務;
- 由于采用了無界隊列,實際線程數量將永遠維持在nThreads,因此maximumPoolSize和keepAliveTime將無效。
Executors.newCachedThreadPool()
創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說JVM)能夠創建的最大線程大小。
- new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
該類型線程池的結構圖:
newCachedThreadPool
- 該線程池的特點:
- 可以無限擴大;
- 比較適合處理執行時間比較小的任務;
- corePoolSize為0,maximumPoolSize為無限大,意味著線程數量可以無限大;
- keepAliveTime為60S,意味著線程空閑時間超過60s就會被殺死;
- 采用SynchronousQueue裝等待的任務,這個阻塞隊列沒有存儲空間,這意味著只要有請求到來,就必須要找到一條工作線程處理它,如果當前沒有空閑的線程,那么就會再創建一條新的線程。
Executors.newScheduledThreadPool()
創建一個定長線程池,支持定時及周期性任務執行。
- new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
該線程池類圖:
newScheduledThreadPool
該線程池的特點:
- 接收SchduledFutureTask類型的任務,有兩種提交任務的方式:scheduledAtFixedRate和scheduledWithFixedDelay。SchduledFutureTask接收的參數:
- time:任務開始的時間
- sequenceNumber:任務的序號
- period:任務執行的時間間隔
- 采用DelayQueue存儲等待的任務;
- DelayQueue內部封裝了一個PriorityQueue,它會根據time的先后時間排序,若time相同則根據sequenceNumber排序;
- DelayQueue也是一個無界隊列;
- 工作線程執行時,工作線程會從DelayQueue取已經到期的任務去執行;執行結束后重新設置任務的到期時間,再次放回DelayQueue;
Executors.newWorkStealingPool()
JDK8引入,創建持有足夠線程的線程池支持給定的并行度,并通過使用多個隊列減少競爭。
- public static ExecutorService newWorkStealingPool() {
- return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
- ForkJoinPool.defaultForkJoinWorkerThreadFactory,
- null, true);
- }
Executors方法的弊端
1)newFixedThreadPool 和 newSingleThreadExecutor:允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。2)newCachedThreadPool 和 newScheduledThreadPool:允許的創建線程數量為Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
合理配置線程池大小
合理配置線程池,需要先分析任務特性,可以從以下角度來進行分析:
- 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
- 任務的優先級:高,中和低。
- 任務的執行時間:長,中和短。
- 任務的依賴性:是否依賴其他系統資源,如數據庫連接。
另外,還需要查看系統的內核數:
- Runtime.getRuntime().availableProcessors());
根據任務所需要的CPU和IO資源可以分為:
- CPU密集型任務: 主要是執行計算任務,響應時間很快,CPU一直在運行。一般公式:線程數 = CPU核數 + 1。只有在真正的多核CPU上才能得到加速,優點是不存在線程切換開銷,提高了CPU的利用率并減少了線程切換的效能損耗。
- IO密集型任務:主要是進行IO操作,CPU并不是一直在執行任務,IO操作(CPU空閑狀態)的時間較長,應配置盡可能多的線程,其中的線程在IO操作時,其他線程可以繼續利用CPU,從而提高CPU的利用率。一般公式:線程數 = CPU核數 * 2。
使用實例
任務實現類:
- /**
- * 任務實現線程
- * @author sec
- * @version 1.0
- * @date 2021/10/30
- **/
- public class MyThread implements Runnable{
- private final Integer number;
- public MyThread(int number){
- this.number = number;
- }
- public Integer getNumber() {
- return number;
- }
- @Override
- public void run() {
- try {
- // 業務處理
- TimeUnit.SECONDS.sleep(1);
- System.out.println("Hello! ThreadPoolExecutor - " + getNumber());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
自定義阻塞提交的ThreadLocalExcutor:
- /**
- * 自定義阻塞提交的ThreadPoolExecutor
- * @author sec
- * @version 1.0
- * @date 2021/10/30
- **/
- public class CustomBlockThreadPoolExecutor {
- private ThreadPoolExecutor pool = null;
- /**
- * 線程池初始化方法
- */
- public void init() {
- // 核心線程池大小
- int poolSize = 2;
- // 最大線程池大小
- int maxPoolSize = 4;
- // 線程池中超過corePoolSize數目的空閑線程最大存活時間:30+單位TimeUnit
- long keepAliveTime = 30L;
- // ArrayBlockingQueue<Runnable> 阻塞隊列容量30
- int arrayBlockingQueueSize = 30;
- pool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime,
- TimeUnit.SECONDS, new ArrayBlockingQueue<>(arrayBlockingQueueSize), new CustomThreadFactory(),
- new CustomRejectedExecutionHandler());
- }
- /**
- * 關閉線程池方法
- */
- public void destroy() {
- if (pool != null) {
- pool.shutdownNow();
- }
- }
- public ExecutorService getCustomThreadPoolExecutor() {
- return this.pool;
- }
- /**
- * 自定義線程工廠類,
- * 生成的線程名詞前綴、是否為守護線程以及優先級等
- */
- private static class CustomThreadFactory implements ThreadFactory {
- private final AtomicInteger count = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- String threadName = CustomBlockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
- t.setName(threadName);
- return t;
- }
- }
- /**
- * 自定義拒絕策略對象
- */
- private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- // 核心改造點,將blockingqueue的offer改成put阻塞提交
- try {
- executor.getQueue().put(r);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 當提交任務被拒絕時,進入拒絕機制,實現拒絕方法,把任務重新用阻塞提交方法put提交,實現阻塞提交任務功能,防止隊列過大,OOM
- */
- public static void main(String[] args) {
- CustomBlockThreadPoolExecutor executor = new CustomBlockThreadPoolExecutor();
- // 初始化
- executor.init();
- ExecutorService pool = executor.getCustomThreadPoolExecutor();
- for (int i = 1; i < 51; i++) {
- MyThread myThread = new MyThread(i);
- System.out.println("提交第" + i + "個任務");
- pool.execute(myThread);
- }
- pool.shutdown();
- try {
- // 阻塞,超時時間到或者線程被中斷
- if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
- // 立即關閉
- executor.destroy();
- }
- } catch (InterruptedException e) {
- executor.destroy();
- }
- }
- }
小結
看似簡單的線程池創建,其中卻蘊含著各類知識,融合貫通,根據具體場景采用具體的參數進行設置才能夠達到最優的效果。
總結一下就是:
- 用ThreadPoolExecutor自定義線程池,要看線程的用途。如果任務量不大,可以用無界隊列,如果任務量非常大,要用有界隊列,防止OOM;
- 如果任務量很大,且要求每個任務都處理成功,要對提交的任務進行阻塞提交,重寫拒絕機制,改為阻塞提交。保證不拋棄一個任務;
- 最大線程數一般設為2N+1最好,N是CPU核數;
- 核心線程數,要根據任務是CPU密集型,還是IO密集型。同時,如果任務是一天跑一次,設置為0合適,因為跑完就停掉了;
- 如果要獲取任務執行結果,用CompletionService,但是注意,獲取任務的結果要重新開一個線程獲取,如果在主線程獲取,就要等任務都提交后才獲取,就會阻塞大量任務結果,隊列過大OOM,所以最好異步開個線程獲取結果。
參考文章:
[1]https://www.jianshu.com/p/94852bd1a283
[2]https://blog.csdn.net/jek123456/article/details/90601351
[3]https://blog.csdn.net/z_s_z2016/article/details/81674893
[4]https://zhuanlan.zhihu.com/p/33264000
[5]https://www.cnblogs.com/semi-sub/p/13021786.html