成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

詳解 CompletableFuture 實踐

開發
CompletableFuture繼承了CompletionStage接口和Future接口,在原有Future的基礎上增加了異步回調、流式處理以及任務組合,成為JDK8多任務協同場景下一個有效利器。

CompletableFuture繼承了CompletionStage接口和Future接口,在原有Future的基礎上增加了異步回調、流式處理以及任務組合,成為JDK8多任務協同場景下一個有效利器。所以筆者今天就以此文演示一下CompletableFuture基礎實踐案例。

CompletableFuture基本設計

因為本文著重講解CompletableFuture的使用,所以這里我們就簡單的從類的繼承關系了解一下CompletableFuture的基本理念,結合CompletableFuture源碼注釋的說法,它是一個針對異步流程化的工具類,即它支持某個異步Future任務完成之后按照指定編排的順序觸發下一個依賴動作:

A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion. When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.

舉個例子,例如我現在在瀏覽器上某個網站的商家的產品,我希望瀏覽器能夠做到以下幾點:

  • 我針對性點選擇3個商家。
  • 3個門店在我勾選點擊查看信息后,分別開始查詢各自的產品數據。
  • 3家數據都完成數據加載后,網站歸并這些數據,通過一個網頁渲染給我查看。

通過CompletableFuture,我們就可以完成通過CompletableFuture將上述商家的查詢任務進行異步提交:

對此我們也可以從CompletableFuture的繼承關系了解其設計理念:

  • 它繼承Future接口使之具備阻塞獲取異步回調的能力。
  • 繼承CompletionStage接口,它永遠thenApply等方法,通過這個繼承關系,使CompletableFuture具備異步任務順序編排的能力,即當前異步任務一處理完成就執行thenApply給定的異步邏輯,使得我們可以清晰明了的編排異步任務:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
 //......
}

提交有返回值的異步任務

通過supplyAsync提交我們的異步任務,然后通過get方法等待異步任務完成并獲取返回結果。

public static void main(String[] args) throws Exception {
        //提交一個CompletableFuture任務
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("work complete! cost:" +(System.currentTimeMillis() - start)  + " ms");
            return 1;
        });


        System.out.println("main thread working");

        //通過get方法阻塞獲取任務執行結果
        System.out.println("supplyAsync result: " + task.get());

        System.out.println("main thread finish");
    }

輸出結果如下,可以看出CompletableFuture的get方法會阻塞主線程工作,直到得到返回值為止。

main thread working
work complete! cost:1001 ms
supplyAsync result: 1
main thread finish

對此我們不妨來看看get方法是如何做到阻塞主線程并等待異步線程任務執行完成的。從下面這段源碼我們可以看到get方法的執行步驟:

  • 調用reportGet查看異步任務是否將結果賦值給result。
  • 如果不為null直接返回。
  • 若為null則調用waitingGet等待任務返回。
public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }

查看reportGet方法可以看到邏輯也很簡單,如果r為空則直接拋中斷異常,如果r存在異常則直接將異常拋出,如果有結果則將結果返回。

private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        //如果結果為null直接拋出終端異常
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
         //如果結果有異常則將異常拋出
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        //如果r正常則直接將結果返回出去
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

waitingGet源碼相對復雜一些,整體步驟我們可以拆解為while循環內部和while循環外部,我們先來看看while循環內部的執行流程:

  1. while循環從任務中獲取result,如果result為空,則進入循環。
  2. 如果spins小于0,說明剛剛進入循環內部,可以自旋等待一下任務的獲取,設置好spins(spins的值從SPINS來,如果多核的情況下值為256),進入下一次循環。
  3. 進入循環發現spins大于0,則隨機生成一個數,如果這個數大于等于0則--spins,進入下次循環。
  4. 不斷執行步驟3的操作,知道spins等于0。
  5. 此時判斷來到q==null,說明任務自旋等待了一段時間還是沒有結果,我們需要將其掛起,首先將線程封裝成一個Signaller,進入下一次循環。
  6. 循環會判斷if (!queued),將要阻塞的任務放到棧中,進入下一次循環。
  7. 循環下一次會來到if (q.thread != null && result == null),說明q線程不為空且沒有結果,我們需要將其打斷,調用ForkJoinPool.managedBlock(q)將其打斷,直至有結果后才結束循環。

