聊聊 JDK8 的 CompletableFuture ,你明白了嗎?
前段時間,阿粉已經說過一次CompletableFuture了,但是還是有讀者說,感覺不是很清晰,有點亂的樣子,今天阿粉就再來說一下這個CompletableFuture的一些API的方法。
CompletableFuture
CompletableFuture是java.util.concurrent庫在java 8中新增的主要工具,同傳統的Future相比,其支持流式計算、函數式編程、完成通知、自定義異常處理等很多新的特性。
supplyAsync方法
通過該函數創建的CompletableFuture實例會異步執行當前傳入的計算任務。在調用端,則可以通過get或join獲取最終計算結果。
這個有兩個不同的實現方式,一種是我們傳入我們自己創建的線程池,然后使用我們創建的線程池進行操作,還有一種就是不傳線程池,讓程序是使用默認的線程池進行操作。
//使用默認線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//使用自定義線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
第一種只需傳入一個Supplier實例(一般使用lamda表達式),此時框架會默認使用ForkJoin的線程池來執行被提交的任務。
我們來自定義一個代碼看一下:
ExecutorService executors = Executors.newFixedThreadPool(5);
CompletableFuture<List<Order>> bFuture = CompletableFuture.supplyAsync(() -> {
//執行查詢
QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("userId",params.getUserId());
queryWrapper.eq("SHOP_ID",params.getShopId());
List<Order> list = orderService.list(queryWrapper);
return list;
}, executors);
當我們執行查詢的時候,這時候實際上就屬于異步的查詢的,我們可以寫多個查詢,比如,上面的代碼我們查詢的是訂單,下面我們可以查詢用戶的信息,還是使用同樣的線程池。
CompletableFuture<List<User>> aFuture = CompletableFuture.supplyAsync(() -> {
//執行查詢
QueryWrapper<User> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("SHOP_ID",params.getShopId());
List<User> list = userService.list(queryWrapper);
return list;
}, executors);
這時候,我們就可以獲取一下這個返回的結果,那么獲取返回結果我們需要調用什么方法呢?
可以通過get或join獲取最終計算結果。這時候我們可以打印一下,獲取結果的長度。
List<User> list = aFuture.get();
List<Order> list = bFuture.get();
這樣,我們就可以使用異步查詢來完成我們對結果集的查詢。
這就有小伙伴想問,如果我想要用第一個的結果,去在第二個數據中去查詢,我們應該怎么做呢?
thenApply
thenApply提交的任務類型需遵從Function簽名,也就是有入參和返回值,其中入參為前置任務的結果。
什么意思呢?其實很簡單,我們這里直接看代碼:
CompletableFuture<List<Order>> cFuture = bFuture.thenApply((list) -> {
List<String> collect = list.stream().map(User::getUserId).collect(Collectors.toList());
...
return orderList;
});
這實際上,就是我們根據查詢出的所有用戶的集合,直接獲取到他的userId,然后我們根據UserId,把這些用戶下的訂單數據都提取出來,當然,在實際使用中,我們理論上可以無限連接后續計算任務,從而實現鏈條更長的流式計算。
但是這種鏈式也不是都非常的好用,畢竟要控制住線程池,大家記得在使用完成之后,可以把自己創建的線程池小回調,調用shutDown方法就可以了。我們再接著往下說。
thenAccept
實際上thenAccept的效果,和thenApply 的效果等同,但是thenAccept提交的任務類型需遵從Consumer簽名,也就是有入參但是沒有返回值,其中入參為前置任務的結果。
實際上調用的是和之前一樣的,但是就是沒有返回值了。
CompletableFuture<void> cFuture = bFuture.thenAccept((list) -> {
List<String> collect = list.stream().map(User::getUserId).collect(Collectors.toList());
...
});
這就是區別,不需要進行返回了,這種一般用的地方比較特殊,就比如說我們執行了很多操作,但是在其中需要夾雜一些比如說給某個服務進行通知,但是我們并不需要這個服務給我們返回值的結果的時候,實際上完全可以使用這種方式。方便還快捷。當然,你想直接調用,阿粉也是沒有意見的。
whenComplete
whenComplete主要用于注入任務完成時的回調通知邏輯。這個解決了傳統future在任務完成時,無法主動發起通知的問題。前置任務會將計算結果或者拋出的異常作為入參傳遞給回調通知函數。
這個方法就是相當于是把前一個任務中的結果,通過第二個方法獲取結果,也并不會影響第二個的邏輯。
示例代碼如下:
CompletableFuture<List<Order>> listCompletableFuture = bFuture.whenComplete((r, e) -> {
if (e != null) {
System.out.println("compute failed!");
} else {
System.out.println("received result is " + r);
}
});
實際上最后答應的就是接受者的信息。大家學會了么?