聊聊 Spring 異步任務教程
阿粉最近碰到一個場景,用戶注冊之后需要發送郵件給其郵箱。原先設計中,這是一個同步過程,注冊方法需要等待郵件發送成功才能返回。
由于郵件發送流程對于注冊來說并不是一個關鍵節點,我們可以將郵件發送異步執行,減少注冊方法執行時間。
我們可以自己創建線程池,然后執行異步任務,示例代碼如下:
- // 生產使用線程池的最佳實踐,一定要自定義線程池,不要嫌麻煩,使用 Executors 創建線程池
- private ThreadPoolExecutor threadPool =
- new ThreadPoolExecutor(5,
- 10,
- 60l,
- TimeUnit.SECONDS,
- new LinkedBlockingDeque<>(200),
- new ThreadFactoryBuilder().setNameFormat("register-%d").build());
- /**
- * 使用線程池執行發送郵件的任務
- */
- private void sendEmailByThreadPool() {
- threadPool.submit(() -> emailService.sendEmail());
- }
ps: 生產使用線程池的最佳實踐,一定要自定義線程池,根據業務場景設置合理的線程池參數,另外給線程設置具有明確意義的前綴,這樣排查問題就非常簡單。
千萬不要為了方便,使用 Executors 相關方法創建線程池。
上面代碼中使用線程池完成了發送郵件的異步任務,可以看到這個示例還是有點麻煩,我們不僅要自定義線程池,還需要在創建相關任務執行類。
Spring 提供執行異步任務功能,我們使用一個注解就可以輕松完成上面的功能。
今天阿粉就來講解一下如何使用 Spring 異步任務,以及 Spring 異步任務使用過程中一些注意點。
異步任務使用方式
Spring 異步任務需要在相關的方法上設置 @Async 注解,這里為了舉例,我們創建一個 EmailService 類,專用完成郵件服務。
代碼如下所示:
- @Slf4j
- @Service
- public class EmailService {
- /**
- * 異步發送任務
- *
- * @throws InterruptedException
- */
- @SneakyThrows
- @Async
- public void sendEmailAsync() {
- log.info("使用 Spring 異步任務發送郵件示例");
- // 模擬郵件發送耗時
- TimeUnit.SECONDS.sleep(2l);
- }
- }
這里要注意了,Spring 異步任務默認關閉的,我們需要使用 @EnableAsync開啟異步任務。
如果還在使用 Spring XML 配置,我們需要配置如下配置:
- <task:annotation-driven/>
上述配置完成之后,我們只需要在調用方,比如上一層 Controller 注入這個 EmailService ,然后直接調用這個方法,該方法將會在異步線程中執行。
- @Slf4j
- @RestController
- public class RegisterController {
- @Autowired
- EmailService emailService;
- @RequestMapping("register")
- public String register() {
- log.info("注冊流程開始");
- emailService.sendEmailAsync();
- return "success";
- }
- }
輸出日志如下:
從日志上可以看到,兩個方法執行線程不一樣,這就說明了EmailService#sendEmailAsync 被異步線程成功執行。
帶有返回值的異步任務
上面的異步任務比較簡單,但是有時我們有需要獲取異步任務返回值。
如果使用線程池執行異步任務,我們可以使用 threadPool#submit 獲取返回對象 Future,接著我們就可以調用其內 get 方法,獲取返回結果。
在 Spring 異步任務中,我們也可以使用 Future 獲取返回結果,示例代碼如下:
- @Async
- @SneakyThrows
- public Future<String> sendEmailAsyncWithResult() {
- log.info("使用 Spring 異步任務發送郵件,并且獲取任務返回結果示例");
- TimeUnit.SECONDS.sleep(2l);
- return AsyncResult.forValue("success");
- }
這里需要注意,這里返回對象我們需要使用 Spring 內部類 AsyncResult。
Controller 層調用代碼如下所示:
- private void sendEmailWithResult() {
- Future<String> future = emailService.sendEmailAsyncWithResult();
- try {
- String result = future.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
- }
我們知道 Future#get 方法將會一直阻塞,直到異步任務執行成功。
有時候我們獲取異步任務的返回值是為了做一下后續業務,但是主流程方法是無需返回異步任務的返回值。如果我們使用了 Future#get方法,主流程就會一直被阻塞。
對于這種場景,我們可以使用 org.springframework.util.concurrent.ListenableFuture稍微改造一下上面的方法。
ListenableFuture 這個類允許我們注冊回調函數,一旦異步任務執行成功,或者執行異常,將會立刻執行回調函數。通過這種方式就可以不用阻塞執行的主線程。
示例代碼如下:
- @Async
- @SneakyThrows
- public ListenableFuture<String> sendEmailAsyncWithListenableFuture() {
- log.info("使用 Spring 異步任務發送郵件,并且獲取任務返回結果示例");
- TimeUnit.SECONDS.sleep(2l);
- return AsyncResult.forValue("success");
- }
Controller 層代碼如下所示:
- ListenableFuture<String> listenableFuture = emailService.sendEmailAsyncWithListenableFuture();
- // 異步回調處理
- listenableFuture.addCallback(new SuccessCallback<String>() {
- @Override
- public void onSuccess(String result) {
- log.info("異步回調處理返回值");
- }
- }, new FailureCallback() {
- @Override
- public void onFailure(Throwable ex) {
- log.error("異步回調處理異常",ex);
- }
- });
看到這里,如果有同學有疑惑,我們返回對象是 AsyncResult,為什么方法返回類可以是 Future,又可以是 ListenableFuture?
看完這張類繼承關系,大家應該就知道答案了。
異常處理方式
異步任務中異常處理方式,不是很難,我們只要在方法中將整個代碼塊 try...catch 即可。
- try {
- // 其他代碼
- } catch (Exception e) {
- e.printStackTrace();
- }
一般來說,我們只需要捕獲 Exception 異常,就可以應對大部分情況
但是極端情況下,比如方法內發生 OOM,將會拋出 OutOfMemoryError。如果發生Error 錯誤,以上的捕獲代碼就會失效。
Spring 的異步任務,默認提供幾種異常處理方式,可以統一處理異步任務中的發生的異常。
帶有返回值的異常處理方式
如果我們使用帶有返回值的異步任務,處理方式就比較簡單了,我們只需要捕獲 Future#get 拋出的異常就好了。
- Future<String> future = emailService.sendEmailAsyncWithResult();
- try {
- String result = future.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
如果我們使用 ListenableFuture 注冊回調函數處理,那我們在方法內增加一個 FailureCallback,在這個實現類處理相關異常即可。
- ListenableFuture<String> listenableFuture = emailService.sendEmailAsyncWithListenableFuture();
- // 異步回調處理
- listenableFuture.addCallback(new SuccessCallback<String>() {
- @Override
- public void onSuccess(String result) {
- log.info("異步回調處理返回值");
- }
- // 異常處理
- }, new FailureCallback() {
- @Override
- public void onFailure(Throwable ex) {
- log.error("異步回調處理異常",ex);
- }
- });
統一異常處理方式
沒有返回值的異步任務處理方式就比較復雜了,我們需要繼承 AsyncConfigurerSupport,實現 getAsyncUncaughtExceptionHandler 方法,示例代碼如下:
- @Slf4j
- @Configuration
- public class AsyncErrorHandler extends AsyncConfigurerSupport {
- @Override
- public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
- AsyncUncaughtExceptionHandler handler = (throwable, method, objects) -> {
- log.error("全局異常捕獲", throwable);
- };
- return handler;
- }
- }
ps:這個異常處理方式只能處理未帶返回值的異步任務。
異步任務使用注意點
異步線程池設置
Spring 異步任務默認使用 Spring 內部線程池 SimpleAsyncTaskExecutor 。
這個線程池比較坑爹,不會復用線程。也就是說來一個請求,將會新建一個線程。極端情況下,如果調用次數過多,將會創建大量線程。
Java 中的線程是會占用一定的內存空間 ,所以創建大量的線程將會導致 OOM 錯誤。
所以如果需要使用異步任務,我們需要一定要使用自定義線程池替換默認線程池。
XML 配置方式
如果當前使用 Spring XML 配置方式,我們可以使用如下配置設置線程池:
- <task:annotation-driven/>
- <task:executor id="executor" pool-size="10" queue-capacity="200"/>
注解方式
如果注解方式配置,配置方式如下:
- @Configuration
- public class AsyncConfiguration {
- @Bean
- public ThreadPoolTaskExecutor taskExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setThreadNamePrefix("task-Executor-");
- executor.setMaxPoolSize(10);
- executor.setCorePoolSize(5);
- executor.setQueueCapacity(200);
- // 還有其他參數可以設置
- return executor;
- }
- }
只要我們配置了這個線程池Bean,Spring 的異步任務都將會使用該線程池執行。
如果我們應用配置了多個線程池Bean,異步任務需要指定使用某個線程池執行,我們只需要在 @Async注解上設置相應 Bean 的名字即可。示例代碼如下:
- @Async("taskExecutor")
- public void sendEmailAsync() {
- log.info("使用 Spring 異步任務發送郵件示例");
- TimeUnit.SECONDS.sleep(2l);
- }
Spring Boot 方式
如果是 SpringBoot 項目,從阿粉的測試情況來看,默認將會創建核心線程數為 8,最大線程數為 Integer.MAX_VALUE,隊列數也為 Integer.MAX_VALUE線程池。
雖然上面的線程池不用擔心創建過多線程的問題,不是還是有可能隊列任務過多,導致 OOM 的問題。所以還是建議使用自定義線程池嗎,或者在配置文件修改默認配置,例如:
- spring.task.execution.pool.core-size=10
- spring.task.execution.pool.max-size=20
- spring.task.execution.pool.queue-capacity=200
ps:如果我們使用注解方式自定義了一個線程池,那么 Spring 異步任務都將會使用這個線程池。通過 SpringBoot 配置文件創建的線程池將會失效。
異步方法失效
Spring 異步任務背后原理是使用 AOP ,而使用 Spring AOP 時我們需要注意,切勿在方法內部調用其他使用 AOP 的方法,可能有點拗口,我們來看下代碼:
- @Async
- @SneakyThrows
- public ListenableFuture<String> sendEmailAsyncWithListenableFuture() {
- // 這樣調用,sendEmailAsync 不會異步執行
- sendEmailAsync();
- log.info("使用 Spring 異步任務發送郵件,并且獲取任務返回結果示例");
- TimeUnit.SECONDS.sleep(2l);
- return AsyncResult.forValue("success");
- }
- /**
- * 異步發送任務
- *
- * @throws InterruptedException
- */
- @SneakyThrows
- @Async("taskExecutor")
- public void sendEmailAsync() {
- log.info("使用 Spring 異步任務發送郵件示例");
- TimeUnit.SECONDS.sleep(2l);
- }
上面兩個方法都處于同一個類中,這樣調用將會導致 AOP 失效,無法起到 AOP 的效果。
其他類似的 @Transactional,以及自定義的 AOP 注解都會有這個問題,大家使用過程,千萬需要注意這一點。
總結
Spring 異步任務幫我們大大解決簡化開發了流程,只要使用一個@Async就可以輕松解決異步任務。
不過,雖然使用方式比較簡單,大家使用過程一定要注意設置合理的線程池。