while循環外操作就簡單了,來到循環尾部時,result已經有值了,代碼執行postComplete完成任務,并將結果返回。

private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        //如果result為空則進入循環
        while ((r = result) == null) {
        //如果spins小于0,說明剛剛進入循環內部,可以自旋等待一下任務的獲取,設置好spins(spins的值從SPINS來,如果多核的情況下值為256),自此,第一次循環步驟結束
            if (spins < 0)
                spins = SPINS;

   //這一步的操作是自旋等待任務結果,所以代碼進入循環發現spins大于0,則隨機生成一個數,如果這個數大于等于0則--spins,進入下次循環,直到循環spins變為0
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            //此時判斷來到q==null,說明任務自旋等待了一段時間還是沒有結果,我們需要將其掛起,首先將線程封裝成一個Signaller,結束本次循環
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
   //上一步我們將任務封裝成Signaller,這里就將其存入棧中,然后結束循環
            else if (!queued)
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            //循環來到這說明q線程不為空且沒有結果,我們需要將其打斷,調用`ForkJoinPool.managedBlock(q)`將其打斷,直至有結果后才結束循環
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        //結束循環,調用postComplete結束任務并返回結果r
        postComplete();
        return r;
    }

提交無返回值的異步任務

通過runAsync提交一個無返回值的異步任務,這里我們為了實現任務執行完成再關閉主線程用了個get阻塞等待任務完成。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> supplyAsync = CompletableFuture.runAsync(() -> {
            long start = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + "開始工作了,執行時間:" + start);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "結束工作了,總執行時間:" + (System.currentTimeMillis() - start));
        });

        System.out.println("主線程開始運行");
        //get阻塞主線程等待任務結束
        supplyAsync.get();
        System.out.println("主線程運行結束");
    }

輸出結果:

主線程開始運行
ForkJoinPool.commonPool-worker-1開始工作了,執行時間:1651251489755
ForkJoinPool.commonPool-worker-1結束工作了,總執行時間:1010
主線程運行結束

將異步任務提交給自己的線程池處理

查看supplyAsync方法的源碼我們發現,我們提交的任務默認情況下會交給asyncPool這個線程池處理。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

查看asyncPool 我們可以看到如果服務器是多核的情況下返回的是一個commonPool,commonPool默認線程池數為CPU核心數。

private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

所以如果某些情況下我們希望將任務提交到我們自己的線程池中,就建議通過supplyAsync的第二個參數告知CompletableFuture自己要用自定義線程池。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        //使用第二個參數告知CompletableFuture使用的線程池
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();
            System.out.println(Thread.currentThread() + "開始工作了,執行時間:" + start);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印當前執行任務的線程
            System.out.println(Thread.currentThread() + "結束工作了,總執行時間:" + (System.currentTimeMillis() - start));
            return 1;
        }, executorService);

        System.out.println("主線程開始運行");
        System.out.println("輸出結果 " + supplyAsync.get());
        System.out.println("主線程運行結束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

從輸出結果也可以看出這里使用的線程池是我們自定義的線程池:

主線程開始運行
Thread[pool-1-thread-1,5,main]開始工作了,執行時間:1651251851358
Thread[pool-1-thread-1,5,main]結束工作了,總執行時間:2005
輸出結果 1
主線程運行結束

thenApply和thenApplyAsync

thenApply 適用那些需要順序執行的異步任務,例如我們希望將第一個任務的返回值交給第二個異步任務,就可以使用thenApply將兩個任務組合起來。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + "結束工作了");
            return 100;
        }, executorService);

        //將兩個任務組合起來
        CompletableFuture<String> task2 = task1.thenApply((data) -> {
            System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "第一個線程的結果為 " + data;
        });



        System.out.println("獲取組合任務結果");
        System.out.println("組合任務處理結果為: " + task2.get());
        System.out.println("獲取組合任務結果結束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

輸出結果可以看到,任務1執行完成后任務2接著執行了。

Thread[pool-1-thread-1,5,main]開始工作了
獲取組合任務結果
Thread[pool-1-thread-1,5,main]結束工作了
第二個線程:Thread[pool-1-thread-1,5,main]開始工作了
組合任務處理結果為: 第一個線程的結果為 100
獲取組合任務結果結束

thenApplyAsync與thenApply不同的是,在第一個異步任務有指定線程池的情況下,第二個異步任務會被提交到其他線程池中,所以這里我們可以說明一個規律,帶有Async關鍵字的方法支持組合任務時,將任務提交到不同的線程池中。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread()+"開始工作了");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread()+"結束工作了");
            return 100;
        },executorService);

        CompletableFuture<String> task2 = task1.thenApplyAsync((data) -> {
            System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "第一個線程的結果為 " + data;
        });



        System.out.println("獲取任務結果開始");
        System.out.println("任務的結果 "+task2.get());
        System.out.println("獲取任務結果結束");

        executorService.shutdown();
        while (executorService.isTerminated()){

        }
    }

