成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

新手也能看懂的 SpringBoot 異步編程指南

開發 前端
異步編程在處理耗時操作以及多任務處理的場景下非常有用,我們可以更好的讓我們的系統利用好機器的 CPU 和 內存,提高它們的利用率。多線程設計模式有很多種,Future模式是多線程開發中非常常見的一種設計模式,本文也是基于這種模式來說明 SpringBoot 對于異步編程的知識。

 通過本文你可以了解到下面這些知識點:

  1. Future 模式介紹以及核心思想
  2. 核心線程數、最大線程數的區別,隊列容量代表什么;
  3. ThreadPoolTaskExecutor 飽和策略;
  4. SpringBoot 異步編程實戰,搞懂代碼的執行邏輯。
  5. [[278689]]

Future 模式

異步編程在處理耗時操作以及多任務處理的場景下非常有用,我們可以更好的讓我們的系統利用好機器的 CPU 和 內存,提高它們的利用率。多線程設計模式有很多種,Future模式是多線程開發中非常常見的一種設計模式,本文也是基于這種模式來說明 SpringBoot 對于異步編程的知識。

實戰之前我先簡單介紹一下 Future 模式的核心思想 吧!。

Future 模式的核心思想是 異步調用 。當我們執行一個方法時,假如這個方法中有多個耗時的任務需要同時去做,而且又不著急等待這個結果時可以讓客戶端立即返回然后,后臺慢慢去計算任務。當然你也可以選擇等這些任務都執行完了,再返回給客戶端。這個在 Java 中都有很好的支持,我在后面的示例程序中會詳細對比這兩種方式的區別。

SpringBoot 異步編程實戰

如果我們需要在 SpringBoot 實現異步編程的話,通過 Spring 提供的兩個注解會讓這件事情變的非常簡單。

  • @EnableAsync:通過在配置類或者Main類上加@EnableAsync開啟對異步方法的支持。
  • @Async 可以作用在類上或者方法上,作用在類上代表這個類的所有方法都是異步方法。

1. 自定義 TaskExecutor

很多人對于 TaskExecutor 不是太了解,所以我們花一點篇幅先介紹一下這個東西。從名字就能看出它是任務的執行者,它領導執行著線程來處理任務,就像司令官一樣,而我們的線程就好比一只只軍隊一樣,這些軍隊可以異步對敵人進行打擊👊。

