成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Java8中新增新特性異步編程之CompletableFuture

開發 前端
Future是從JDK1.5開始有的,目的是獲取異步任務執行的結果,通常情況會結合ExecutorService及Callable一起使用。

環境:Java8

在Java 8中, 新增加了一個CompletableFuture類,該類提供了差不多50個左右的方法(都是用來完成各種異步場景需求),并且結合了Future的優點(繼承自Future類),提供了比Future更為強大的功能,這使得在異步編程方面變的簡單,同時還提供了函數式編程的能力,可以通過回調的方式處理計算結果,并且提供了轉換和組合CompletableFuture的各種方法。

Future基本應用

Future是從JDK1.5開始有的,目的是獲取異步任務執行的結果,通常情況會結合ExecutorService及Callable一起使用。

1. Future結合Callable使用

單任務執行

private static class Task implements Callable<String> {


  @Override
  public String call() throws Exception {
    TimeUnit.SECONDS.sleep(3) ;
    return "success";
  }
    
}
public static void main(String[] args) throws Exception {
  ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
  Future<String> future = executor.submit(new Task()) ;
  String result = future.get() ;
  System.out.println("執行結果:" + result) ;
}

當執行到future.get()方法的時候會阻塞,等待3s后繼續執行。

多個任務同時執行

private static class Task implements Callable<String> {
  private int sleep ;
  public Task(int sleep) {
    this.sleep = sleep ;
  }
    
  @Override
  public String call() throws Exception {
    TimeUnit.SECONDS.sleep(this.sleep) ;
    return "success";
  }
}
public static void main(String[] args) throws Exception {
  ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
  Future<String> future1 = executor.submit(new Task(3)) ;
  Future<String> future2 = executor.submit(new Task(2)) ;
  Future<String> future3 = executor.submit(new Task(1)) ;
  String result1 = future1.get() ;
  String result2 = future2.get() ;
  String result3 = future3.get() ;
  System.out.println("result1:" + result1 + "\t" + "result2:" + result2 + "\t" + "result3:" + result3) ;
}

以上代碼執行的3個任務分別用時3,2,1s。future1用時最長。

從運行的結果看到即便future2, future3執行時間短也必須等待future1執行完后才會繼續,雖然你可以倒過來獲取結果,但是在實際項目中的應用你應該是不能確認每個任務執行需要多長時間,誰先執行完就先獲取誰。

雖然這種同步阻塞的方式在有些場景下還是很有必要的。但由于它的同步阻塞導致了當前線程不能干其它的事必須一致等待。

CompletionService解決Future的缺點

CompletionService是一邊生產新的任務,一邊處理已經完成的任務。簡單地說就是CompletionService不管任務執行先后順序,誰先執行完就處理誰。

private static class Task implements Callable<String> {
  private int time;
  private String name ;
  public Task(int time, String name) {
    this.time = time ;
    this.name = name ;
  }
  @Override
  public String call() throws Exception {
    TimeUnit.SECONDS.sleep(this.time) ;
    return name ;
  }
    
}
public static void main(String[] args) throws Exception {
  ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
  CompletionService<String> cs = new ExecutorCompletionService<>(pool) ;
  cs.submit(new Task(3, "name" + 3)) ;
  cs.submit(new Task(1, "name" + 1)) ;
  cs.submit(new Task(2, "name" + 2)) ;
  for (int i = 0; i < 3; i++) {
    System.out.println(cs.take().get()) ;
  }
}

通過執行結果發現,任務的結果獲取是以誰先執行完處理誰與任務的執行先后沒有關系。

CompletableFuture異步編程

CompletableFuture通過如下4個靜態方法來執行異步任務

圖片圖片