輸出結果:

Thread[pool-1-thread-1,5,main]開始工作了
獲取任務結果開始
Thread[pool-1-thread-1,5,main]結束工作了
第二個線程:Thread[ForkJoinPool.commonPool-worker-9,5,main]開始工作了
任務的結果 第一個線程的結果為 100
獲取任務結果結束

thenAccept和thenRun

thenAccept和thenRun都會在上一個任務執行結束后才會繼續執行。兩者唯一區別時:

  • thenAccept在上一個任務執行結束后,將上一個任務返回結果作為入參,但無返回值。

  • thenRun會在上一個任務執行結束后才開始處理,既沒有入參也沒有返回值。

以下便是筆者的使用示例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);


        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("task線程:" + Thread.currentThread().getName() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task線程:" + Thread.currentThread().getName() + "結束工作了");
            return 200;
        }, executorService);

        CompletableFuture<Integer> task2 = task.thenApply((data) -> {
            System.out.println("task2線程:" + Thread.currentThread().getName() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2線程:" + Thread.currentThread().getName() + "執行結束");
            return data;
        });

        //thenAccept 收上一個任務的入參,但無返回值
        CompletableFuture<Void> task3 = task2.thenAccept((data) -> {
            System.out.println("task3線程:" + Thread.currentThread().getName() + ",該任務接收上一個任務的結果,但無返回值,收到上一個任務的結果值為 " + data);
        });

        //thenRun在上一個任務結束后執行,既無入參也無出參
        CompletableFuture<Void> task4 = task3.thenRun(() -> {
            System.out.println("task4在上一個任務結束后繼續執行,無入參,也無返回值");
        });


        System.out.println("嘗試獲取最終執行結果");
        task4.get();
        System.out.println("執行任務直至task4 ");
        System.out.println("任務全部執行結束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

輸出結果:

task線程:pool-1-thread-1開始工作了
嘗試獲取最終執行結果
task線程:pool-1-thread-1結束工作了
task2線程:pool-1-thread-1開始工作了
task2線程:pool-1-thread-1執行結束
task3線程:pool-1-thread-1,該任務接收上一個任務的結果,但無返回值,收到上一個任務的結果值為 200
task4在上一個任務結束后繼續執行,無入參,也無返回值
執行任務直至task4 
任務全部執行結束

exceptionally

假如我們的任務1執行過程中可能報錯,我們希望能夠從邏輯的角度處理掉,那么我們就可以在任務1后面接一個exceptionally方法,然后再接上任務2。這樣一來,任務1執行報錯就會走到exceptionally,反之就會走到任務2的代碼段:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task1 開始工作了");
            //隨機生成被除數,為0會拋出算術異常
            int num = RandomUtil.randomInt(0, 2);
            int result = 10 / num;
            System.out.println("task1 結束工作");
            return 200;
        });

        //假如task1報錯,任務會走到這個任務上
        CompletableFuture<Integer> exceptionally = task1.exceptionally((e) -> {
            System.out.println("上一個任務報錯了,錯誤信息" + e.getMessage());
            return -1;
        });

        CompletableFuture task2 = task1.thenAccept((param) -> {
            System.out.println("走到正常的結束分支了,task1執行結果:" + param);
        });

        System.out.println("主線程開始運行");
