成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

實現異步編程,這個工具類你得掌握!

開發 開發工具
在日常的Java8項目開發中,CompletableFuture是很強大的并行開發工具,其語法貼近java8的語法風格,與stream一起使用也能大大增加代碼的簡潔性。

 [[393773]]

本文轉載自微信公眾號「月伴飛魚」,作者日常加油站。轉載本文請聯系月伴飛魚公眾號。   

前言

最近看公司代碼,多線程編程用的比較多,其中有對CompletableFuture的使用,所以想寫篇文章總結下

在日常的Java8項目開發中,CompletableFuture是很強大的并行開發工具,其語法貼近java8的語法風格,與stream一起使用也能大大增加代碼的簡潔性

大家可以多應用到工作中,提升接口性能,優化代碼

基本介紹

CompletableFuture是Java 8新增的一個類,用于異步編程,繼承了Future和CompletionStage

這個Future主要具備對請求結果獨立處理的功能,CompletionStage用于實現流式處理,實現異步請求的各個階段組合或鏈式處理,因此completableFuture能實現整個異步調用接口的扁平化和流式處理,解決原有Future處理一系列鏈式異步請求時的復雜編碼

Future的局限性

1、Future 的結果在非阻塞的情況下,不能執行更進一步的操作

我們知道,使用Future時只能通過isDone()方法判斷任務是否完成,或者通過get()方法阻塞線程等待結果返回,它不能非阻塞的情況下,執行更進一步的操作。

2、不能組合多個Future的結果

假設你有多個Future異步任務,你希望最快的任務執行完時,或者所有任務都執行完后,進行一些其他操作

3、多個Future不能組成鏈式調用

當異步任務之間有依賴關系時,Future不能將一個任務的結果傳給另一個異步任務,多個Future無法創建鏈式的工作流。

4、沒有異常處理

現在使用CompletableFuture能幫助我們完成上面的事情,讓我們編寫更強大、更優雅的異步程序

基本使用

創建異步任務

通常可以使用下面幾個CompletableFuture的靜態方法創建一個異步任務

  1. public static CompletableFuture<Void> runAsync(Runnable runnable);              //創建無返回值的異步任務 
  2. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);     //無返回值,可指定線程池(默認使用ForkJoinPool.commonPool) 
  3. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);           //創建有返回值的異步任務 
  4. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); //有返回值,可指定線程池 

使用示例:

  1. Executor executor = Executors.newFixedThreadPool(10); 
  2. CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { 
  3.     //do something 
  4. }, executor); 
  5. int poiId = 111; 
  6. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { 
  7.  PoiDTO poi = poiService.loadById(poiId); 
  8.   return poi.getName(); 
  9. }); 
  10. // Block and get the result of the Future 
  11. String poiName = future.get(); 

使用回調方法

通過future.get()方法獲取異步任務的結果,還是會阻塞的等待任務完成

CompletableFuture提供了幾個回調方法,可以不阻塞主線程,在異步任務完成后自動執行回調方法中的代碼

  1. public CompletableFuture<Void> thenRun(Runnable runnable);            //無參數、無返回值 
  2. public CompletableFuture<Void> thenAccept(Consumer<? super T> action);         //接受參數,無返回值 
  3. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); //接受參數T,有返回值U 

使用示例:

  1. CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello"
  2.                            .thenRun(() -> System.out.println("do other things. 比如異步打印日志或發送消息")); 
  3. //如果只想在一個CompletableFuture任務執行完后,進行一些后續的處理,不需要返回值,那么可以用thenRun回調方法來完成。 
  4. //如果主線程不依賴thenRun中的代碼執行完成,也不需要使用get()方法阻塞主線程。 
  1. CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello"
  2.                            .thenAccept((s) -> System.out.println(s + " world")); 
  3. //輸出:Hello world 
  4. //回調方法希望使用異步任務的結果,并不需要返回值,那么可以使用thenAccept方法 
  1. CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { 
  2.   PoiDTO poi = poiService.loadById(poiId); 
  3.   return poi.getMainCategory(); 
  4. }).thenApply((s) -> isMainPoi(s));   // boolean isMainPoi(int poiId); 
  5.  
  6. future.get(); 
  7. //希望將異步任務的結果做進一步處理,并需要返回值,則使用thenApply方法。 
  8. //如果主線程要獲取回調方法的返回,還是要用get()方法阻塞得到 

