CompletableFuture深度解析
本文將深入解析 CompletableFuture,希望對各位讀者能有所幫助。
CompletableFuture 適用于以下場景
- 并發執行多個異步任務,等待它們全部完成或獲取其中任意一個的結果。
- 對已有的異步任務進行進一步的轉換、組合和操作。
- 異步任務之間存在依賴關系,需要按照一定的順序進行串行執行。
- 需要對異步任務的結果進行異常處理、超時控制或取消操作。
如何使用
下面是一個演示 CompletableFuture 如何使用的代碼示例:
public class CompletableFutureExample {
public static void main(String[] args) {
// 創建CompletableFuture對象,并定義異步任務
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 異步任務的邏輯代碼
// 在這里執行耗時操作或其他需要異步執行的任務
try {
TimeUnit.SECONDS.sleep(2); // 模擬耗時操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, ";
});
// 添加任務完成后的回調方法
CompletableFuture<String> resultFuture = future.thenApplyAsync(result -> {
// 任務完成后的處理邏輯
// result為上一步任務的結果
return result + "World!";
});
// 組合多個CompletableFuture對象
CompletableFuture<String> combinedFuture = future.thenCombine(resultFuture, (result1, result2) -> {
// 對多個CompletableFuture的結果進行組合處理
return result1 + result2 + " Welcome to the CompletableFuture world!";
});
// 異常處理
CompletableFuture<String> exceptionHandledFuture = combinedFuture.exceptionally(ex -> {
// 異常處理邏輯
System.out.println("任務執行出現異常:" + ex.getMessage());
return "Fallback Result";
});
// 等待并獲取任務的結果
try {
String result = exceptionHandledFuture.get();
System.out.println("任務的最終結果為:" + result);
} catch (InterruptedException | ExecutionException e) {
// 處理異常情況
e.printStackTrace();
}
}
}
結果輸出:
任務的最終結果為:Hello, Hello, World! Welcome to the CompletableFuture world!
首先,我們創建了一個CompletableFuture對象future。在future中,我們使用supplyAsync方法定義了一個異步任務,其中 lambda表達式 中的代碼會在另一個線程中執行。在這個例子中,我們模擬了一個耗時操作,通過TimeUnit.SECONDS.sleep(2)暫停了2秒鐘。
然后,我們添加了一個回調方法resultFuture。在這個回調方法中,將前一個異步任務的結果作為參數進行處理,并返回處理后的新結果。在這個例子中,我們將前一個任務的結果與字符串 "World!" 連接起來,形成新的結果。
接下來,我們使用thenCombine方法組合了兩個CompletableFuture對象:future和resultFuture。在這個組合任務中,我們將兩個任務的結果進行組合處理,返回最終的結果。在這個例子中,我們將前兩個任務的結果與字符串 " Welcome to the CompletableFuture world!" 連接起來。
此外,我們還處理了異常情況。通過exceptionally方法,我們定義了一個異常處理回調方法。如果在任務執行過程中發生了異常,我們可以在這里對異常進行處理,并返回一個默認值作為結果。
最后,我們使用get方法等待并獲取最終的任務結果。需要注意的是,get方法可能會阻塞當前線程,直到任務完成并返回結果。在這個例子中,我們使用try-catch塊捕獲可能的異常情況,并打印出最終的任務結果。
這個例子只是部分展示了CompletableFuture的功能,實際上它比你想象的還要強大!
源碼解析
CompletableFuture 的源碼非常龐大和復雜,涉及到并發、線程池、同步機制等多方面的知識。在這里,我們只重點介紹 CompletableFuture 的核心實現原理。
基本結構
圖片
CompletableFuture 的作者是大名鼎鼎的 Doug Lea。CompletableFuture 類是實現了 Future 和 CompletionStage 接口的一個關鍵類。它可以表示異步計算的結果,并提供了一系列方法來操作和處理這些結果。
CompletableFuture 內部使用了一個屬性result來保存計算結果,以及若干個屬性waiters來保存等待結果的任務。當計算完成后,CompletableFuture將會通知所有等待結果的任務,并將結果傳遞給它們。
為了實現鏈式操作,CompletableFuture還定義了內部類:Completion, UniCompletion, 和 BiCompletion。
Completion, UniCompletion, 和 BiCompletion 是 CompletableFuture 內部用于處理異步任務完成的輔助類。
- Completion 是一個通用的輔助類,它包含了任務完成后的回調方法,以及處理異常的方法。
- UniCompletion 是 Completion 的子類,是一元依賴的基類,用于處理單個任務的完成情況,并提供了更多的方法來處理結果和異常。
- BiCompletion 是 UniCompletion 的子類,是二元依賴的基類,同時也是多元依賴的基類,用于處理兩個任務的完成情況,并提供了更多的方法來組合和處理這兩個任務的結果和異常。
這些輔助類在 CompletableFuture 的內部被使用,以實現異步任務的執行、結果的處理和組合等操作。它們提供了一種靈活的方式來處理異步任務的完成情況,并通過回調方法或其他一些方法來處理任務的結果和異常。
內部原理
圖片
CompletableFuture中包含兩個字段:result 和 stack。result 用于存儲當前CF的結果,stack (Completion)表示當前CF完成后需要觸發的依賴動作(Dependency Actions),去觸發依賴它的CF的計算,依賴動作可以有多個(表示有多個依賴它的CF),以棧(Treiber stack)的形式存儲,stack表示棧頂元素。
CompletableFuture 在設計思想上類似 “觀察者模式,每個 CompletableFuture 都可以被看作一個被觀察者,其內部有一個Completion類型的鏈表成員變量stack,用來存儲注冊到其中的所有觀察者。當被觀察者執行完成后會彈棧stack屬性,依次通知注冊到其中的觀察者。
執行流程
CompletableFuture 的執行流程如下:
- 創建CompletableFuture對象:通過調用CompletableFuture類的構造方法或靜態工廠方法創建一個新的CompletableFuture對象。
- 定義異步任務:使用supplyAsync()、runAsync()等方法定義需要在后臺線程中執行的異步任務,這些方法接受一個 lambda表達式 或 Supplier/Runnable 接口作為參數。
- 啟動異步任務:一旦CompletableFuture對象創建并定義了異步任務,任務會立即在后臺線程中開始執行,并返回一個代表異步計算結果的CompletableFuture對象。
- 異步任務執行過程:
當異步任務完成時,它會設置自己的結果值,將狀態標記為已完成。
如果有其他線程在此之前調用了complete()、completeExceptionally()、cancel()等方法,可能會影響任務的最終狀態。
- 注冊回調方法:
- 使用thenApply(), thenAccept(), thenRun()等方法來注冊回調函數,當異步任務完成或異常時,這些回調函數會被觸發。
- 回調函數也可以是異步的,通過thenApplyAsync(), thenAcceptAsync(), thenRunAsync()等方法注冊。
- 組合多個CompletableFuture:
- 使用thenCompose(), thenCombine(), allOf(), anyOf()等方法,可以將多個CompletableFuture對象進行組合,形成更復雜的異步任務處理流程。
- 處理異常:
- 通過使用exceptionally(), handle(), whenComplete()等方法,可以注冊異常處理函數,當異步任務出現異常時,這些處理函數會被觸發。
- 等待結果:
- 使用get()或join()方法來阻塞當前線程,并等待CompletableFuture對象的完成并獲取最終的結果。
- get()方法會拋出可能的異常(InterruptedException, ExecutionException)。
- join()方法與get()類似,但不會拋出 checked 異常。
- 取消任務:通過調用CompletableFuture對象的cancel()方法取消異步任務的執行。
請注意,以上步驟的順序和具體實現可能略有不同,但大致上反映了CompletableFuture的執行流程。在實際應用中,我們可以根據需求選擇適合的方法來處理異步任務的完成情況、結果、異常以及任務之間的關系。
方法介紹
CompletableFuture類提供了一系列用于處理和組合異步任務的方法。以下是這些方法的介紹:
創建對象
創建一個 CompletableFuture 對象有以下幾種方法:
- 使用 CompletableFuture 的構造方法
CompletableFuture<String> future = new CompletableFuture<>();
- 使用 CompletableFuture 的靜態工廠方法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 異步任務邏輯
return "Result";
});
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 異步任務邏輯
});
- 使用轉換方法
CompletableFuture<Integer> transformedFuture = originalFuture.thenApply(result -> {
// 轉換邏輯
return result.length();
});
originalFuture.thenAccept(result -> {
// 處理結果邏輯
System.out.println("Result: " + result);
});
CompletableFuture<Void> runnableFuture = originalFuture.thenRun(() -> {
// 在結果完成后執行的操作
});
- 直接創建一個已完成狀態的CompletableFuture
//CompletableFuture.completedFuture()直接創建一個已完成狀態的CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result");
//先初始化一個未完成的CompletableFuture,然后通過complete()、completeExceptionally(),也完成該CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");
- toCompletableFuture
CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Integer> future = stage.toCompletableFuture();
用于將當前的 CompletionStage 對象轉換為一個 CompletableFuture 對象。
異步執行任務
以下是在 CompletableFuture 對象上異步執行任務的一些方法示例:
- supplyAsync(Supplier<U> supplier):異步執行一個有返回值的供應商(Supplier)任務。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 異步任務邏輯
return "Result";
});
- runAsync(Runnable runnable):異步執行一個沒有返回值的任務。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 異步任務邏輯
});
鏈式操作
CompletableFuture提供了不同的方式來對異步任務進行鏈式操作。
- thenRun
CompletableFuture<Void> executedFuture = future.thenRun(() -> executeTask());
thenRun方法用于在CompletableFuture完成后執行一個Runnable任務。它返回一個新的CompletableFuture對象,該對象沒有返回值。
- thenAccept
CompletableFuture<Void> acceptedFuture = future.thenAccept(result -> processResult(result));
thenAccept方法用于在CompletableFuture完成后對結果進行處理。它接收一個Consumer函數作為參數,并返回一個新的CompletableFuture對象。
- thenApply
CompletableFuture<U> appliedFuture = future.thenApply(result -> transformResult(result));
thenApply方法用于在CompletableFuture完成后對結果進行轉換。它接收一個Function函數作為參數,并返回一個新的CompletableFuture對象。
- thenCompose
CompletableFuture<U> composedFuture = future.thenCompose(result -> executeAnotherTask(result));
用于對異步任務的結果進行處理,并返回一個新的異步任務。
- whenComplete
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Void> whenCompleteFuture = future.whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("Exception occurred: " + exception.getMessage());
} else {
System.out.println("Result: " + result);
}
});
whenCompleteFuture.join();
用于在異步任務完成后執行指定的動作。它允許你在任務完成時處理結果或處理異常。
- thenCompose() 用于對異步任務的結果進行處理,并返回一個新的異步任務。它接受一個函數式接口參數,根據原始任務的結果創建并返回一個新的 CompletionStage 對象。
- whenComplete() 用于在異步任務完成后執行指定的動作。它接受一個消費者函數式接口參數,用于處理任務的結果或異常,但沒有返回值。
異步任務組合
CompletableFuture還提供了一系列方法來組合和處理多個異步任務的結果。
- allOf
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2, future3);
allOf方法接收一組CompletableFuture對象作為參數,并返回一個新的CompletableFuture對象,該對象在所有給定的CompletableFuture都完成時完成。這樣我們可以等待所有任務都完成后再進行下一步操作。
- anyOf
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3);
anyOf方法與allOf類似,不同之處在于它返回的CompletableFuture對象在任何一個給定的CompletableFuture完成時就完成。這樣我們可以獲取最先完成的任務的結果。
- thenCombine
CompletableFuture<U> combinedFuture = future1.thenCombine(future2, (result1, result2) -> combineResults(result1, result2));
thenCombine方法接收兩個CompletableFuture對象和一個函數作為參數,用于指定當這兩個CompletableFuture都完成時如何處理它們的結果。返回的新的CompletableFuture對象將接收到計算后的結果。
- applyToEither
CompletableFuture<U> resultFuture = future1.applyToEither(future2, result -> processResult(result));
applyToEither方法用于獲取兩個CompletableFuture中任意一個完成的結果,并對該結果進行處理。它接收一個Function函數作為參數,并返回一個新的CompletableFuture對象。
- acceptEither
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
future1.acceptEither(future2, result -> {
System.out.println("Result: " + result);
});
用于在兩個 CompletableFuture 對象中任意一個完成時執行指定的操作。該方法接收兩個參數:另一個 CompletableFuture 對象和一個消費者函數(Consumer)。當其中任何一個 CompletableFuture 完成時,將其結果作為參數傳遞給消費者函數進行處理。
- runAfterBoth
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> combinedFuture = future1.runAfterBoth(future2, () -> {
System.out.println("Both futures completed");
});
combinedFuture.join();
用于在兩個異步任務都完成后執行指定的動作,需要注意的是,runAfterBoth() 方法是一個非阻塞方法,動作將在兩個異步任務都完成后立即執行。
- runAfterEither
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 42;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
});
CompletableFuture<Void> eitherFuture = future1.runAfterEither(future2, () -> {
System.out.println("One of the futures completed");
});
eitherFuture.join();
用于在兩個異步任務中任意一個完成后執行指定的動作。
- thenAcceptBoth
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> thenAcceptBothFuture = future1.thenAcceptBoth(future2, (result1, result2) -> {
System.out.println("Action executed with thenAcceptBoth(): " + result1 + ", " + result2);
});
thenAcceptBothFuture.join();
用于在兩個異步任務都完成后執行指定的動作。它的作用是接收兩個異步任務的結果,并將結果作為參數傳遞給指定的消費者函數。
異常處理
CompletableFuture提供了多種方式來處理異步任務的異常情況。
- exceptionally
CompletableFuture<U> exceptionHandledFuture = future.exceptionally(ex -> handleException(ex));
通過exceptionally方法,我們可以對CompletableFuture的異常情況進行處理。它接收一個Function函數作為參數,用于處理異常并返回一個新的CompletableFuture對象。
- handle
CompletableFuture<U> handledFuture = future.handle((result, ex) -> handleResult(result, ex));
handle方法可以同時處理正常結果和異常情況。它接收一個BiFunction函數作為參數,用于處理結果和異常,并返回一個新的CompletableFuture對象。
- completeExceptionally
future.completeExceptionally();
異常地完成 CompletableFuture,將結果設置為一個異常。
- isCompletedExceptionally
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Something went wrong");
});
boolean completedExceptionally = future.isCompletedExceptionally();
System.out.println("Is completed exceptionally: " + completedExceptionally);
該方法返回一個布爾值,表示當前異步任務是否已經異常完成。
- obtrudeException
CompletableFuture<Integer> future = new CompletableFuture<>();
future.obtrudeException(new RuntimeException("Something went wrong"));
boolean completedExceptionally = future.isCompletedExceptionally();
System.out.println("Is completed exceptionally: " + completedExceptionally);
用于強制將指定的異常作為異步任務的結果,調用 obtrudeException(Throwable ex) 方法后,異步任務將立即完成,并將指定的異常作為結果返回。
取值與狀態
- join
future.join()
join() 方法不會拋出已檢查異常,因為它是基于 CompletableFuture 類設計的,如果異步任務拋出異常,join() 方法會將該異常包裝在 CompletionException 中并拋出。
- get
future.get()
get() 方法會拋出一個 InterruptedException 異常和一個 ExecutionException 異常,前者表示獲取結果時被中斷,后者表示獲取結果時任務本身拋出了異常。
future.get(1,TimeUnit.Hours)
有異常則拋出異常,最長等待一個小時,一個小時之后,如果還沒有數據,則異常。
- getNow
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 異步任務邏輯
return 42;
});
int result = future.getNow(0); // 獲取異步操作的結果,如果尚未完成,則返回默認值0
System.out.println("Result: " + result);
getNow(T value) 是 CompletableFuture 類的一個方法,用于獲取異步操作的結果,如果異步操作尚未完成,則返回給定的默認值,該方法會立即返回結果,不會阻塞當前線程。
超時控制與取消操作
CompletableFuture也支持超時控制和取消操作,以便更好地管理異步任務的執行。
- completeOnTimeout
CompletableFuture<U> timeoutFuture = future.completeOnTimeout(defaultResult, timeout, timeUnit);
completeOnTimeout方法在指定的超時時間內等待CompletableFuture的完成,如果超時則將其設置為默認結果。它返回一個新的CompletableFuture對象。
- cancel
boolean isCancelled = future.cancel(true);
cancel方法可用于取消CompletableFuture的執行。它接收一個boolean參數,指示是否中斷正在執行的任務。返回值表示是否成功取消了任務。
- isCancelled
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 異步任務邏輯
return 42;
});
future.cancel(true); // 取消異步任務
boolean isCancelled = future.isCancelled();
System.out.println("Is cancelled: " + isCancelled);
isCancelled() 是 CompletableFuture 類的一個方法,用于判斷當前異步任務是否已被取消。如果異步任務已被取消,則返回 true;否則返回 false。
依賴
- getNumberOfDependents
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
int numberOfDependents = combinedFuture.getNumberOfDependents();
System.out.println("Number of dependents: " + numberOfDependents);
getNumberOfDependents() 用于獲取當前 CompletableFuture 對象所依賴的其他異步任務的數量。如果沒有任何依賴任務,或者所有依賴任務已經完成,則返回的數量為0。
完成
- complete
future.complete("米飯");
complete(T value):該方法返回布爾值,表示是否成功地將結果設置到 CompletableFuture 中。如果 CompletableFuture 未完成,則將結果設置,并返回 true;如果 CompletableFuture 已經完成,則不進行任何操作并返回 false。
- obtrudeValue
CompletableFuture<Integer> future = new CompletableFuture<>();
future.obtrudeValue(42);
boolean completedNormally = future.isDone() && !future.isCompletedExceptionally();
System.out.println("Is completed normally: " + completedNormally);
用于強制將指定的值作為異步任務的結果,調用 obtrudeValue(T value) 方法后,異步任務將立即完成,并將指定的值作為結果返回。
與 complete() 不同,obtrudeValue() 必須在任務已經完成的情況下調用,否則會引發 IllegalStateException 異常。并且complete() 方法對于已經完成的任務會忽略額外的完成操作,并返回 false。而obtrudeValue() 方法即使任務已經完成,仍然會強制使用新的結果值,并返回 true。
- isDone
CompletableFuture<Integer> future = CompletableFuture.completedFuture(42);
boolean done = future.isDone();
System.out.println("Is done: " + done);
用于判斷當前異步任務是否已經完成(無論是正常完成還是異常完成)。
并發限制
CompletableFuture也支持并發限制,以控制同時執行的異步任務數量。
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<U> future = CompletableFuture.supplyAsync(() -> doSomething(), executor);
我們可以通過使用線程池來限制CompletableFuture的并發執行數量。通過創建一個固定大小的線程池,并將其作為參數傳遞給CompletableFuture,就可以控制并發執行任務的數量。
記憶竅門
CompletableFuture類提供了許多方法,但實際上常用的方法只有幾個。為了方便記憶,以下是一些總結的規律:
- 方法名帶Async的都是異步方法,對應的沒有Async則是同步方法,比如 thenAccept 與 thenAcceptAsync 。
- 方法名帶run的入參為Runnable,且無返回值。
- 方法名帶supply的入參為Supplier,且有返回值。
- 方法名帶Accept的入參為Consumer,且無返回值。
- 方法名帶Apply的入參為Function,且有返回值。
- 方法名帶Either的方法表示誰先完成就消費誰。
- 方法名帶Both的方法表示兩個任務都完成才消費。
掌握以上規律后,就可以基本記住大部分方法,剩下的其他方法可以單獨記憶。
總結
本文詳細探討了 CompletableFuture 的原理和方法,學習了如何在任務完成后執行操作、處理結果和轉換結果。
CompletableFuture是Java中強大的異步編程工具之一,合理利用它的方法和策略可以更好地處理異步任務和操作。