CompletableFuture真香,可以替代CountDownLatch!
在對類的命名這篇長文中,我們提到了Future和Promise。
Future相當于一個占位符,代表一個操作將來的結(jié)果。一般通過get可以直接阻塞得到結(jié)果,或者讓它異步執(zhí)行然后通過callback回調(diào)結(jié)果。
但如果回調(diào)中嵌入了回調(diào)呢?如果層次很深,就是回調(diào)地獄。Java中的CompletableFuture其實就是Promise,用來解決回調(diào)地獄問題。Promise是為了讓代碼變得優(yōu)美而存在的。
有多優(yōu)美?這么說吧,一旦你使用了CompletableFuture,就會愛不釋手,就像初戀女友一樣,天天想著她。
一系列靜態(tài)方法
從它的源代碼中,我們可以看到,CompletableFuture直接提供了幾個便捷的靜態(tài)方法入口。其中有run和supply兩組。
run的參數(shù)是Runnable,而supply的參數(shù)是Supplier。前者沒有返回值,而后者有,否則沒有什么兩樣。
這兩組靜態(tài)函數(shù),都提供了傳入自定義線程池的功能。如果你用的不是外置的線程池,那么它就會使用默認的ForkJoin線程池。默認的線程池,大小和用途你是控制不了的,所以還是建議自己傳遞一個。
典型的代碼,寫起來是這個樣子。
- CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
- return "test";
- });
- String result = future.join();
拿到CompletableFuture后,你就可以做更多的花樣。
這些花樣有很多
我們說面說了,CompletableFuture的主要作用,就是讓代碼寫起來好看。配合Java8之后的stream流,可以把整個計算過程抽象成一個流。前面任務(wù)的計算結(jié)果,可以直接作為后面任務(wù)的輸入,就像是管道一樣。
- thenApply
- thenApplyAsync
- thenAccept
- thenAcceptAsync
- thenRun
- thenRunAsync
- thenCombine
- thenCombineAsync
- thenCompose
- thenComposeAsync
比如,下面代碼的執(zhí)行結(jié)果是99,并不因為是異步就打亂代碼執(zhí)行的順序了。
- CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
- .thenApplyAsync((e) -> {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- return e * 10;
- }).thenApplyAsync(e -> e - 1);
- cf.join();
- System.out.println(cf.get());
同樣的,函數(shù)的作用還要看then后面的動詞。
- apply 有入?yún)⒑头祷刂担雲(yún)榍爸萌蝿?wù)的輸出
- accept 有入?yún)o返回值,會返回CompletableFuture
- run 沒有入?yún)⒁矝]有返回值,同樣會返回CompletableFuture
- combine 形成一個復(fù)合的結(jié)構(gòu),連接兩個CompletableFuture,并將它們的2個輸出結(jié)果,作為combine的輸入
- compose 將嵌套的CompletableFuture平鋪開,用來串聯(lián)兩個CompletableFuture
when和handle
上面的函數(shù)列表,其實還有很多。比如:
- whenComplete
when的意思,就是任務(wù)完成時候的回調(diào)。比如我們上面的例子,打算在完成任務(wù)后,輸出一個done。它也是屬于只有入?yún)]有出參的范疇,適合放在最后一步進行觀測。
- CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
- .thenApplyAsync((e) -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- return e * 10;
- }).thenApplyAsync(e -> e - 1)
- .whenComplete((r, e)->{
- System.out.println("done");
- })
- ;
- cf.join();
- System.out.println(cf.get());
handle和exceptionally的作用,和whenComplete是非常像的。
- public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
- public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture的任務(wù)是串聯(lián)的,如果它的其中某一步驟發(fā)生了異常,會影響后續(xù)代碼的運行的。
exceptionally從名字就可以看出,是專門處理這種情況的。比如,我們強制某個步驟除以0,發(fā)生異常,捕獲后返回-1,它將能夠繼續(xù)運行。
- CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
- .thenApplyAsync(e->e/0)
- .thenApplyAsync(e -> e - 1)
- .exceptionally(ex->{
- System.out.println(ex);
- return -1;
- });
- cf.join();
- System.out.println(cf.get());
handle更加高級一些,因為它除了一個異常參數(shù),還有一個正常的入?yún)ⅰL幚矸椒ㄒ捕碱愃疲辉儋樖觥?/p>
當然,CompletableFuture的函數(shù)不僅僅這些,還有更多,根據(jù)函數(shù)名稱很容易能夠了解到它的作用。它還可以替換復(fù)雜的CountDownLatch,這要涉及到幾個比較難搞的函數(shù)。
替代CountDownLatch
考慮下面一個場景。某一個業(yè)務(wù)接口,需要處理幾百個請求,請求之后再把這些結(jié)果給匯總起來。
如果順序執(zhí)行的話,假設(shè)每個接口耗時100ms,那么100個接口,耗時就需要10秒。假如我們并行去獲取的話,那么效率就會提高。
使用CountDownLatch可以解決。
- ExecutorService executor = Executors.newFixedThreadPool(5);
- CountDownLatch countDown = new CountDownLatch(requests.size());
- for(Request request:requests){
- executor.execute(()->{
- try{
- //some opts
- }finally{
- countDown.countDown();
- }
- });
- }
- countDown.await(200,TimeUnit.MILLISECONDS);
我們使用CompletableFuture來替換它。
- ExecutorService executor = Executors.newFixedThreadPool(5);
- List<CompletableFuture<Result>> futureList = requests
- .stream()
- .map(request->
- CompletableFuture.supplyAsync(e->{
- //some opts
- },executor))
- .collect(Collectors.toList());
- CompletableFuture<Void> allCF = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
- allCF.join();
我們這里用到了一個主要的函數(shù),那就是allOf,用來把所有的CompletableFuture組合在一起;類似的還有anyOf,表示只運行其中一個。常用的,還有三個函數(shù):
- thenAcceptBoth 處理兩個任務(wù)的情況,有兩個任務(wù)結(jié)果入?yún)ⅲ瑹o返回值
- thenCombine 處理兩個任務(wù)的情況,有入?yún)⒂蟹祷刂担钕矚g
- runAfterBoth 處理兩個任務(wù)的情況,無入?yún)ⅲ瑹o返回值
End
自從認識了CompletableFuture,我已經(jīng)很少硬編碼Future了。相對于各種回調(diào)的嵌套,CompletableFuture為我們提供了更直觀、更優(yōu)美的API。在“多個任務(wù)等待完成狀態(tài)”這個應(yīng)用場景,CompletableFuture已經(jīng)成了我的首選。
唯一的問題是,它的函數(shù)有點多,你需要熟悉一小段時間。另外,有一個小小的問題,個人覺得,這個類如果叫做Promise的話,就能夠和JS的統(tǒng)一起來,算是錦上添花吧。
作者簡介:小姐姐味道 (xjjdog),一個不允許程序員走彎路的公眾號。聚焦基礎(chǔ)架構(gòu)和Linux。十年架構(gòu),日百億流量,與你探討高并發(fā)世界,給你不一樣的味道。