組合兩個異步任務

  1. //thenCompose方法中的異步任務依賴調用該方法的異步任務 
  2. public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);  
  3. //用于兩個獨立的異步任務都完成的時候 
  4. public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,  
  5.                                               BiFunction<? super T,? super U,? extends V> fn);  

使用示例:

  1. CompletableFuture<List<Integer>> poiFuture = CompletableFuture.supplyAsync( 
  2.   () -> poiService.queryPoiIds(cityId, poiId) 
  3. ); 
  4. //第二個任務是返回CompletableFuture的異步方法 
  5. CompletableFuture<List<DealGroupDTO>> getDeal(List<Integer> poiIds){ 
  6.   return CompletableFuture.supplyAsync(() ->  poiService.queryPoiIds(poiIds)); 
  7. //thenCompose 
  8. CompletableFuture<List<DealGroupDTO>> resultFuture = poiFuture.thenCompose(poiIds -> getDeal(poiIds)); 
  9. resultFuture.get(); 

thenCompose和thenApply的功能類似,兩者區別在于thenCompose接受一個返回CompletableFuture的Function,當想從回調方法返回的CompletableFuture中直接獲取結果U時,就用thenCompose

如果使用thenApply,返回結果resultFuture的類型是CompletableFuture>>,而不是CompletableFuture>

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"
  2.   .thenCombine(CompletableFuture.supplyAsync(() -> "world"), (s1, s2) -> s1 + s2); 
  3. //future.get() 

組合多個CompletableFuture

當需要多個異步任務都完成時,再進行后續處理,可以使用allOf方法

  1. CompletableFuture<Void> poiIDTOFuture = CompletableFuture 
  2.  .supplyAsync(() -> poiService.loadPoi(poiId)) 
  3.   .thenAccept(poi -> { 
  4.     model.setModelTitle(poi.getShopName()); 
  5.     //do more thing 
  6.   }); 
  7.  
  8. CompletableFuture<Void> productFuture = CompletableFuture 
  9.  .supplyAsync(() -> productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId)) 
  10.   .thenAccept(list -> { 
  11.     model.setDefaultCount(list.size()); 
  12.     model.setMoreDesc("more"); 
  13.   }); 
  14. //future3等更多異步任務,這里就不一一寫出來了 
  15.  
  16. CompletableFuture.allOf(poiIDTOFuture, productFuture, future3, ...).join();  //allOf組合所有異步任務,并使用join獲取結果 

該方法挺適合C端的業務,比如通過poiId異步的從多個服務拿門店信息,然后組裝成自己需要的模型,最后所有門店信息都填充完后返回

這里使用了join方法獲取結果,它和get方法一樣阻塞的等待任務完成

多個異步任務有任意一個完成時就返回結果,可以使用anyOf方法

  1. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { 
  2.     try { 
  3.         TimeUnit.SECONDS.sleep(2); 
  4.     } catch (InterruptedException e) { 
  5.        throw new IllegalStateException(e); 
  6.     } 
  7.     return "Result of Future 1"
  8. }); 
  9.  
  10. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { 
  11.     try { 
  12.         TimeUnit.SECONDS.sleep(1); 
  13.     } catch (InterruptedException e) { 
  14.        throw new IllegalStateException(e); 
  15.     } 
  16.     return "Result of Future 2"
  17. }); 
  18.  
  19. CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { 
  20.     try { 
  21.         TimeUnit.SECONDS.sleep(3); 
  22.     } catch (InterruptedException e) { 
  23.        throw new IllegalStateException(e); 
  24.       return "Result of Future 3"
  25. }); 
  26.  
  27. CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); 
  28.  
  29. System.out.println(anyOfFuture.get()); // Result of Future 2 