//        調用錯誤捕獲的任務執行結束也會自動走到正常結束的分支
        System.out.println("輸出結果 " + exceptionally.get());
        System.out.println("主線程運行結束");
    }

執行正常的輸出結果:

task1 開始工作了
主線程開始運行
task1 結束工作
走到正常的結束分支了:200
輸出結果 200
主線程運行結束

執行異常的輸出結果:

task1 開始工作了
主線程開始運行
上一個任務報錯了,錯誤信息java.lang.ArithmeticException: / by zero
輸出結果 -1
主線程運行結束

whenComplete

對于上面的例子,我們完全可以用whenComplete來簡化,whenComplete會接收兩個入參:

  • 入參1為上一個任務的返回值。
  • 入參2比較特殊,如果上一個任務拋出異常,則第2個入參不為空。

所以上一個例子的代碼我們可以簡化成這樣,需要注意的是whenComplete返回結果是上一個任務的執行結果,我們無法返回任務2的執行結果。

public static void main(String[] args) {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務1開始工作");
            int num = RandomUtil.randomInt(0, 2);
            int result = 10 / num;
            System.out.println("任務1執行結束,執行結果:" + result);
            return result;
        });

        CompletableFuture<Integer> task2 = task.whenComplete((result, err) -> {
            System.out.println("任務2開始工作");

            if (err != null) {
                System.out.println("任務1執行報錯,報錯原因:" + err.getMessage());
                return;
            }

            System.out.println("任務1正常結束,執行結果:" + result);

        });


        try {
            System.out.println("task2拿到最終執行結果 " + task2.get());
        } catch (Exception e) {

        }
        System.out.println("全流程結束");


    }

錯誤的輸出結果:

任務1開始工作
任務2開始工作
任務1執行報錯,報錯原因:java.lang.ArithmeticException: / by zero
全流程結束

正確執行的輸出結果:

任務1開始工作
任務1執行結束,執行結果:10
任務2開始工作
任務1正常結束,執行結果:10
task2拿到最終執行結果 10
全流程結束

handle

handle使用和whenComplete差不多,唯一的區別就是whenComplete返回的是上一個任務的結果,而handle可以返回自己的結果。

代碼如下所示:

public static void execute1() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Random random = new java.util.Random();
            int num = random.nextInt(10);
            if (num < 5) {
                throw new RuntimeException("報錯了 num:" + num);
            }
            System.out.println(Thread.currentThread() + "結束工作了");
            return num;
        });

        CompletableFuture<String> future2 = future.handle((result, err) -> {
            System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (err != null) {
                System.out.println(err.getMessage());
                ;return "fail";
            }
            return "sucdess";
        });


        System.out.println("拿第1個任務的結果");
        System.out.println("第1個任務的結果 " + future2.get());
        System.out.println("第1個任務結果結束");



        /**
         * 輸出結果
         * Thread[pool-1-thread-1,5,main]開始工作了
         * 拿第一個任務的結果
         * Thread[pool-1-thread-1,5,main]結束工作了
         * 第二個線程:Thread[pool-1-thread-1,5,main]開始工作了
         * 100
         * 第一個任務結果結束
         * 拿第2個任務的結果
         * 第二個任務的結果 第一個線程的結果為 100
         * 第2個任務結果結束
         */

    }

thenCombine / thenAcceptBoth / runAfterBoth

