Java線程進階之ThreadPoolExecutor線程池執行原理機制詳解
前言
線程池有很多優點:
降低資源消耗;
提高響應速度;
提高線程的可管理性等等;
今天我們就來分析探討下原理實現
一、線程池接口簡單分析
1、Executor接口
- public interface Executor {
- // 執行一個任務。任務都被封裝成Runnable的實現
- void execute(Runnable command);
- }
2、 ExecutorService接口
- public interface ExecutorService extends Executor {
- // 啟動有序的關閉,之前提交的任務將會被執行,但不會接受新的任務。
- void shutdown();
- // 嘗試停止所有正在執行的任務,停止等待處理的任務,病返回任務列表
- List<Runnable> shutdownNow();
- // 判斷線程池是否已經關閉
- boolean isShutdown();
- // 如果關閉后所有任務都已完成。但是前提是必須先執行:shutdown 或者 shutdownNow
- boolean isTerminated();
- // 在開啟shutdown之后,阻止所有的任務知道執行完成
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- // 提交任務,帶返回結果的
- <T> Future<T> submit(Callable<T> task);
- // 提交任務,封裝返回結果為T
- <T> Future<T> submit(Runnable task, T result);
- // 提交一個普通任務,返回結果任意
- Future<?> submit(Runnable task);
- // 執行一批任務,返回結果為 List<Future<T>>
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException;
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
其具有5個核心的內部類。其中4內部類對應的是拒絕策略。Worker是核心的執行代碼;
3、 RejectedExecutionHandler
- public interface RejectedExecutionHandler {
- // 拒絕執行策略
- void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
- }
4、 AbortPolicy 策略
Java線程池默認的阻塞策略,不執行此任務,而且直接拋出一個運行時異常
- public static class AbortPolicy implements RejectedExecutionHandler {
- public AbortPolicy() { }
- // 直接拋出異常,描述前線程的基本信息
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- throw new RejectedExecutionException("Task " + r.toString() +
- " rejected from " +
- e.toString());
- }
- }
5、DiscardPolicy策略
空方法,不做任何處理
- public static class DiscardPolicy implements RejectedExecutionHandler {
- public DiscardPolicy() { }
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- }
- }
6、DiscardOldestPolicy 策略
從隊列里面拋棄一個最老的任務,并再次execute 此task
- public static class DiscardOldestPolicy implements RejectedExecutionHandler {
- public DiscardOldestPolicy() { }
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- // 從隊列里面取出最老的一個任務
- e.getQueue().poll();
- // 手動調用execute方法執行,將任務添加到隊列中
- e.execute(r);
- }
- }
- }
7、CallerRunsPolicy 策略
- public static class CallerRunsPolicy implements RejectedExecutionHandler {
- /**
- * Creates a {@code CallerRunsPolicy}.
- */
- public CallerRunsPolicy() { }
- // 如果當前線程池沒有關閉,則調用線程的run方法
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- r.run();
- }
- }
- }
8、ThreadPoolExecutor
構造函數詳解
- public class ThreadPoolExecutor extends AbstractExecutorService {
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
- }
構造函數參數說明:
- corePoolSize:線程池中的核心線程數,空閑時候線程也不會回收,除非把allowCoreThreadTimeOut設置為 true,這時核心線程才會被回收;
- maximumPoolSize:線程池中可以創建的最大線程數,限定為2^29-1;
- keepAliveTime:當線程池中創建的線程超過了核心線程數的時候,在沒有新任務加入的等待時間;
- workQueue:存放任務的隊列,只有當線程數 > 核心線程數,才會把其他的任務放入queue,一般常用的是queue就是ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue;
- threadFactory:創建線程的工廠類;
- handler:當queue滿了和線程數達到最大限制,對于繼續到達的任務采取的策略。默認采取AbortPolicy , 也就是拒絕策略,直接拋出異常;
9、核心成員變量分析
- 線程池中設計非常巧妙的一個地方是把線程池的狀態和運行的線程數量用一個int類型進行存儲;這樣一來可以保持線程池狀態和線程池活躍線程數量的一致性。因為AtomicInteger是線程安全的;
- workerCount:線程池中當前活動的線程數量,占據ctl的低29位;
- runState:線程池運行狀態,占據ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五種狀態;
- 為了將線程池的狀態和線程池中的工作線程的數量放到一個int里面進行管理。他們利用了二進制數據進行位運算。其中int類型有4個字節,一個字節8位。總共有32位。其中高的3位表示線程的狀態。低29位代表線程的數量;
其中32位中,高三位代表的是狀態:
- 111 > RUNNING
- 000 > SHUTDOWN
- 001 > STOP
- 010 > TIDYING
- 110 > TERMINATED
低29位代表線程的數量。所以最大的線程數為 2^29 -1 = 536870911;
// 記錄線程池狀態和線程數量(總共32位,前三位表示線程池狀態,后29位表示線程數量),保證線程安全性
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // int 字節32位,COUNT_BITS代表的是29位
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 線程的最大容量:000 11111111111111111111111111111
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- // 運行狀態:111 00000000000000000000000000000
- private static final int RUNNING = -1 << COUNT_BITS;
- // 關閉狀態:000 00000000000000000000000000000
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- // 停止狀態:001 00000000000000000000000000000
- private static final int STOP = 1 << COUNT_BITS;
- // 整理狀態:010 00000000000000000000000000000
- private static final int TIDYING = 2 << COUNT_BITS;
- // 終止狀態:011 00000000000000000000000000000
- private static final int TERMINATED = 3 << COUNT_BITS;
- /**
- * 是按位取反的意思,CAPACITY表示的是高位的3個0,和低位的29個1,而~CAPACITY則表示高位的3個1,2低位的9個0,
- * 然后再與入參c執行按位與操作,即高3位保持原樣,低29位全部設置為0,也就獲取了線程池的運行狀態runState
- */
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- /**
- * 返回當前線程的數量。其中c代表線程池的狀態,即是高三位。:
- * 而CAPACITY 代表的是線程的容量,即000 11111111111111111111111111111
- * c & CAPACITY ,只有當都為1的時候,才為真,這樣直接舍棄高位
- */
- private static int workerCountOf(int c) { return c & CAPACITY; }
- /**
- * 傳入的rs表示線程池運行狀態runState,其是高3位有值,低29位全部為0的int,
- * 而wc則代表線程池中有效線程的數量workerCount,其為高3位全部為0,而低29位有值得int,
- * 將runState和workerCount做或操作|處理,即用runState的高3位,workerCount的低29位填充的數字,而默認傳入的
- */
- private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池的狀態轉換:
- // 調用了shutdown()方法
- RUNNING -> SHUTDOWN
- // 調用了shutdownNow()
- (RUNNING 或 SHUTDOWN) -> STOP
- // 當隊列和線程池為空
- SHUTDOWN -> TIDYING
- // 當線程池為空
- STOP -> TIDYING
- // 當terminated()鉤子方法執行完成
- TIDYING -> TERMINATED
二、線程池執行流程源碼分析
1、程序入口:execute 方法
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- //獲取當前線程池的狀態+線程個數變量
- int c = ctl.get();
- //當前線程池線程個數是否小于corePoolSize,小于則開啟新線程運行
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- //如果線程池處于RUNNING狀態,則添加任務到阻塞隊列
- if (isRunning(c) && workQueue.offer(command)) {
- //二次檢查
- int recheck = ctl.get();
- //如果當前線程池狀態不是RUNNING則從隊列刪除任務,并執行拒絕策略
- if (! isRunning(recheck) && remove(command))
- reject(command);
- //否者如果當前線程池線程空,則添加一個線程
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- //如果隊列滿了,則新增線程,新增失敗則執行拒絕策略
- else if (!addWorker(command, false))
- reject(command);
- }
- 如果當前線程池線程個數小于corePoolSize則開啟新線程;
- 否則添加任務到任務隊列;
- 如果任務隊列滿了,則嘗試新開啟線程執行任務,如果線程個數>maximumPoolSize則執行拒絕策略;
重點看addWorkder方法:
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- //1、 檢查隊列是否只在必要時為空.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- //循環cas增加線程個數
- for (;;) {
- int wc = workerCountOf(c);
- //如果線程個數超限則返回false
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- //cas增加線程個數,同時只有一個線程成功
- if (compareAndIncrementWorkerCount(c))
- break retry;
- //cas失敗了,則看線程池狀態是否變化了,變化則跳到外層循環重試重新獲取線程池狀態,否者內層循環重新cas。
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- }
- }
- //2、到這里說明cas成功了
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- //創建worker
- final ReentrantLock mainLock = this.mainLock;
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- //加獨占鎖,為了workers同步,因為可能多個線程調用了線程池的execute方法。
- mainLock.lock();
- try {
- //重新檢查線程池狀態,為了避免在獲取鎖前調用了shutdown接口
- int c = ctl.get();
- int rs = runStateOf(c);
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- //添加任務
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- //添加成功則啟動任務
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
- 第(1)雙重循環目的是通過cas增加線程池線程個數;
- 第(2)主要是并發安全的把任務添加到workers里面,并且啟動任務執行;
- rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty())
也就是說下面幾種情況下會返回false:
- 當前線程池狀態為STOP,TIDYING,TERMINATED;
- 當前線程池狀態為SHUTDOWN并且已經有了第一個任務;
- 當前線程池狀態為SHUTDOWN并且任務隊列為空;
- 內層循環作用是使用cas增加線程個數,如果線程個數超限則返回false,否者進行cas,cas成功則退出雙循環,否者cas失敗了,要看當前線程池的狀態是否變化了,如果變了,則重新進入外層循環重新獲取線程池狀態,否者進入內層循環繼續進行cas嘗試;
- 到了第(2)說明CAS成功了,也就是說線程個數加一了,但是現在任務還沒開始執行,這里使用全局的獨占鎖來控制workers里面添加任務,其實也可以使用并發安全的set,但是性能沒有獨占鎖好;
- 這里需要注意的是要在獲取鎖后重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行;
2、 工作線程Worker
先看下構造函數:
- Worker(Runnable firstTask) {
- setState(-1); // 在調用runWorker前禁止中斷
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);//創建一個線程
- }
- 這里添加一個新狀態-1是為了避免當前線程worker線程被中斷;
- 這里設置了-1所以條件不滿足就不會中斷該線程了;
- 運行runWorker時候會調用unlock方法,該方法吧status變為了0,所以這時候調用shutdownNow會中斷worker線程;
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // status設置為0,允許中斷
- boolean completedAbruptly = true;
- try {
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // 如果線程池當前狀態至少是stop,則設置中斷標志;
- // 如果線程池當前狀態是RUNNININ,則重置中斷標志,重置后需要重新
- //檢查下線程池狀態,因為當重置中斷標志時候,可能調用了線程池的shutdown方法
- //改變了線程池狀態。
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- //任務執行前干一些事情
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();//執行任務
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- //任務執行完畢后干一些事情
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- //統計當前worker完成了多少個任務
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- //執行清了工作
- processWorkerExit(w, completedAbruptly);
- }
- }
- 如果當前task為空,則直接執行;
- 否者調用getTask從任務隊列獲取一個任務執行,如果任務隊列為空,則worker退出;
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 如果當前線程池狀態>=STOP 或者線程池狀態為shutdown并且工作隊列為空則,減少工作線程個數
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- boolean timed; // Are workers subject to culling?
- for (;;) {
- int wc = workerCountOf(c);
- timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if (wc <= maximumPoolSize && ! (timedOut && timed))
- break;
- if (compareAndDecrementWorkerCount(c))
- return null;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- try {
- //根據timed選擇調用poll還是阻塞的take
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- //統計整個線程池完成的任務個數
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- //嘗試設置線程池狀態為TERMINATED,如果當前是shutdonw狀態并且工作隊列為空
- //或者當前是stop狀態當前線程池里面沒有活動線程
- tryTerminate();
- //如果當前線程個數小于核心個數,則增加
- int c = ctl.get();
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c) >= min)
- return; // replacement not needed
- }
- addWorker(null, false);
- }
- }
3、 shutdown操作
- 調用shutdown后,線程池就不會在接受新的任務了;
- 但是工作隊列里面的任務還是要執行的,但是該方法立刻返回的,并不等待隊列任務完成在返回;
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //權限檢查
- checkShutdownAccess();
- //設置當前線程池狀態為SHUTDOWN,如果已經是SHUTDOWN則直接返回
- advanceRunState(SHUTDOWN);
- //設置中斷標志
- interruptIdleWorkers();
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- //嘗試狀態變為TERMINATED
- tryTerminate();
- }
如果當前狀態>=targetState則直接返回,否者設置當前狀態為targetState;
- private void advanceRunState(int targetState) {
- for (;;) {
- int c = ctl.get();
- if (runStateAtLeast(c, targetState) ||
- ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
- break;
- }
- }
- private void interruptIdleWorkers() {
- interruptIdleWorkers(false);
- }
- 設置所有線程的中斷標志,主要這里首先加了全局鎖;
- 同時只有一個線程可以調用shutdown時候設置中斷標志,然后嘗試獲取worker自己的鎖,獲取成功則設置中斷標示;
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
4、shutdownNow操作
調用shutdown后,線程池就不會在接受新的任務了,并且丟棄工作隊列里面里面的任務,正在執行的任務會被中斷,但是該方法立刻返回的,并不等待激活的任務執行完成在返回。返回隊列里面的任務列表;
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();//權限檢查
- advanceRunState(STOP);// 設置線程池狀態為stop
- interruptWorkers();//中斷線程
- tasks = drainQueue();//移動隊列任務到tasks
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- return tasks;
- }
- 調用隊列的drainTo一次當前隊列的元素到taskList;
- 可能失敗,如果調用drainTo后隊列海不為空,則循環刪除,并添加到taskList;
- private List<Runnable> drainQueue() {
- BlockingQueue<Runnable> q = workQueue;
- List<Runnable> taskList = new ArrayList<Runnable>();
- q.drainTo(taskList);
- if (!q.isEmpty()) {
- for (Runnable r : q.toArray(new Runnable[0])) {
- if (q.remove(r))
- taskList.add(r);
- }
- }
- return taskList;
- }
5、 awaitTermination操作
- 等待線程池狀態變為TERMINATED則返回,或者時間超時;
- 由于整個過程獨占鎖,所以一般調用shutdown或者shutdownNow后使用;
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (;;) {
- if (runStateAtLeast(ctl.get(), TERMINATED))
- return true;
- if (nanos <= 0)
- return false;
- nanos = termination.awaitNanos(nanos);
- }
- } finally {
- mainLock.unlock();
- }
- }
總結
當往線程池中添加任務的時候,每次添加一個任務都回去新增一個線程。直到不滿足 wc < corePoolSize;
當前線程池的大小已經達到了corePoolSize的時候,每次添加任務會被存放到阻塞任務隊列中。等待執行;
等等待任務隊列也滿的時候,且添加失敗。此時在來新的任務,就會接著增加線程的個數,直到滿足:wc >= maximumPoolSize ,添加線程失敗執行拒絕策略;
線程池中,把線程的狀態和數量通過int類型進行維護,高三位表示狀態,低29位表示線程數量。這樣可以保證線程的狀態和數量的一致性;
線程池巧妙的使用一個Integer類型原子變量來記錄線程池狀態和線程池線程個數;