成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

阿里架構師教你JUC-Future與FutureTask原理詳解

開發 前端
Future 表示一個任務的生命周期,是一個可取消的異步運算。提供了相應的方法來判斷任務狀態(完成或取消),以及獲取任務的結果和取消任務等。適合具有可取消性和執行時間較長的異步任務。

 

[[350087]]

1 Future

 

Future 表示一個任務的生命周期,是一個可取消的異步運算。提供了相應的方法來判斷任務狀態(完成或取消),以及獲取任務的結果和取消任務等。適合具有可取消性和執行時間較長的異步任務。

并發包中許多異步任務類都繼承自Future,其中最典型的就是 FutureTask

1.1 介紹

Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,并獲取計算的結果。計算完成后只能使用get方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用 Future 但又不提供可用的結果,則可以聲明 Future 形式類型、并返回 null 作為底層任務的結果。

也就是說Future具有這樣的特性

  • 異步執行,可用 get 方法獲取執行結果
  • 如果計算還沒完成,get 方法是會阻塞的,如果完成了,是可以多次獲取并立即得到結果的
  • 如果計算還沒完成,是可以取消計算的
  • 可以查詢計算的執行狀態

 

2 FutureTask

 

FutureTask 為 Future 提供了基礎實現,如獲取任務執行結果(get)和取消任務(cancel)等。如果任務尚未完成,獲取任務執行結果時將會阻塞。一旦執行結束,任務就不能被重啟或取消(除非使用runAndReset執行計算)。

FutureTask 常用來封裝 Callable 和 Runnable,也可作為一個任務提交到線程池中執行。除了作為一個獨立的類,此類也提供創建自定義 task 類使用。FutureTask 的線程安全由CAS保證。

 

FutureTask 內部維護了一個由volatile修飾的int型變量—state,代表當前任務的運行狀態

  • NEW:新建
  • COMPLETING:完成
  • NORMAL:正常運行
  • EXCEPTIONAL:異常退出
  • CANCELLED:任務取消
  • INTERRUPTING:線程中斷中
  • INTERRUPTED:線程已中斷

 

在這七種狀態中,有四種任務終止狀態:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各種狀態的轉化如下:

數據結構及核心參數

  1. //內部持有的callable任務,運行完畢后置空 
  2. private Callable<V> callable; 
  3.  
  4. //從get()中返回的結果或拋出的異常 
  5. private Object outcome; // non-volatile, protected by state reads/writes 
  6.  
  7. //運行callable的線程,在 run 時進行 CAS 操作 
  8. private volatile Thread runner; 
  9.  
  10. //使用Treiber棧保存等待線程 
  11. private volatile WaitNode waiters; 

FutureTask 繼承了Runnale和Future,本身也作為一個線程運行,可以提交給線程池執行。維護了一個內部類WaitNode,使用簡單的Treiber棧(無鎖并發棧)實現,用于存儲等待線程。FutureTask 只有一個自定義的同步器 Sync 的屬性,所有的方法都是委派給此同步器來實現。這也是JUC里使用AQS的通用模式。

源碼解析

FutureTask 的同步器 由于Future在任務完成后,可以多次自由獲取結果,因此,用于控制同步的AQS使用共享模式。

 