這幾個方法都是將兩個任務組合起來執行的,只有兩個任務都順利完成了,才會執行之后的方法,唯一的區別是:

(1) thenCombine 接收兩個任務的返回值,并返回自己的返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task結束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2結束工作");
            return num;
        });

        //通過thenCombine將兩個任務組合起來
        CompletableFuture<Integer> completableFuture = task1.thenCombine(task2, (result1, result2) -> {
            System.out.println("task1返回結果:" + result1 + "  task2返回結果:" + result2);
            return result1 + result2;
        });


        System.out.println(completableFuture.get());


    }

輸出結果如下:

task開始工作
task2開始工作
task結束工作
task2結束工作
task1返回結果:30  task2返回結果:1
31

(2) thenAcceptBoth 接收兩個參數返回值,但沒有返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task結束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2結束工作");
            return num;
        });

        //通過 thenAcceptBoth 將兩個任務組合起來,獲取前兩個任務處理結果,但自己不返回結果
        CompletableFuture<Void> completableFuture = task1.thenAcceptBoth(task2, (result1, result2) -> {
            System.out.println("task1返回結果:" + result1 + "  task2返回結果:" + result2);

        });


        completableFuture.get();


    }

輸出結果:

task開始工作
task2開始工作
task結束工作
task2結束工作
task1返回結果:66  task2返回結果:10

(3) runAfterBoth 既不能接收入參,也無返回值,待前兩個任務執行完成后才能執行。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task結束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2開始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2結束工作");
            return num;
        });

        //通過 runAfterBoth 將兩個任務組合起來,待前兩個組合任務完成后執行,無入參、無出參
        CompletableFuture<Void> completableFuture = task1.runAfterBoth(task2,()-> {
            System.out.println("task1、task2處理完成" );

        });


        completableFuture.get();


    }

輸出結果:

task開始工作
task2開始工作
task結束工作
task2結束工作
task1、task2處理完成

applyToEither / acceptEither / runAfterEither

這種組合模式只要有一個異步任務成功,就會觸發后續的方法,比如我們組合任務1和任務2,如果任務1執行完成就直接執行任務3,無視任務2。反之任務2先完成直接執行任務3,無視任務1。

和上一個組合模式一樣,依次規律也是:

(1) 接收入參,含返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);

        CompletableFuture<String> completableFuture = task.applyToEither(task2, (result) -> {
            if (result == 1) {
                System.out.println("task1先完成任務");
                return "task1";
            }
            System.out.println("task2先完成任務");
            return "task2";
        });


        System.out.println("最先完成任務的是:" + completableFuture.get());


    }

(2) 接收入參,無返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);

        CompletableFuture<Void> completableFuture = task.acceptEither(task2, (result) -> {
            System.out.println("result:" + result);
            if (result == 1) {
                System.out.println("task1先完成任務");
                return;
            }
            System.out.println("task2先完成任務");
        });


        completableFuture.get();


    }

