硬核詳解 FutureTask 設計與實現
最近看到一篇比較不錯的FutureTask實踐,于是對FutureTask源碼進行了研究,而本文將從實踐和源碼兩個角度分析FutureTask的設計與實現思路,希望對你有幫助。
FutureTask使用示例
我們的批量線程會進行嘗試創建一些任務執行,同時我們希望每個任務只有有一個線程去執行,其他線程如果拿到這個任務準備執行時發現這個任務已經在執行,則等待這個任務的返回結果。
如下圖,假設我們的第一個線程提交task-1至清單成功后,這個線程就會執行該任務,而線程2同樣想提交這個任務發現該任務已存在,則直接等待清單中記錄的這個任務的結果返回。而線程3則也是因為第一次提交任務,所以提交清單成功并執行。
我們完全可以通過FutureTask做到這一點,先來說說任務清單,它用于存儲正在執行的任務和任務名,為了保證多線程并發操作安全,筆者直接采用ConcurrentHashMap:
//保存執行的任務清單
private static ConcurrentHashMap<String, Future<String>> taskCache = new ConcurrentHashMap<>();
每一個線程執行任務則都是通過executionTask方法,該邏輯一旦檢查任務不存在則創建,然后通過樂觀鎖方式保證任務不存在時才能提交,完成這些操作后通過get等待結果返回。 需要注意的是,這里筆者為了保證邏輯可用做了一點小小的計數處理,每當一個任務通過run執行時,筆者會用原子類自增一下,以此判斷多線程執行該方法時有沒有重復執行任務的情況出現。
privatestatic AtomicInteger counter = new AtomicInteger(0);
public static String executionTask(String taskName) {
while (true){//執行到任務成功
if (!taskCache.containsKey(taskName)) {//如果任務不存在則創建任務
Callable<String> callable = () -> taskName;
FutureTask<String> futureTask = new FutureTask<>(callable);
//雙重鎖校驗避免任務重復提交
Future<String> future = taskCache.putIfAbsent(taskName, futureTask);
if (future == null) {//如果返回空則說明這個任務之前沒有提交過,當前線程直接執行
futureTask.run();
//累加執行的任務數
counter.incrementAndGet();
}
}
//任務非空或提交任務完成的線程在這里等待任務返回
try {
return taskCache.get(taskName).get();
} catch (Exception e) {
//如果保存則從清單中移除
taskCache.remove(taskName);
}
}
}
對應的測試代碼如下,筆者提交10w個線程執行10個任務,從計數器的輸出結果來看,執行了10個任務,沒有問題:
public static void main(String[] args) throws Exception {
//創建線程池
ExecutorService executorService = Executors.newFixedThreadPool(10_000);
for (int i = 0; i < 10_000; i++) {
//通過取模運算保證10w個線程只會創建 名為10以內的任務
int finalI = i % 10;
//提交任務
executorService.submit(() -> FutureTaskExample.executionTask(String.valueOf(finalI)));
}
//等待結束
executorService.shutdown();
while (!executorService.isTerminated()) {
}
//查看計數
System.out.println(counter.get());//10
}
FutureTask在閉鎖下的哲學
本質上FutureTask也可以是一種閉鎖,即在FutureTask對應線程未完成運算前,FutureTask這個閉鎖就像一個大門一樣不允許所有線程通過,只有FutureTask完成運算進入完成狀態后,其它線程才能通過。
例如我們希望通過FutureTask執行一些耗時的運算,此時就可以:
- 通過FutureTask提交任務
- 異步任務運算期間,執行一些其它任務
- 通過get阻塞等待FutureTask結果返回
- FutureTask任務返回線程通過,打印輸出結果
所以FutureTask也是一個高效的異步工具,我們可以將一些耗時的操作提前啟動,著手其它耗時操作等待完成后拿結果:
對應的我們也給出上述實現的代碼示例:
public class Task {
//休眠完成后,返回一個隨機數
privatefinal FutureTask<Integer> futureTask = new FutureTask<>(() -> {
ThreadUtil.sleep(1000);
return RandomUtil.randomInt();
});
privatefinal Thread thread = new Thread(futureTask);
//啟動任務執行
public void start() {
thread.start();
}
//阻塞獲取結果
public int get() throws ExecutionException, InterruptedException {
return futureTask.get();
}
}
使用實例如下,即通過FutureTask執行異步運算后,通過get執行閉鎖控制異步流程:
Task task = new Task();
long begin = System.currentTimeMillis();
task.start();
//futureTask異步運行期間,執行一些業務邏輯
System.out.println("do something...");
//阻塞等待futureTask完成,即利用get方法實現一個閉鎖的操作
int result = task.get();
long end = System.currentTimeMillis();
Console.log("result: {},cost:{}ms", result, end - begin);
輸出結果如下,可以看到整體來說利用了異步運算期間執行一些其它操作,同時還使用get保證整體流程順序正常:
do something...
result: -1158881871,cost:1009ms
FutureTask狀態機的扭轉
在正式進行源碼介紹之前,筆者先簡單介紹一下FutureTask執行狀態的扭轉,FutureTask在創建時是全新的任務,此時它的狀態就是NEW,一旦調用run之后就會開始運行,此時就會出現4個分支:
- 當任務正確執行完成之后,先將狀態設置為完成中COMPLETING,然后將執行結果存入outcome變量中,完成后再將狀態設置為正常結束,即NORMAL。
- 一旦任務執行出現異常,FutureTask則同樣將任務設置為完成中,將結果設置為null之后,調整狀態為執行出錯EXCEPTIONAL。
- 當任務在執行過程中,需要進行打斷操作時,FutureTask會將狀態設置為打斷中INTERRUPTING,一旦線程正常被打斷,任務就會被設置為終態INTERRUPTED。
- FutureTask同樣支持不打斷當前執行線程,這一點筆者會在后文中說明,這種情況則直接將線程設置為CANCELLED。
一旦狀態按照預期調整為終態后,FutureTask就會喚醒那些等待任務執行完成的線程:
這4種狀態扭轉,我們也可以通過閱讀FutureTask上關于狀態變量的注釋了解:
/*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
privatevolatileint state;
privatestaticfinalint NEW = 0;
privatestaticfinalint COMPLETING = 1;
privatestaticfinalint NORMAL = 2;
privatestaticfinalint EXCEPTIONAL = 3;
privatestaticfinalint CANCELLED = 4;
privatestaticfinalint INTERRUPTING = 5;
privatestaticfinalint INTERRUPTED = 6;
FutureTask如何執行
FutureTask在創建之初時的初始態為NEW,也就是整數變量0,一旦某個線程執行這個任務,JDK8版本會通過原子操作將記錄運行線程的地址(這個位置的偏移量用runnerOffset記錄)設置為當前線程,一旦操作成功則執行該task封裝的Callable任務,如果運行成功則將result設置為運行后的結果,并將ran標志為true,并將任務狀態設置為NORMAL:
反之如果運行失敗,則將ran設置為false,result設置為空,并將錯誤信息設置到stateOffset標志位上了,將任務運行狀態設置為終態EXCEPTIONAL,然后喚醒其他需要處理這個任務的線程:
對此我們給出FutureTask的底層實現,可以看到FutureTask只有狀態為NEW且通過CAS操作將runner設置為自己的線程才能執行任務,而后續的線程如果看到state不為new則只能獲取結果,而不能執行,這就FutureTask避免重復運行的核心設計所在。 進行樂觀鎖上鎖拿到執行權之后,就會基于CAS上鎖的線程進行任務調用,對應的結果扭轉就如上文所說,這里讀者可以參考筆者注釋自行閱讀:
public void run() {
//1. 為NEW 說明是第一次運行,則可以通過CAS操作獲取執行權
//2. 不為NEW,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
//如果任務不為空且狀態為NEW則調用call運行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//運行成功則記錄結果到result 并將ran 設置為true
result = c.call();
ran = true;
} catch (Throwable ex) {
//如果報錯則result設置為空,并將ran設置為false
result = null;
ran = false;
//并將任務狀態設置為錯誤
setException(ex);
}
//如果運行成功則將結果存到outcome中
if (ran)
set(result);
}
} finally {
//......
}
}
這里我們直接查看獲取結果成功后的狀態設置方法set方法,可以看到其內部先通過cas將status即狀態字段設置為COMPLETING,完成結果設置之后,再通過putOrderedInt將狀態設置為終態NORMAL,這么做的原因是為什么呢?
protected void set(V v) {
//先CAS設置為完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//設置結果完成后,才能設置狀態為正常結束
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
多線程執行FutureTask情況下,大部分判斷都需要依賴于COMPLETING這個中間態(利用get和COMPLETING方法),所以這個狀態的可見性要求相對高一些,所以在進行結果設置之前,先通過CAS的方式進行更新status字段狀態,這種操作是需要將storeLoad屏障,雖然性能表現差一些,但可以保證可見性和有序性,所以先通過這個操作保證其他線程對于這個中態狀態可見保證并發操作一致性。
然后完成任務處理結果設置,此時在邏輯上我們可以認定這個任務是處理完成了,因為大部分的邏輯判斷都是依賴于COMPLETING,對于終態(NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED)的可見性要求不高,所以FutureTask的設計者直接采用putOrderedInt這種操作保證寫入不會被重排序,但不會立即刷到一致性內存行上,所以在性能表現上會出色一些。
上述對于正確處理結果的設置,可以在set這個方法的源碼上得以印證,讀者可以結合上文筆者所說和注釋自行了解這段邏輯:
protected void set(V v) {
//通過cas操作volatile變量status完成中態設置,又保證的多核心可見性
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//設置處理結果到outcome上,后續其他線程get都是從這個outcome變量上獲取
outcome = v;
//因為終態可見性要求不高,所以通過putOrderedInt設置終態,保證寫入有序性
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//喚醒其他線程處理當前任務
finishCompletion();
}
}
多線程如何獲取FutureTask執行結果
其他線程需要從get方法獲取結果時,其內部本質就是調用awaitDone等待完成,假設我們用putOrderedInt寫入,且狀態對于當前線程不可見,那么這個線程要做的也僅僅是yield讓出處理器的使用權,相比之下volatile寫這種需要增加內存屏障寫入的開銷,這種內存消耗無論從概率還是消耗上,都是劃得來的。
最后完成上述操作,通過finishCompletion通知其他的等待線程可以開始處理FutureTask了。
對此我們也給出get的源碼和注釋,讀者可結合上文感知一下邏輯:
public V get() throws InterruptedException, ExecutionException {
int s = state;
//狀態在COMPLETING及其之前調用awaitDone等待完成
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
finallong deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//......
//如果狀態處于中態完成中,則讓出線程對于處理器的使用權
elseif (s == COMPLETING) // cannot time out yet
Thread.yield();
//......
}
我們給出狀態設置為終態之后,finishCompletion的邏輯,比較簡單,可以看到它僅是獲取當前等待節點的后一個線程的WaitNode ,通過unpark將其喚醒,然后獲取其后繼節點繼續進行喚醒操作:
private void finishCompletion() {
// assert state > COMPLETING;
//獲取后繼節點
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//將該等待節點直接喚醒
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
//指向已喚醒節點的后繼節點
WaitNode next = q.next;
//若為空直接退出
if (next == null)
break;
//將后繼指針設置為空,輔助gc,并讓q指向后續節點,繼續進行喚醒操作
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//......
}
FutureTask的異常處理
有了上文設置成功的邏輯解說,相信讀者對于setException的邏輯處理也就比較熟悉了,這里筆者就不再展開,讀者可自行查看代碼和注釋:
protected void setException(Throwable t) {
//原子操作設置為完成中,保證可見性
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//結果設置為null
outcome = t;
//通過高性能有序寫putOrderedInt設置終態為錯誤態
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//喚醒其他線程
finishCompletion();
}
}
FutureTask如何取消
任務取消操作有兩種情況,如果我們傳參為true,則會通過原子操作將狀態設置為打斷中,再嘗試打斷正在執行的線程,然后將狀態設置為已打斷這個終態INTERRUPTED,再喚醒其他線程。
若傳參設置為false,則直接原子操作設置為已取消,然后直接喚醒其他線程。
public boolean cancel(boolean mayInterruptIfRunning) {
//如果狀態不為new則進行狀態原子設置操作
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
returnfalse;
try {
//如果入參為true則嘗試打斷線程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//完成打斷設置狀態為INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//喚醒其他線程
finishCompletion();
}
returntrue;
}
FutureTask如何避免任務重復執行
我們假設當前任務為初始化NEW,這意味所有任務都可以嘗試進行上樂觀鎖獲取執行勸,如下所示只有設置成功的線程才可以進入后續的執行,而其他線程則直接返回:
public void run() {
//只有狀態為NEW的情況下,當前執行的線程才會通過CAS獲取樂觀鎖,之后獲取樂觀鎖成功的才能執行任務,其他的線程會直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//......
//執行任務邏輯
}
JDK8版本FutureTask的變化
這一點,筆者參閱了1.6版本的FutureTask實現,其內部是通過AQS來維護需要執行任務的線程及其狀態,而JDK8版本則專門為FutureTask創建了一個state字段,多線程之間通過CAS操作進行維護。
我們先來說明一下早期版本的設計再說明原因,在較早的版本FutureTask內部線程競爭關系和任務狀態都采用AQS進行維護,假設當前任務被取消,則執行這個操作線程會通過原子操作將AQS隊列的state字段更新為CANCELLED。
同理執行任務時,也是先通過AQS的原子操作將狀態設置為RUNNING,執行完成之后將操作結果原子修改為RAN,并將結果記錄到FutureTask的reuslt中:
對此我們給出老版本的FutureTask的run方法,邏輯如上文所說,筆者這里就不多做贅述:
public void run() {
sync.innerRun();
}
void innerRun() {
//原子操作將狀態設置為RUNNING
if (!compareAndSetState(0, RUNNING))
return;
try {
//獲取當前線程,檢查當前狀態還是RUNNING則用innerSet當前執行執行callable,然后將結果設置到result中,并將狀態修改為RAN
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
//......
}
}
我們來看看cancel方法,可以看到只要不是RAN這種終態,就可以嘗試打斷線程:
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
boolean innerCancel(boolean mayInterruptIfRunning) {
for (; ; ) {
//獲取狀
int s = getState();
//如果是RAN或者canceled則直接返回
if (ranOrCancelled(s))
returnfalse;
//將狀態設置為CANCELLED
if (compareAndSetState(s, CANCELLED))
break;
}
// 按照mayInterruptIfRunning的布爾值決定是否打斷線程
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
releaseShared(0);
done();
returntrue;
}
有意思的來了,我們試想這樣一個場景:
- 假設我們執行上述run方法的任務處于RUNNING狀態,此時當前線程1就可以調用interrupt打斷這個線程
- 此時線程2看到當前任務處于Running(源碼說明不是ran或者canceled即可打斷)直接將其打斷
- 又假設我們這個線程跑去執行別的task任務
所以線程1就可能在執行別的任務期間被打斷,進而出現幻覺死:
于是就有了JDK7之后版本,FutureTask專門使用一個volatile的state維護任務的狀態,并且在打斷前設置一個中態INTERRUPTING 即打斷中,執行任務的線程1在運行完成將結果存入outcome之后,看到打斷中這個中態就會循環調用yield讓出執行權,直到執行cancel操作的線程完成,由此保證了打斷操作永遠被限制在當前FutureTask生命周期以內。
對應我們給出cancel操作的源碼,可以看到它是先設置為打斷中INTERRUPTING 然后在進行打斷再設置打斷完成INTERRUPTED的終態:
public boolean cancel(boolean mayInterruptIfRunning) {
//原子操作設置狀態為打斷中
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
returnfalse;
try {
//嘗試打斷線程,完成后設置為INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
returntrue;
}
假設上一步的線程的打斷操作還未完成,這里可以直接理解為執行interrupt打斷之前的代碼段,而執行我們的run的線程已經運行完成并將結果設置到outcome時,如下所示會執行finally 語句塊的handlePossibleCancellationInterrupt,它在看到打斷中的操作后會循環調用Thread.yield()讓當前線程讓出CPU執行權,直到其他線程的cancel操作完成:
public void run() {
//.....
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//執行邏輯運算
result = c.call();
ran = true;
} catch (Throwable ex) {
//......
}
//將結果設置到outcome中
if (ran)
set(result);
}
} finally {
//......
//讀取到打斷中的狀態,調用handlePossibleCancellationInterrupt讓線程yield出去,保證打斷操作限定在當前線程內
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
private void handlePossibleCancellationInterrupt(int s) {
//循環yield直到cacnel操作完成
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
關于這個問題我們也可以參考源碼上的注釋:
/*
* Revision notes: This differs from previous versions of this
* class that relied on AbstractQueuedSynchronizer, mainly to
* avoid surprising users about retaining interrupt status during
* cancellation races. Sync control in the current design relies
* on a "state" field updated via CAS to track completion, along
* with a simple Treiber stack to hold waiting threads.
*
* Style note: As usual, we bypass overhead of using
* AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
*/