2. 簡單異步任務鏈式調用執行

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
CompletableFuture.runAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(3) ;
    System.out.println(Thread.currentThread().getName() + ", 1 任務執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}, executor).thenRun(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 2 任務執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}) ;
System.out.println("主線程:" + Thread.currentThread().getName()) ;
executor.shutdown() ;

執行結果:

圖片圖片

3. 獲取上一步任務執行結果及任務完成處理

CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(3) ;
    System.out.println(Thread.currentThread().getName() + ", 1 任務執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "1" ;
}, executor).thenApply(res -> {
  System.out.println("獲取到上一步任務執行結果:" + res) ;
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 2 任務執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "2" ;
}).whenComplete((res, tx) -> {
  System.out.println("獲取到結果:" + res) ;
  if (tx != null) {
    System.err.println("發生錯誤了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;
System.out.println("主線程:" + Thread.currentThread().getName()) ;

執行結果:

圖片圖片

這里如果任務執行的時候發生了異常那么在whenComplete方法中的res 會為空,tx為發生異常的對象。沒有異常時res有執行的機構,tx異常對象為空。

4. 異步任務異常處理

CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(3) ;
    System.out.println(Thread.currentThread().getName() + ", 1 任務執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "1" ;
}, executor).thenApply(res -> {
  System.out.println("獲取到上一步任務執行結果:" + res) ;
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 2 任務執行完成") ;
    System.out.println(1 / 0) ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "2" ;
}).exceptionally(tx -> {
  System.out.println(Thread.currentThread().getName() + ", 任務執行發生了異常") ;
  return "error" ;
}).whenComplete((res, tx) -> {
  System.out.println("獲取到結果:" + res) ;
  if (tx != null) {
    System.err.println("發生錯誤了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;
System.out.println("主線程:" + Thread.currentThread().getName()) ;

這里我們人為的制造異常 1 / 0 。

執行結果:

圖片圖片

根據執行結果當發生異常時進入exceptionally方法,最終進入whenComplete方法此時 tx異常對象是發生異常的異常對象。

5. 所有任務完成才算完成任務

CompletableFuture.allOf

CompletableFuture<Double> calc1 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", calc1任務執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 10D ;
}, executor) ;
    
CompletableFuture<Double> calc2 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(5) ;
    System.out.println(Thread.currentThread().getName() + ", calc2任務執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 20D ;
}, executor) ;
// 當任何一個任務發生異常,這里的tx都不會為null
CompletableFuture.allOf(calc1, calc2).whenComplete((res, tx) -> {
  System.out.println("獲取到結果:" + res + ", " + tx) ;
  try {
    System.out.println(calc1.get()) ;
    System.out.println(calc2.get()) ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  }
  executor.shutdown();
}) ;

執行結果:

圖片

在這里whenComplete中的res是沒有結果的,要獲取數據我們的分別調用get方法獲取。

6. handle方法對結果處理

CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 1 任務執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "0" ;
}, executor).handle((res, tx) -> {
  // 處理結果數據
  return res + "1" ;
}).whenComplete((res, tx) -> {
  System.out.println("獲取到結果:" + res) ;
  if (tx != null) {
    System.err.println("發生錯誤了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;

執行結果:

正確

圖片圖片

發生異常時:

圖片圖片

當發生異常時handle方法中的res是沒有值的,tx異常對象為發生異常的異常對象。

7. 合并異步任務

將兩個異步任務完成后合并處理

CompletableFuture.thenCombine

CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任務1執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 10d ;
}, executor) ;
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任務2執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 20d ;
}, executor) ;
task1.thenCombine(task2, (t1, t2) -> {
  System.out.println(Thread.currentThread().getName() + ", 合并任務完成") ;
  return t1 + "," + t2 ;
}).whenComplete((res, tx) -> {
  System.out.println("獲取到結果:" + res) ;
  if (tx != null) {
    System.err.println("發生錯誤了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;

執行結果:

圖片圖片

8. 異步任務誰快誰就進入下一步的執行

CompletableFuture.applyToEither

兩個異步任務誰先執行完誰就繼續執行后續的操作。

CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任務1執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 10d ;
}, executor) ;
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任務2執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 20d ;
}, executor) ;
task1.applyToEither(task2, res -> {
  return res ;
}).whenComplete((res, tx) -> {
  System.out.println("獲取到結果:" + res) ;
  if (tx != null) {
    System.err.println("發生錯誤了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;

執行結果:

圖片圖片

9. 兩個異步任務都執行完了才繼續執行

只有兩個任務都執行完成了后才會繼續。

CompletableFuture.runAfterBoth

CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任務1執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 10d ;
}, executor) ;
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任務2執行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 20d ;
}, executor) ;
task1.runAfterBoth(task2, () -> {
  System.out.println("任務都執行完成了...") ;
}).whenComplete((res, tx) -> {
  System.out.println("獲取到結果:" + res) ;
  if (tx != null) {
    System.err.println("發生錯誤了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;

執行結果:

圖片圖片

10. 等待所有任務執行完成

CompletableFuture.anyOf

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
  sleep(1000) ;
  System.out.println("我是任務1") ;
  return "Task1" ;
}, executor) ;


CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
  sleep(3000) ;
  System.out.println("我是任務2") ;
  System.out.println(1 / 0) ;
  return "Task2" ;
}, executor) ;
// 任意一個任務執行完成就算完成
// 當任務執行發生異常后,th才不會為null
CompletableFuture.anyOf(task1, task2).whenCompleteAsync((v, th) -> {
  System.out.println("v = " + v) ;
  System.out.println("th = " + th) ;
}, executor) ;

執行結果:

圖片圖片

11. 接收上一個任務的執行結果

CompletableFuture.supplyAsync(() -> {
  sleep(2000) ;
  System.out.println("第一個任務執行完成...") ;
  // System.out.println(1 / 0) ;
  return new Random().nextInt(10000) ;
}, executor).thenAcceptAsync(res -> { // 接收上一個任務的執行結果
  System.out.println("任務執行結果:" + res) ;
}, executor) ;

執行結果:

圖片 圖片

責任編輯:武曉燕 來源: 實戰案例錦集
相關推薦

2020-05-29 07:20:00

Java8異步編程源碼解讀

2024-04-18 08:20:27

Java 8編程工具

2022-05-31 07:32:19

JDK8API工具

2021-02-21 14:35:29

Java 8異步編程

2013-08-06 13:58:27

2021-06-06 16:56:49

異步編程Completable

2017-12-21 15:48:11

JavaCompletable

2022-07-08 14:14:04

并發編程異步編程

2021-03-02 09:34:41

Nodejs14前端代碼

2021-03-04 08:14:37

Java8開發接口

2021-03-02 07:13:54

Java8版本升級

2022-05-25 07:22:07

ES12JavaScript語言

2021-02-22 11:51:15

Java開發代碼

2024-10-09 08:42:03

2025-02-06 16:51:30

2014-07-15 14:48:26

Java8

2024-08-06 09:43:54

Java 8工具編程

2022-12-30 09:24:23

Java8Stream操作

2022-12-09 07:48:10

Java8Stream表達式

2023-04-13 07:33:31

Java 8編程工具
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕日本一区二区 | 日韩电影免费在线观看中文字幕 | 秋霞电影一区二区 | 精品三区 | 亚洲iv一区二区三区 | 国产精品久久久久久婷婷天堂 | 农夫在线精品视频免费观看 | 户外露出一区二区三区 | 色综合久久天天综合网 | 欧美日韩久久精品 | 一级二级三级黄色 | 久久久久91 | 91av免费观看 | 在线国产一区二区三区 | 日韩精品久久 | 亚洲免费视频在线观看 | 亚洲一区二区免费电影 | 中文字幕一级毛片 | 在线视频a| 在线观看国产视频 | 视频在线观看一区 | 欧美激情国产精品 | 一级片网站视频 | 青春草91| 日韩免费高清视频 | 国产在线视频在线观看 | 欧美日韩专区 | 亚洲va国产日韩欧美精品色婷婷 | 欧美激情 一区 | 亚洲欧美激情网 | 日韩成人一区二区 | 亚洲国产中文在线 | 国产精品黄色 | 国产精品毛片一区二区三区 | 精品无码久久久久久国产 | 国产精品1区2区3区 一区中文字幕 | 久久久成 | 国产精品久久久久久久久久不蜜臀 | 成人av网站在线观看 | 日本黄色片免费在线观看 | 亚州精品天堂中文字幕 |