(3) 無入參,無返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("task1開始工作");
            try {
                TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1結束工作");
            return 1;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync( () -> {
            System.out.println("task2 開始工作");
            try {
                TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2 結束工作");
            return 2;
        });

        CompletableFuture<Void> completableFuture = task.runAfterEither(task2, () -> {
            System.out.println("有一個任務完成了");
        });


        completableFuture.get();


    }

輸出結果:

task1開始工作
task2 開始工作
task1結束工作
有一個任務完成了

thenCompose

thenCompose方法會在某個任務執行完成后,將該任務的執行結果作為方法入參然后執行指定的方法,該方法會返回一個新的CompletableFuture實例,例如我們希望任務1執行完成后執行任務2,任務2執行完成后返回執行任務3,最終結果是從任務3中獲取。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 創建異步執行任務:
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
            System.out.println("task1開始工作");
            int num=RandomUtil.randomInt(0,5);
            try {
                TimeUnit.SECONDS.sleep(num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1結束工作,處理結果:"+num);
            return num;
        });


        CompletableFuture<String> task2= task1.thenCompose((r)->{

            System.out.println("task2 開始工作");
            int num=RandomUtil.randomInt(0,5);
            try {
                TimeUnit.SECONDS.sleep(num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2 結束工作");


            return CompletableFuture.supplyAsync(()->{
                System.out.println("task3 開始工作,收到任務1的執行結果:"+r);
                return "task3 finished";
            });
        });

        System.out.println("執行結果->"+task2.get());


    }

輸出結果:

task1開始工作
task1結束工作,處理結果:1
task2 開始工作
task2 結束工作
task3 開始工作,收到任務1的執行結果:1
執行結果->task3 finished

allOf / anyOf

allOf返回的CompletableFuture是所有任務都執行完成后才會執行,只要有一個任務執行異常,則返回的CompletableFuture執行get方法時會拋出異常。

public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 模擬異步任務1
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 模擬異步任務2
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "World";
        });

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        allFutures.thenRun(() -> {
            // 所有異步任務完成后打印它們的結果
            String result1 = future1.join();
            String result2 = future2.join();
            System.out.println(result1 + " " + result2);
        });

        // 等待所有異步任務完成
        allFutures.join();
    }

輸出結果:

Hello World

而anyOf則是只要有一個任務完成就可以觸發后續方法,并且可以返回先完成任務的返回值,這一點和上述applyToEither 例子差不多。

public class Main {

    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 模擬異步任務1
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 模擬異步任務2
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "World";
        });

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

        anyFuture.thenAccept(result -> {
            // 任何一個異步任務完成后打印它的結果
            System.out.println(result);
        });

        // 等待任何一個異步任務完成
        anyFuture.join();
    }
}
責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2017-12-21 15:48:11

JavaCompletable

2021-06-06 16:56:49

異步編程Completable

2024-08-30 09:53:17

Java 8編程集成

2024-03-06 08:13:33

FutureJDKCallable

2022-05-13 12:34:16

美團開發實踐

2024-01-11 12:14:31

Async線程池任務

2021-08-30 19:00:46

靜態CompletableCountDownLa

2012-11-19 10:35:18

阿里云云計算

2024-12-26 12:59:39

2020-03-17 09:21:20

MariaDBSpider存儲

2024-05-21 09:55:43

AspectOrientedAOP

2024-11-21 14:42:31

2024-08-06 09:43:54

Java 8工具編程

2024-10-28 13:31:33

性能@Async應用

2021-09-27 13:01:52

線程阻塞排查

2021-03-16 15:12:57

CompletableFuture機制java

2015-06-16 11:06:42

JavaCompletable

2021-02-21 14:35:29

Java 8異步編程

2019-12-26 15:49:14

微服務架構業務

2023-03-30 07:52:03

Golang接口
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产一区二区免费 | 久久国产精品久久久久久 | 亚洲精品中文字幕 | 亚洲精品中文字幕 | 欧美日韩不卡合集视频 | 久久久久黄 | 一区二区三区精品在线 | 国产成人99久久亚洲综合精品 | 91国产视频在线 | 亚洲福利在线观看 | av免费在线观看网站 | 亚洲精品电影网在线观看 | 日韩av免费在线电影 | 欧美精品tv| 欧美久久久久久久久中文字幕 | 精品国产一区二区在线 | 成人免费网视频 | 久久av.com | 色在线看| 国产一级在线 | 国产成人精品一区二 | www成人免费| 精品日韩欧美一区二区 | 91精品国产欧美一区二区 | 中文字幕视频在线 | 日韩成人免费 | 午夜精品一区二区三区在线 | 精品无码久久久久久久动漫 | 成人做爰www免费看视频网站 | 欧美一区二区激情三区 | 久久精品欧美视频 | 亚洲美女在线一区 | 男人的天堂中文字幕 | 国产精品久久久久久久久久久久 | 久久久www成人免费无遮挡大片 | 有码在线| 色婷婷一区二区三区四区 | 免费成人高清在线视频 | www.啪啪.com| 久国产视频 | 国产电影一区二区 |