Spring 提供了TaskExecutor接口作為任務執行者的抽象,它和java.util.concurrent包下的Executor接口很像。稍微不同的 TaskExecutor接口用到了 Java 8 的語法@FunctionalInterface聲明這個接口口是一個函數式接口。

  1. org.springframework.core.task.TaskExecutor 
  2. @FunctionalInterface 
  3. public interface TaskExecutor extends Executor { 
  4.     void execute(Runnable var1); 

 

如果沒有自定義Executor, Spring 將創建一個 SimpleAsyncTaskExecutor 并使用它。

  1. import org.springframework.context.annotation.Bean; 
  2. import org.springframework.context.annotation.Configuration; 
  3. import org.springframework.scheduling.annotation.AsyncConfigurer; 
  4. import org.springframework.scheduling.annotation.EnableAsync; 
  5. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 
  6.  
  7. import java.util.concurrent.Executor; 
  8.  
  9. /** @author shuang.kou */ 
  10. @Configuration 
  11. @EnableAsync 
  12. public class AsyncConfig implements AsyncConfigurer { 
  13.  
  14.   private static final int CORE_POOL_SIZE = 6; 
  15.   private static final int MAX_POOL_SIZE = 10; 
  16.   private static final int QUEUE_CAPACITY = 100; 
  17.  
  18.   @Bean 
  19.   public Executor taskExecutor() { 
  20.     // Spring 默認配置是核心線程數大小為1,最大線程容量大小不受限制,隊列容量也不受限制。 
  21.     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
  22.     // 核心線程數 
  23.     executor.setCorePoolSize(CORE_POOL_SIZE); 
  24.     // 最大線程數 
  25.     executor.setMaxPoolSize(MAX_POOL_SIZE); 
  26.     // 隊列大小 
  27.     executor.setQueueCapacity(QUEUE_CAPACITY); 
  28.     // 當最大池已滿時,此策略保證不會丟失任務請求,但是可能會影響應用程序整體性能。 
  29.     executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 
  30.     executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-"); 
  31.     executor.initialize(); 
  32.     return executor; 
  33.   } 

ThreadPoolTaskExecutor 常見概念:

  • Core Pool Size : 核心線程數線程數定義了最小可以同時運行的線程數量。
  • Queue Capacity : 當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,如果達到的話,信任就會被存放在隊列中。
  • Maximum Pool Size : 當隊列中存放的任務達到隊列容量的時候,當前可以同時運行的線程數量變為最大線程數。

一般情況下不會將隊列大小設為:Integer.MAX_VALUE,也不會將核心線程數和最大線程數設為同樣的大小,這樣的話最大線程數的設置都沒什么意義了,你也無法確定當前 CPU 和內存利用率具體情況如何。

如果隊列已滿并且當前同時運行的線程數達到最大線程數的時候,如果再有新任務過來會發生什么呢?

Spring 默認使用的是 ThreadPoolExecutor.AbortPolicy。在Spring的默認情況下,ThreadPoolExecutor 將拋出 RejectedExecutionException 來拒絕新來的任務 ,這代表你將丟失對這個任務的處理。對于可伸縮的應用程序,建議使用 ThreadPoolExecutor.CallerRunsPolicy。當最大池被填滿時,此策略為我們提供可伸縮隊列。

ThreadPoolTaskExecutor 飽和策略定義:

如果當前同時運行的線程數量達到最大線程數量時,ThreadPoolTaskExecutor 定義一些策略:

ThreadPoolExecutor.AbortPolicy:拋出 RejectedExecutionException來拒絕新任務的處理。

ThreadPoolExecutor.CallerRunsPolicy:調用執行自己的線程運行任務。您不會任務請求。但是這種策略會降低對于新任務提交速度,影響程序的整體性能。另外,這個策略喜歡增加隊列容量。如果您的應用程序可以承受此延遲并且你不能任務丟棄任何一個任務請求的話,你可以選擇這個策略。

ThreadPoolExecutor.DiscardPolicy: 不處理新任務,直接丟棄掉。

ThreadPoolExecutor.DiscardOldestPolicy:此策略將丟棄最早的未處理的任務請求。

2. 編寫一個異步的方法

下面模擬一個查找對應字符開頭電影的方法,我們給這個方法加上了@Async注解來告訴 Spring 它是一個異步的方法。另外,這個方法的返回值 CompletableFuture.completedFuture(results)這代表我們需要返回結果,也就是說程序必須把任務執行完成之后再返回給用戶。

請留意completableFutureTask方法中的第一行打印日志這句代碼,后面分析程序中會用到,很重要!

  1. import org.slf4j.Logger; 
  2. import org.slf4j.LoggerFactory; 
  3. import org.springframework.scheduling.annotation.Async; 
  4. import org.springframework.stereotype.Service; 
  5.  
  6. import java.util.ArrayList; 
  7. import java.util.Arrays; 
  8. import java.util.List; 
  9. import java.util.concurrent.CompletableFuture; 
  10. import java.util.stream.Collectors; 
  11.  
  12. /** @author shuang.kou */ 
  13. @Service 
  14. public class AsyncService { 
  15.  
  16.   private static final Logger logger = LoggerFactory.getLogger(AsyncService.class); 
  17.  
  18.   private List<String> movies = 
  19.       new ArrayList<>( 
  20.           Arrays.asList( 
  21.               "Forrest Gump"
  22.               "Titanic"
  23.               "Spirited Away"
  24.               "The Shawshank Redemption"
  25.               "Zootopia"
  26.               "Farewell "
  27.               "Joker"
  28.               "Crawl")); 
  29.  
  30.   /** 示范使用:找到特定字符/字符串開頭的電影 */ 
  31.   @Async 
  32.   public CompletableFuture<List<String>> completableFutureTask(String start) { 
  33.     // 打印日志 
  34.     logger.warn(Thread.currentThread().getName() + "start this task!"); 
  35.     // 找到特定字符/字符串開頭的電影 
  36.     List<String> results = 
  37.         movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList()); 
  38.     // 模擬這是一個耗時的任務 
  39.     try { 
  40.       Thread.sleep(1000L); 
  41.     } catch (InterruptedException e) { 
  42.       e.printStackTrace(); 
  43.     } 
  44.     //返回一個已經用給定值完成的新的CompletableFuture。 
  45.     return CompletableFuture.completedFuture(results); 
  46.   } 

3. 測試編寫的異步方法

  1. /** @author shuang.kou */ 
  2. @RestController 
  3. @RequestMapping("/async"
  4. public class AsyncController { 
  5.   @Autowired  
  6.   AsyncService asyncService; 
  7.  
  8.   @GetMapping("/movies"
  9.   public String completableFutureTask() throws ExecutionException, InterruptedException { 
  10.     //開始時間 
  11.     long start = System.currentTimeMillis(); 
  12.     // 開始執行大量的異步任務 
  13.     List<String> words = Arrays.asList("F""T""S""Z""J""C"); 
  14.     List<CompletableFuture<List<String>>> completableFutureList = 
  15.         words.stream() 
  16.             .map(word -> asyncService.completableFutureTask(word)) 
  17.             .collect(Collectors.toList()); 
  18.     // CompletableFuture.join()方法可以獲取他們的結果并將結果連接起來 
  19.     List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList()); 
  20.     // 打印結果以及運行程序運行花費時間 
  21.     System.out.println("Elapsed time: " + (System.currentTimeMillis() - start)); 
  22.     return results.toString(); 
  23.   } 

請求這個接口,控制臺打印出下面的內容:

  1. 2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-1start this task! 
  2. 2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-6start this task! 
  3. 2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-5start this task! 
  4. 2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-4start this task! 
  5. 2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-3start this task! 
  6. 2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-2start this task! 
  7. Elapsed time: 1010 

首先我們可以看到處理所有任務花費的時間大概是 1 s。這與我們自定義的 ThreadPoolTaskExecutor 有關,我們配置的核心線程數是 6 ,然后通過通過下面的代碼模擬分配了 6 個任務給系統執行。這樣每個線程都會被分配到一個任務,每個任務執行花費時間是 1 s ,所以處理 6 個任務的總花費時間是 1 s。

  1. List<String> words = Arrays.asList("F""T""S""Z""J""C"); 
  2. List<CompletableFuture<List<String>>> completableFutureList = 
  3.         words.stream() 
  4.             .map(word -> asyncService.completableFutureTask(word)) 
  5.             .collect(Collectors.toList()); 

你可以自己驗證一下,試著去把核心線程數的數量改為 3 ,再次請求這個接口你會發現處理所有任務花費的時間大概是 2 s。

另外,從上面的運行結果可以看出,當所有任務執行完成之后才返回結果。這種情況對應于我們需要返回結果給客戶端請求的情況下,假如我們不需要返回任務執行結果給客戶端的話呢? 就比如我們上傳一個大文件到系統,上傳之后只要大文件格式符合要求我們就上傳成功。普通情況下我們需要等待文件上傳完畢再返回給用戶消息,但是這樣會很慢。采用異步的話,當用戶上傳之后就立馬返回給用戶消息,然后系統再默默去處理上傳任務。這樣也會增加一點麻煩,因為文件可能會上傳失敗,所以系統也需要一點機制來補償這個問題,比如當上傳遇到問題的時候,發消息通知用戶。

下面會演示一下客戶端不需要返回結果的情況:

將completableFutureTask方法變為 void 類型

  1. @Async 
  2. public void completableFutureTask(String start) { 
  3.   ...... 
  4.   //這里可能是系統對任務執行結果的處理,比如存入到數據庫等等...... 
  5.   //doSomeThingWithResults(results); 

Controller 代碼修改如下:

  1. @GetMapping("/movies"
  2.  public String completableFutureTask() throws ExecutionException, InterruptedException { 
  3.    // Start the clock 
  4.    long start = System.currentTimeMillis(); 
  5.    // Kick of multiple, asynchronous lookups 
  6.    List<String> words = Arrays.asList("F""T""S""Z""J""C"); 
  7.        words.stream() 
  8.            .forEach(word -> asyncService.completableFutureTask(word)); 
  9.    // Wait until they are all done 
  10.    // Print results, including elapsed time 
  11.    System.out.println("Elapsed time: " + (System.currentTimeMillis() - start)); 
  12.    return "Done"
  13.  } 

請求這個接口,控制臺打印出下面的內容:

  1. Elapsed time: 0 
  2. 2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-4start this task! 
  3. 2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-3start this task! 
  4. 2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-2start this task! 
  5. 2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-1start this task! 
  6. 2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-6start this task! 
  7. 2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-5start this task! 

可以看到系統會直接返回給用戶結果,然后系統才真正開始執行任務。

待辦

  • Future vs. CompletableFuture
  • 源代碼分析

Reference

  • https://spring.io/guides/gs/async-method/
  • https://medium.com/trendyol-tech/spring-boot-async-executor-management-with-threadpooltaskexecutor-f493903617d

 

 

責任編輯:武曉燕 來源: JavaGuide
相關推薦

2018-12-24 08:46:52

Kubernetes對象模型

2019-11-18 10:38:03

線程池Java框架

2018-05-16 10:07:02

監控報警系統

2017-02-22 15:04:52

2022-07-04 08:31:42

GitOpsGit基礎設施

2020-02-15 17:16:05

Kubernetes容器

2019-03-26 11:15:34

AI機器學習人工智能

2013-09-22 10:34:08

碼農機器學習算法

2019-09-05 14:21:22

JavaNIOBIO

2024-11-01 05:10:00

2017-11-02 12:08:56

2021-11-01 15:15:37

Context項目代碼

2020-11-16 16:38:30

人工智能AI

2025-02-17 10:09:54

2025-02-17 13:00:00

ChatGPT大模型AI

2018-03-06 10:38:23

云計算大數據人工智能

2013-03-15 10:35:17

編程語言編程笑話

2024-01-19 13:39:00

死鎖框架排查

2020-01-21 10:16:15

Kubernetes教程容器

2020-12-01 09:03:22

分庫分表MySQL
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品2 | 国产精品视频久久 | caoporon| 天天夜干| 国产网站在线播放 | 亚洲欧美精品久久 | 你懂的在线视频播放 | 国产女人与拘做受视频 | 久久精品视频在线观看 | 91亚洲欧美 | 91久久夜色精品国产网站 | 国产精品视频网 | 国产三区视频在线观看 | 国产999精品久久久久久 | 午夜精品在线 | 成人久久一区 | 一级毛毛片 | 色视频网站在线观看 | www.色五月.com| 欧美中文字幕一区二区三区亚洲 | 日韩亚洲视频在线 | 日批免费看 | 色精品 | 美女视频网站久久 | 色婷婷在线视频 | 久久久成人精品 | 九色av| 欧美激情在线一区二区三区 | 精品久久香蕉国产线看观看亚洲 | 久久精品中文字幕 | 在线免费观看日本 | av在线黄 | 综合网在线 | 久久久精 | 激情自拍偷拍 | 中文字幕 亚洲一区 | 亚洲国产成人av好男人在线观看 | 噜噜噜噜狠狠狠7777视频 | 亚洲视频在线播放 | 精品一区二区三区不卡 | 色站综合 |