異常處理

  1. Integer age = -1; 
  2.  
  3. CompletableFuture<Void> maturityFuture = CompletableFuture.supplyAsync(() -> { 
  4.   if(age < 0) { 
  5.     throw new IllegalArgumentException("Age can not be negative"); 
  6.   } 
  7.   if(age > 18) { 
  8.     return "Adult"
  9.   } else { 
  10.     return "Child"
  11.   } 
  12. }).exceptionally(ex -> { 
  13.   System.out.println("Oops! We have an exception - " + ex.getMessage()); 
  14.   return "Unknown!"
  15. }).thenAccept(s -> System.out.print(s)); 
  16. //Unkown! 

exceptionally方法可以處理異步任務的異常,在出現異常時,給異步任務鏈一個從錯誤中恢復的機會,可以在這里記錄異常或返回一個默認值

使用handler方法也可以處理異常,并且無論是否發生異常它都會被調用

  1. Integer age = -1; 
  2.  
  3. CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> { 
  4.     if(age < 0) { 
  5.         throw new IllegalArgumentException("Age can not be negative"); 
  6.     } 
  7.     if(age > 18) { 
  8.         return "Adult"
  9.     } else { 
  10.         return "Child"
  11.     } 
  12. }).handle((res, ex) -> { 
  13.     if(ex != null) { 
  14.         System.out.println("Oops! We have an exception - " + ex.getMessage()); 
  15.         return "Unknown!"
  16.     } 
  17.     return res; 
  18. }); 

分片處理

分片和并行處理:分片借助stream實現,然后通過CompletableFuture實現并行執行,最后做數據聚合(其實也是stream的方法)

CompletableFuture并不提供單獨的分片api,但可以借助stream的分片聚合功能實現