FutureTask 底層任務的執行狀態保存在AQS的狀態里。AQS是否允許線程獲取(是否阻塞)是取決于任務是否執行完成,而不是具體的狀態值。

  1. private final class Sync extends AbstractQueuedSynchronizer { 
  2.     // 定義表示任務執行狀態的常量。由于使用了位運算進行判斷,所以狀態值分別是2的冪。 
  3.  
  4.     // 表示任務已經準備好了,可以執行 
  5.     private static final int READY     = 0; 
  6.  
  7.     // 表示任務正在執行中 
  8.     private static final int RUNNING   = 1; 
  9.  
  10.     // 表示任務已執行完成 
  11.     private static final int RAN       = 2; 
  12.  
  13.     // 表示任務已取消 
  14.     private static final int CANCELLED = 4; 
  15.  
  16.  
  17.     // 底層的表示任務的可執行對象 
  18.     private final Callable<V> callable; 
  19.  
  20.     // 表示任務執行結果,用于get方法返回。 
  21.     private V result; 
  22.  
  23.     // 表示任務執行中的異常,用于get方法調用時拋出。 
  24.     private Throwable exception; 
  25.  
  26.      /* 
  27.      * 用于執行任務的線程。在 set/cancel 方法后置為空,表示結果可獲取。 
  28.      * 必須是 volatile的,用于確保完成后(result和exception)的可見性。 
  29.      * (如果runner不是volatile,則result和exception必須都是volatile的) 
  30.      */ 
  31.     private volatile Thread runner; 
  32.  
  33.  
  34.      /** 
  35.      * 已完成或已取消 時成功獲取 
  36.      */ 
  37.     protected int tryAcquireShared( int ignore) { 
  38.         return innerIsDone() ? 1 : -1; 
  39.     } 
  40.  
  41.     /** 
  42.      * 在設置最終完成狀態后讓AQS總是通知,通過設置runner線程為空。 
  43.      * 這個方法并沒有更新AQS的state屬性, 
  44.      * 所以可見性是通過對volatile的runner的寫來保證的。 
  45.      */ 
  46.     protected boolean tryReleaseShared( int ignore) { 
  47.         runner = null
  48.         return true
  49.     } 
  50.  
  51.  
  52.      // 執行任務的方法 
  53.     void innerRun() { 
  54.         // 用于確保任務不會重復執行 
  55.         if (!compareAndSetState(READY, RUNNING)) 
  56.             return
  57.  
  58.         // 由于Future一般是異步執行,所以runner一般是線程池里的線程。 
  59.         runner = Thread.currentThread(); 
  60.  
  61.         // 設置執行線程后再次檢查,在執行前檢查是否被異步取消 
  62.         // 由于前面的CAS已把狀態設置RUNNING, 
  63.         if (getState() == RUNNING) { // recheck after setting thread 
  64.             V result; 
  65.             // 
  66.             try { 
  67.                 result = callable.call(); 
  68.             } catch (Throwable ex) { 
  69.                 // 捕獲任務執行過程中拋出的所有異常 
  70.                 setException(ex); 
  71.                 return
  72.             } 
  73.             set(result); 
  74.         } else { 
  75.       // 釋放等待的線程 
  76.             releaseShared(0); // cancel 
  77.         } 
  78.     } 
  79.  
  80.     // 設置結果 
  81.     void innerSet(V v) { 
  82.         // 放在循環里進行是為了失敗后重試。 
  83.         for (;;) { 
  84.             // AQS初始化時,狀態值默認是 0,對應這里也就是 READY 狀態。 
  85.             int s = getState(); 
  86.  
  87.             // 已完成任務不能設置結果 
  88.             if (s == RAN) 
  89.                 return
  90.  
  91.             // 已取消 的任務不能設置結果 
  92.             if (s == CANCELLED) { 
  93.                 // releaseShared 會設置runner為空, 
  94.                 // 這是考慮到與其他的取消請求線程 競爭中斷 runner 
  95.                 releaseShared(0); 
  96.                 return
  97.             } 
  98.  
  99.             // 先設置已完成,免得多次設置 
  100.             if (compareAndSetState(s, RAN)) { 
  101.                 result = v; 
  102.                 releaseShared(0); // 此方法會更新 runner,保證result的可見性 
  103.                 done(); 
  104.                 return
  105.             } 
  106.         } 
  107.     } 
  108.  
  109.     // 獲取異步計算的結果 
  110.     V innerGet() throws InterruptedException, ExecutionException { 
  111.         acquireSharedInterruptibly(0);// 獲取共享,如果沒有完成則會阻塞。 
  112.  
  113.         // 檢查是否被取消 
  114.         if (getState() == CANCELLED) 
  115.             throw new CancellationException(); 
  116.  
  117.         // 異步計算過程中出現異常 
  118.         if (exception != null
  119.             throw new ExecutionException(exception); 
  120.  
  121.         return result; 
  122.     } 
  123.  
  124.     // 取消執行任務 
  125.     boolean innerCancel( boolean mayInterruptIfRunning) { 
  126.         for (;;) { 
  127.             int s = getState(); 
  128.  
  129.             // 已完成或已取消的任務不能再次取消 
  130.             if (ranOrCancelled(s)) 
  131.                 return false
  132.  
  133.             // 任務處于 READY 或 RUNNING 
  134.             if (compareAndSetState(s, CANCELLED)) 
  135.                 break; 
  136.         } 
  137.         // 任務取消后,中斷執行線程 
  138.         if (mayInterruptIfRunning) { 
  139.             Thread r = runner; 
  140.             if (r != null
  141.                 r.interrupt(); 
  142.         } 
  143.         releaseShared(0); // 釋放等待的訪問結果的線程 
  144.         done(); 
  145.         return true
  146.     } 
  147.  
  148.     /** 
  149.      * 檢查任務是否處于完成或取消狀態 
  150.      */ 
  151.     private boolean ranOrCancelled( int state) { 
  152.         return (state & (RAN | CANCELLED)) != 0; 
  153.     } 
  154.  
  155.      // 其他方法省略 

從 innerCancel 方法可知,取消操作只是改變了任務對象的狀態并可能會中斷執行線程。如果任務的邏輯代碼沒有響應中斷,則會一直異步執行直到完成,只是最終的執行結果不會被通過get方法返回,計算資源的開銷仍然是存在的。

總的來說,Future 是線程間協調的一種工具。

AbstractExecutorService.submit(Callable task)

 

FutureTask 內部實現方法都很簡單,先從線程池的submit分析。submit方法默認實現在AbstractExecutorService,幾種實現源碼如下:

  1. public Future<?> submit(Runnable task) { 
  2.     if (task == null) throw new NullPointerException(); 
  3.     RunnableFuture<Void> ftask = newTaskFor(task, null); 
  4.     execute(ftask); 
  5.     return ftask; 
  6. public <T> Future<T> submit(Runnable task, T result) { 
  7.     if (task == null) throw new NullPointerException(); 
  8.     RunnableFuture<T> ftask = newTaskFor(task, result); 
  9.     execute(ftask); 
  10.     return ftask; 
  11. public <T> Future<T> submit(Callable<T> task) { 
  12.     if (task == null) throw new NullPointerException(); 
  13.     RunnableFuture<T> ftask = newTaskFor(task); 
  14.     execute(ftask); 
  15.     return ftask; 
  16. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
  17.     return new FutureTask<T>(runnable, value); 
  18. public FutureTask(Runnable runnable, V result) { 
  19.     this.callable = Executors.callable(runnable, result); 
  20.     this.state = NEW;       // ensure visibility of callable 

首先調用newTaskFor方法構造FutureTask,然后調用execute把任務放進線程池中,返回FutureTask

FutureTask.run()

  1. public void run() { 
  2.     //新建任務,CAS替換runner為當前線程 
  3.     if (state != NEW || 
  4.         !UNSAFE.compareAndSwapObject(this, runnerOffset, 
  5.                                      null, Thread.currentThread())) 
  6.         return
  7.     try { 
  8.         Callable<V> c = callable; 
  9.         if (c != null && state == NEW) { 
  10.             V result; 
  11.             boolean ran; 
  12.             try { 
  13.                 result = c.call(); 
  14.                 ran = true
  15.             } catch (Throwable ex) { 
  16.                 result = null
  17.                 ran = false
  18.                 setException(ex); 
  19.             } 
  20.             if (ran) 
  21.                 set(result);//設置執行結果 
  22.         } 
  23.     } finally { 
  24.         // runner must be non-null until state is settled to 
  25.         // prevent concurrent calls to run() 
  26.         runner = null
  27.         // state must be re-read after nulling runner to prevent 
  28.         // leaked interrupts 
  29.         int s = state; 
  30.         if (s >= INTERRUPTING) 
  31.             handlePossibleCancellationInterrupt(s);//處理中斷邏輯 
  32.     } 

運行任務,如果任務狀態為NEW狀態,則利用CAS修改為當前線程。執行完畢調用set(result)方法設置執行結果。 set(result)源碼如下

首先利用cas修改state狀態為

設置返回結果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式設置state狀態為

結果設置完畢后,調用finishCompletion()喚醒等待線程

  1. private void finishCompletion() { 
  2.     for (WaitNode q; (q = waiters) != null;) { 
  3.         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待線程 
  4.             for (;;) {//自旋遍歷等待線程 
  5.                 Thread t = q.thread; 
  6.                 if (t != null) { 
  7.                     q.thread = null
  8.                     LockSupport.unpark(t);//喚醒等待線程 
  9.                 } 
  10.                 WaitNode next = q.next
  11.                 if (next == null
  12.                     break; 
  13.                 q.next = null; // unlink to help gc 
  14.                 q = next
  15.             } 
  16.             break; 
  17.         } 
  18.     } 
  19.     //任務完成后調用函數,自定義擴展 
  20.     done(); 
  21.     callable = null;        // to reduce footprint 

回到run方法,如果在 run 期間被中斷,此時需要調用handlePossibleCancellationInterrupt處理中斷邏輯,確保任何中斷(例如cancel(true))只停留在當前run或runAndReset的任務中

  1. private void handlePossibleCancellationInterrupt(int s) { 
  2.     //在中斷者中斷線程之前可能會延遲,所以我們只需要讓出CPU時間片自旋等待 
  3.     if (s == INTERRUPTING) 
  4.         while (state == INTERRUPTING) 
  5.             Thread.yield(); // wait out pending interrupt 

FutureTask.runAndReset()

runAndReset是 FutureTask另外一個任務執行的方法,它不會返回執行結果,而且在任務執行完之后會重置stat的狀態為NEW,使任務可以多次執行。 runAndReset的典型應用是在 ScheduledThreadPoolExecutor 中,周期性的執行任務。

 

FutureTask.get()

FutureTask 通過get()獲取任務執行結果。如果任務處于未完成的狀態(state <= COMPLETING),就調用awaitDone等待任務完成。任務完成后,通過report獲取執行結果或拋出執行期間的異常。

awaitDone(boolean timed, long nanos)

  1. private int awaitDone(boolean timed, long nanos) 
  2.     throws InterruptedException { 
  3.     final long deadline = timed ? System.nanoTime() + nanos : 0L; 
  4.     WaitNode q = null
  5.     boolean queued = false
  6.     for (;;) {//自旋 
  7.         if (Thread.interrupted()) {//獲取并清除中斷狀態 
  8.             removeWaiter(q);//移除等待WaitNode 
  9.             throw new InterruptedException(); 
  10.         } 
  11.  
  12.         int s = state; 
  13.         if (s > COMPLETING) { 
  14.             if (q != null
  15.                 q.thread = null;//置空等待節點的線程 
  16.             return s; 
  17.         } 
  18.         else if (s == COMPLETING) // cannot time out yet 
  19.             Thread.yield(); 
  20.         else if (q == null
  21.             q = new WaitNode(); 
  22.         else if (!queued) 
  23.             //CAS修改waiter 
  24.             queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 
  25.                                                  q.next = waiters, q); 
  26.         else if (timed) { 
  27.             nanos = deadline - System.nanoTime(); 
  28.             if (nanos <= 0L) { 
  29.                 removeWaiter(q);//超時,移除等待節點 
  30.                 return state; 
  31.             } 
  32.             LockSupport.parkNanos(this, nanos);//阻塞當前線程 
  33.         } 
  34.         else 
  35.             LockSupport.park(this);//阻塞當前線程 
  36.     } 

awaitDone用于等待任務完成,或任務因為中斷或超時而終止。返回任務的完成狀態。

1.如果線程被中斷,首先清除中斷狀態,調用removeWaiter移除等待節點,然后拋InterruptedException。removeWaiter源碼如下:

  1. private void removeWaiter(WaitNode node) { 
  2.     if (node != null) { 
  3.         node.thread = null;//首先置空線程 
  4.         retry: 
  5.         for (;;) {          // restart on removeWaiter race 
  6.             //依次遍歷查找 
  7.             for (WaitNode pred = null, q = waiters, s; q != null; q = s) { 
  8.                 s = q.next
  9.                 if (q.thread != null
  10.                     pred = q; 
  11.                 else if (pred != null) { 
  12.                     pred.next = s; 
  13.                     if (pred.thread == null) // check for race 
  14.                         continue retry; 
  15.                 } 
  16.                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替換 
  17.                     continue retry; 
  18.             } 
  19.             break; 
  20.         } 
  21.     } 

2.如果當前為結束態(state>COMPLETING),則根據需要置空等待節點的線程,并返回 Future 狀態

3.如果當前為正在完成(COMPLETING),說明此時 Future 還不能做出超時動作,為任務讓出CPU執行時間片

4.如果state為NEW,先新建一個WaitNode,然后CAS修改當前waiters

5.如果等待超時,則調用removeWaiter移除等待節點,返回任務狀態;如果設置了超時時間但是尚未超時,則park阻塞當前線程

6.其他情況直接阻塞當前線程

 

FutureTask.cancel(boolean mayInterruptIfRunning)

  1. public boolean cancel(boolean mayInterruptIfRunning) { 
  2.     //如果當前Future狀態為NEW,根據參數修改Future狀態為INTERRUPTING或CANCELLED 
  3.     if (!(state == NEW && 
  4.           UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 
  5.               mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 
  6.         return false
  7.     try {    // in case call to interrupt throws exception 
  8.         if (mayInterruptIfRunning) {//可以在運行時中斷 
  9.             try { 
  10.                 Thread t = runner; 
  11.                 if (t != null
  12.                     t.interrupt(); 
  13.             } finally { // final state 
  14.                 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 
  15.             } 
  16.         } 
  17.     } finally { 
  18.         finishCompletion();//移除并喚醒所有等待線程 
  19.     } 
  20.     return true

說明:嘗試取消任務。如果任務已經完成或已經被取消,此操作會失敗。如果當前Future狀態為NEW,根據參數修改Future狀態為INTERRUPTING或CANCELLED。如果當前狀態不為NEW,則根據參數mayInterruptIfRunning決定是否在任務運行中也可以中斷。中斷操作完成后,調用finishCompletion移除并喚醒所有等待線程。

 

示例

小結
本章重點:FutureTask 結果返回機制,以及內部運行狀態的轉變

 

 

責任編輯:姜華 來源: JavaEdge
相關推薦

2020-10-26 09:02:45

如何校驗參數

2019-02-22 10:00:45

Java開發代碼

2022-06-02 11:12:10

CallableFuture

2019-10-24 11:03:56

HadoopGoogle硬件

2019-10-24 15:15:19

Hadoop框架數據

2020-01-16 15:35:00

高并發架構服務器

2025-06-23 10:13:00

FutureTask線程開發

2021-10-25 09:41:04

架構運維技術

2020-01-14 14:37:29

JVMJava體系

2020-10-26 11:41:47

kill代碼

2020-06-28 14:15:52

前端架構師互聯網

2019-07-22 22:22:02

架構運維技術

2009-02-26 16:32:58

SaaS開發SaaS應用Open API

2020-12-07 09:40:19

Future&Futu編程Java

2019-08-22 10:54:05

分布式系統架構

2021-02-01 07:40:55

架構師阿里技專家

2021-06-07 09:35:11

架構運維技術

2020-06-28 08:34:07

架構師阿里軟件

2021-08-20 07:53:07

Android動態換膚

2021-10-09 09:52:49

MYSQL開發數據庫
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产91网址| 北条麻妃一区二区三区在线观看 | 日韩免费在线视频 | 亚洲精品久久久久久久久久久 | 欧美精品第三页 | 久久久久久久久久影视 | 美女一级黄 | 亚洲欧美日韩精品 | 欧美亚洲日本 | 亚洲成人观看 | 国产成人精品高清久久 | 久久com| 中文字幕在线观看av | 台湾a级理论片在线观看 | 亚洲视频一区在线播放 | 欧美激情视频一区二区三区在线播放 | 日本精品免费在线观看 | 久久精品亚洲 | 欧美天堂| 欧美国产在线一区 | 国产一区二区三区免费 | 99久久婷婷国产综合精品电影 | 国产精品视频播放 | 国产精品无码专区在线观看 | 免费国产视频 | 污免费网站| 91精品国产91综合久久蜜臀 | 伊人在线 | 羞羞视频在线观看网站 | 91大神在线看 | 午夜码电影| 99精品久久久 | 又爽又黄axxx片免费观看 | 激情网五月天 | 99综合在线 | 国产在线拍偷自揄拍视频 | 亚洲欧美精品在线 | 91精品在线看 | 国产欧美视频一区二区三区 | 亚洲国产精选 | 91在线观看网址 |