阿里架構師教你JUC-Future與FutureTask原理詳解
1 Future
Future 表示一個任務的生命周期,是一個可取消的異步運算。提供了相應的方法來判斷任務狀態(完成或取消),以及獲取任務的結果和取消任務等。適合具有可取消性和執行時間較長的異步任務。
并發包中許多異步任務類都繼承自Future,其中最典型的就是 FutureTask
1.1 介紹
Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,并獲取計算的結果。計算完成后只能使用get方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用 Future 但又不提供可用的結果,則可以聲明 Future 形式類型、并返回 null 作為底層任務的結果。
也就是說Future具有這樣的特性
- 異步執行,可用 get 方法獲取執行結果
- 如果計算還沒完成,get 方法是會阻塞的,如果完成了,是可以多次獲取并立即得到結果的
- 如果計算還沒完成,是可以取消計算的
- 可以查詢計算的執行狀態
2 FutureTask
FutureTask 為 Future 提供了基礎實現,如獲取任務執行結果(get)和取消任務(cancel)等。如果任務尚未完成,獲取任務執行結果時將會阻塞。一旦執行結束,任務就不能被重啟或取消(除非使用runAndReset執行計算)。
FutureTask 常用來封裝 Callable 和 Runnable,也可作為一個任務提交到線程池中執行。除了作為一個獨立的類,此類也提供創建自定義 task 類使用。FutureTask 的線程安全由CAS保證。
FutureTask 內部維護了一個由volatile修飾的int型變量—state,代表當前任務的運行狀態
- NEW:新建
- COMPLETING:完成
- NORMAL:正常運行
- EXCEPTIONAL:異常退出
- CANCELLED:任務取消
- INTERRUPTING:線程中斷中
- INTERRUPTED:線程已中斷
在這七種狀態中,有四種任務終止狀態:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各種狀態的轉化如下:
數據結構及核心參數
- //內部持有的callable任務,運行完畢后置空
- private Callable<V> callable;
- //從get()中返回的結果或拋出的異常
- private Object outcome; // non-volatile, protected by state reads/writes
- //運行callable的線程,在 run 時進行 CAS 操作
- private volatile Thread runner;
- //使用Treiber棧保存等待線程
- private volatile WaitNode waiters;
FutureTask 繼承了Runnale和Future,本身也作為一個線程運行,可以提交給線程池執行。維護了一個內部類WaitNode,使用簡單的Treiber棧(無鎖并發棧)實現,用于存儲等待線程。FutureTask 只有一個自定義的同步器 Sync 的屬性,所有的方法都是委派給此同步器來實現。這也是JUC里使用AQS的通用模式。
源碼解析
FutureTask 的同步器 由于Future在任務完成后,可以多次自由獲取結果,因此,用于控制同步的AQS使用共享模式。
FutureTask 底層任務的執行狀態保存在AQS的狀態里。AQS是否允許線程獲取(是否阻塞)是取決于任務是否執行完成,而不是具體的狀態值。
- private final class Sync extends AbstractQueuedSynchronizer {
- // 定義表示任務執行狀態的常量。由于使用了位運算進行判斷,所以狀態值分別是2的冪。
- // 表示任務已經準備好了,可以執行
- private static final int READY = 0;
- // 表示任務正在執行中
- private static final int RUNNING = 1;
- // 表示任務已執行完成
- private static final int RAN = 2;
- // 表示任務已取消
- private static final int CANCELLED = 4;
- // 底層的表示任務的可執行對象
- private final Callable<V> callable;
- // 表示任務執行結果,用于get方法返回。
- private V result;
- // 表示任務執行中的異常,用于get方法調用時拋出。
- private Throwable exception;
- /*
- * 用于執行任務的線程。在 set/cancel 方法后置為空,表示結果可獲取。
- * 必須是 volatile的,用于確保完成后(result和exception)的可見性。
- * (如果runner不是volatile,則result和exception必須都是volatile的)
- */
- private volatile Thread runner;
- /**
- * 已完成或已取消 時成功獲取
- */
- protected int tryAcquireShared( int ignore) {
- return innerIsDone() ? 1 : -1;
- }
- /**
- * 在設置最終完成狀態后讓AQS總是通知,通過設置runner線程為空。
- * 這個方法并沒有更新AQS的state屬性,
- * 所以可見性是通過對volatile的runner的寫來保證的。
- */
- protected boolean tryReleaseShared( int ignore) {
- runner = null;
- return true;
- }
- // 執行任務的方法
- void innerRun() {
- // 用于確保任務不會重復執行
- if (!compareAndSetState(READY, RUNNING))
- return;
- // 由于Future一般是異步執行,所以runner一般是線程池里的線程。
- runner = Thread.currentThread();
- // 設置執行線程后再次檢查,在執行前檢查是否被異步取消
- // 由于前面的CAS已把狀態設置RUNNING,
- if (getState() == RUNNING) { // recheck after setting thread
- V result;
- //
- try {
- result = callable.call();
- } catch (Throwable ex) {
- // 捕獲任務執行過程中拋出的所有異常
- setException(ex);
- return;
- }
- set(result);
- } else {
- // 釋放等待的線程
- releaseShared(0); // cancel
- }
- }
- // 設置結果
- void innerSet(V v) {
- // 放在循環里進行是為了失敗后重試。
- for (;;) {
- // AQS初始化時,狀態值默認是 0,對應這里也就是 READY 狀態。
- int s = getState();
- // 已完成任務不能設置結果
- if (s == RAN)
- return;
- // 已取消 的任務不能設置結果
- if (s == CANCELLED) {
- // releaseShared 會設置runner為空,
- // 這是考慮到與其他的取消請求線程 競爭中斷 runner
- releaseShared(0);
- return;
- }
- // 先設置已完成,免得多次設置
- if (compareAndSetState(s, RAN)) {
- result = v;
- releaseShared(0); // 此方法會更新 runner,保證result的可見性
- done();
- return;
- }
- }
- }
- // 獲取異步計算的結果
- V innerGet() throws InterruptedException, ExecutionException {
- acquireSharedInterruptibly(0);// 獲取共享,如果沒有完成則會阻塞。
- // 檢查是否被取消
- if (getState() == CANCELLED)
- throw new CancellationException();
- // 異步計算過程中出現異常
- if (exception != null)
- throw new ExecutionException(exception);
- return result;
- }
- // 取消執行任務
- boolean innerCancel( boolean mayInterruptIfRunning) {
- for (;;) {
- int s = getState();
- // 已完成或已取消的任務不能再次取消
- if (ranOrCancelled(s))
- return false;
- // 任務處于 READY 或 RUNNING
- if (compareAndSetState(s, CANCELLED))
- break;
- }
- // 任務取消后,中斷執行線程
- if (mayInterruptIfRunning) {
- Thread r = runner;
- if (r != null)
- r.interrupt();
- }
- releaseShared(0); // 釋放等待的訪問結果的線程
- done();
- return true;
- }
- /**
- * 檢查任務是否處于完成或取消狀態
- */
- private boolean ranOrCancelled( int state) {
- return (state & (RAN | CANCELLED)) != 0;
- }
- // 其他方法省略
- }
從 innerCancel 方法可知,取消操作只是改變了任務對象的狀態并可能會中斷執行線程。如果任務的邏輯代碼沒有響應中斷,則會一直異步執行直到完成,只是最終的執行結果不會被通過get方法返回,計算資源的開銷仍然是存在的。
總的來說,Future 是線程間協調的一種工具。
AbstractExecutorService.submit(Callable task)
FutureTask 內部實現方法都很簡單,先從線程池的submit分析。submit方法默認實現在AbstractExecutorService,幾種實現源碼如下:
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Runnable task, T result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new FutureTask<T>(runnable, value);
- }
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- this.state = NEW; // ensure visibility of callable
- }
首先調用newTaskFor方法構造FutureTask,然后調用execute把任務放進線程池中,返回FutureTask
FutureTask.run()
- public void run() {
- //新建任務,CAS替換runner為當前線程
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return;
- try {
- Callable<V> c = callable;
- if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- result = null;
- ran = false;
- setException(ex);
- }
- if (ran)
- set(result);//設置執行結果
- }
- } finally {
- // runner must be non-null until state is settled to
- // prevent concurrent calls to run()
- runner = null;
- // state must be re-read after nulling runner to prevent
- // leaked interrupts
- int s = state;
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);//處理中斷邏輯
- }
- }
運行任務,如果任務狀態為NEW狀態,則利用CAS修改為當前線程。執行完畢調用set(result)方法設置執行結果。 set(result)源碼如下
首先利用cas修改state狀態為
設置返回結果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式設置state狀態為
結果設置完畢后,調用finishCompletion()喚醒等待線程
- private void finishCompletion() {
- for (WaitNode q; (q = waiters) != null;) {
- if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待線程
- for (;;) {//自旋遍歷等待線程
- Thread t = q.thread;
- if (t != null) {
- q.thread = null;
- LockSupport.unpark(t);//喚醒等待線程
- }
- WaitNode next = q.next;
- if (next == null)
- break;
- q.next = null; // unlink to help gc
- q = next;
- }
- break;
- }
- }
- //任務完成后調用函數,自定義擴展
- done();
- callable = null; // to reduce footprint
- }
回到run方法,如果在 run 期間被中斷,此時需要調用handlePossibleCancellationInterrupt處理中斷邏輯,確保任何中斷(例如cancel(true))只停留在當前run或runAndReset的任務中
- private void handlePossibleCancellationInterrupt(int s) {
- //在中斷者中斷線程之前可能會延遲,所以我們只需要讓出CPU時間片自旋等待
- if (s == INTERRUPTING)
- while (state == INTERRUPTING)
- Thread.yield(); // wait out pending interrupt
- }
FutureTask.runAndReset()
runAndReset是 FutureTask另外一個任務執行的方法,它不會返回執行結果,而且在任務執行完之后會重置stat的狀態為NEW,使任務可以多次執行。 runAndReset的典型應用是在 ScheduledThreadPoolExecutor 中,周期性的執行任務。
FutureTask.get()
FutureTask 通過get()獲取任務執行結果。如果任務處于未完成的狀態(state <= COMPLETING),就調用awaitDone等待任務完成。任務完成后,通過report獲取執行結果或拋出執行期間的異常。
awaitDone(boolean timed, long nanos)
- private int awaitDone(boolean timed, long nanos)
- throws InterruptedException {
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- WaitNode q = null;
- boolean queued = false;
- for (;;) {//自旋
- if (Thread.interrupted()) {//獲取并清除中斷狀態
- removeWaiter(q);//移除等待WaitNode
- throw new InterruptedException();
- }
- int s = state;
- if (s > COMPLETING) {
- if (q != null)
- q.thread = null;//置空等待節點的線程
- return s;
- }
- else if (s == COMPLETING) // cannot time out yet
- Thread.yield();
- else if (q == null)
- q = new WaitNode();
- else if (!queued)
- //CAS修改waiter
- queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
- q.next = waiters, q);
- else if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- removeWaiter(q);//超時,移除等待節點
- return state;
- }
- LockSupport.parkNanos(this, nanos);//阻塞當前線程
- }
- else
- LockSupport.park(this);//阻塞當前線程
- }
- }
awaitDone用于等待任務完成,或任務因為中斷或超時而終止。返回任務的完成狀態。
1.如果線程被中斷,首先清除中斷狀態,調用removeWaiter移除等待節點,然后拋InterruptedException。removeWaiter源碼如下:
- private void removeWaiter(WaitNode node) {
- if (node != null) {
- node.thread = null;//首先置空線程
- retry:
- for (;;) { // restart on removeWaiter race
- //依次遍歷查找
- for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
- s = q.next;
- if (q.thread != null)
- pred = q;
- else if (pred != null) {
- pred.next = s;
- if (pred.thread == null) // check for race
- continue retry;
- }
- else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替換
- continue retry;
- }
- break;
- }
- }
- }
2.如果當前為結束態(state>COMPLETING),則根據需要置空等待節點的線程,并返回 Future 狀態
3.如果當前為正在完成(COMPLETING),說明此時 Future 還不能做出超時動作,為任務讓出CPU執行時間片
4.如果state為NEW,先新建一個WaitNode,然后CAS修改當前waiters
5.如果等待超時,則調用removeWaiter移除等待節點,返回任務狀態;如果設置了超時時間但是尚未超時,則park阻塞當前線程
6.其他情況直接阻塞當前線程
FutureTask.cancel(boolean mayInterruptIfRunning)
- public boolean cancel(boolean mayInterruptIfRunning) {
- //如果當前Future狀態為NEW,根據參數修改Future狀態為INTERRUPTING或CANCELLED
- if (!(state == NEW &&
- UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
- mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
- return false;
- try { // in case call to interrupt throws exception
- if (mayInterruptIfRunning) {//可以在運行時中斷
- try {
- Thread t = runner;
- if (t != null)
- t.interrupt();
- } finally { // final state
- UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
- }
- }
- } finally {
- finishCompletion();//移除并喚醒所有等待線程
- }
- return true;
- }
說明:嘗試取消任務。如果任務已經完成或已經被取消,此操作會失敗。如果當前Future狀態為NEW,根據參數修改Future狀態為INTERRUPTING或CANCELLED。如果當前狀態不為NEW,則根據參數mayInterruptIfRunning決定是否在任務運行中也可以中斷。中斷操作完成后,調用finishCompletion移除并喚醒所有等待線程。
示例
小結
本章重點:FutureTask 結果返回機制,以及內部運行狀態的轉變