舉個例子:

 

  1. //請求商品數量過多時,做分批異步處理 
  2. List<List<Long>> skuBaseIdsList = ListUtils.partition(skuIdList, 10);//分片 
  3. //并行 
  4. List<CompletableFuture<List<SkuSales>>> futureList = Lists.newArrayList(); 
  5. for (List<Long> skuId : skuBaseIdsList) { 
  6.   CompletableFuture<List<SkuSales>> tmpFuture = getSkuSales(skuId); 
  7.   futureList.add(tmpFuture); 
  8. //聚合 
  9. futureList.stream().map(CompletalbleFuture::join).collent(Collectors.toList()); 

舉個例子

帶大家領略下CompletableFuture異步編程的優勢

這里我們用CompletableFuture實現水泡茶程序

首先還是需要先完成分工方案,在下面的程序中,我們分了3個任務:

  • 任務1負責洗水壺、燒開水
  • 任務2負責洗茶壺、洗茶杯和拿茶葉
  • 任務3負責泡茶。其中任務3要等待任務1和任務2都完成后才能開始

下面是代碼實現,你先略過runAsync()、supplyAsync()、thenCombine()這些不太熟悉的方法,從大局上看,你會發現:

  • 無需手工維護線程,沒有繁瑣的手工維護線程的工作,給任務分配線程的工作也不需要我們關注;
  • 語義更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能夠清晰地表述任務3要等待任務1和任務2都完成后才能開始;
  • 代碼更簡練并且專注于業務邏輯,幾乎所有代碼都是業務邏輯相關的
  1. //任務1:洗水壺->燒開水 
  2. CompletableFuture f1 =  
  3.   CompletableFuture.runAsync(()->{ 
  4.   System.out.println("T1:洗水壺..."); 
  5.   sleep(1, TimeUnit.SECONDS); 
  6.  
  7.   System.out.println("T1:燒開水..."); 
  8.   sleep(15, TimeUnit.SECONDS); 
  9. }); 
  10. //任務2:洗茶壺->洗茶杯->拿茶葉 
  11. CompletableFuture f2 =  
  12.   CompletableFuture.supplyAsync(()->{ 
  13.   System.out.println("T2:洗茶壺..."); 
  14.   sleep(1, TimeUnit.SECONDS); 
  15.  
  16.   System.out.println("T2:洗茶杯..."); 
  17.   sleep(2, TimeUnit.SECONDS); 
  18.  
  19.   System.out.println("T2:拿茶葉..."); 
  20.   sleep(1, TimeUnit.SECONDS); 
  21.   return "龍井"
  22. }); 
  23. //任務3:任務1和任務2完成后執行:泡茶 
  24. CompletableFuture f3 =  
  25.   f1.thenCombine(f2, (__, tf)->{ 
  26.     System.out.println("T1:拿到茶葉:" + tf); 
  27.     System.out.println("T1:泡茶..."); 
  28.     return "上茶:" + tf; 
  29.   }); 
  30. //等待任務3執行結果 
  31. System.out.println(f3.join()); 
  32.  
  33. void sleep(int t, TimeUnit u) { 
  34.   try { 
  35.     u.sleep(t); 
  36.   }catch(InterruptedException e){} 

注意事項

1.CompletableFuture默認線程池是否滿足使用

前面提到創建CompletableFuture異步任務的靜態方法runAsync和supplyAsync等,可以指定使用的線程池,不指定則用CompletableFuture的默認線程池

  1. private static final Executor asyncPool = useCommonPool ? 
  2.         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 

可以看到,CompletableFuture默認線程池是調用ForkJoinPool的commonPool()方法創建,這個默認線程池的核心線程數量根據CPU核數而定,公式為Runtime.getRuntime().availableProcessors() - 1,以4核雙槽CPU為例,核心線程數量就是4*2-1=7個

這樣的設置滿足CPU密集型的應用,但對于業務都是IO密集型的應用來說,是有風險的,當qps較高時,線程數量可能就設的太少了,會導致線上故障

所以可以根據業務情況自定義線程池使用

 

2.get設置超時時間不能串行get,不然會導致接口延時線程數量*超時時間

 

責任編輯:武曉燕 來源: 月伴飛魚
相關推薦

2022-03-03 08:30:41

GeneratorES6函數

2025-05-14 00:01:10

RxJS異步編程響應式

2019-12-31 14:10:58

Excel文章SQL

2022-10-24 07:31:53

Python編程裝飾器

2024-04-30 11:11:33

aiohttp模塊編程

2013-04-01 15:38:54

異步編程異步編程模型

2021-05-07 16:19:36

異步編程Java線程

2024-04-01 09:45:50

TAP模式.NET異步編程

2024-03-15 08:23:26

異步編程函數

2024-11-08 09:48:38

異步編程I/O密集

2023-07-06 08:31:50

Python對象編程

2025-05-19 09:30:42

FastAPI接口代碼

2020-03-29 08:27:05

Promise異步編程前端

2023-11-24 16:13:05

C++編程

2023-09-18 07:46:28

2011-02-24 12:53:51

.NET異步傳統

2019-04-30 15:10:42

Python調試工具編程語言

2020-10-27 10:58:07

Linux內核操作系統

2022-01-25 12:41:31

ChromeResponse接口

2013-04-01 15:25:41

異步編程異步EMP
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 男人天堂国产 | 久久久久久久久久爱 | 亚洲国产成人在线视频 | 欧美高清hd | 日韩中文字幕高清 | 亚洲97 | 日韩视频在线播放 | 久久久激情 | 欧美综合一区二区 | 久久精品国产一区二区三区 | 久久一区二区三区免费 | 成人精品一区二区户外勾搭野战 | 国产99视频精品免费播放照片 | 国产精品一区二区不卡 | 99re视频在线 | 欧美日韩亚洲系列 | 国产精品成人国产乱一区 | 亚洲精品成人免费 | 龙珠z在线观看 | 亚洲精品乱码久久久久久蜜桃 | 求毛片| 在线免费国产视频 | 国产精品视频免费播放 | 亚洲国产中文字幕 | 农夫在线精品视频免费观看 | 亚洲一区二区在线播放 | 99精品视频在线观看免费播放 | 欧美精品片 | 日本视频在线播放 | 小h片免费观看久久久久 | 免费av观看 | 午夜精品一区二区三区在线观看 | 精品免费国产视频 | www视频在线观看 | 欧美日韩电影免费观看 | 日韩视频观看 | 日韩不卡在线 | 久久中文字幕av | www国产成人免费观看视频,深夜成人网 | 国产亚洲精品美女久久久久久久久久 | 色综合一区二区三区 |