你用錯了!詳解SpringBoot異步任務&任務調度&異步請求線程池的使用及原理
環境:SpringBoot2.7.12
1. 簡介
異步任務: 它允許將耗時的任務異步執行,從而提高系統的并發能力和響應速度。異步任務可以在啟動類上使用注解@EnableAsync進行啟用,并在需要異步執行的任務上使用@Async標注該方法為異步任務。通過這種方式,可以快速地在SpringBoot應用程序中實現異步處理。
任務調度: 是SpringBoot中用于管理和執行任務的機制。通過任務調用,可以輕松地調度、并行處理任務??梢栽趩宇惿咸砑覢EnableScheduling注解進行開啟。在需要執行的調度方法上使用@Scheduled注解。
異步請求: 是Web請求的一種處理方式,它允許后端在接收到請求后,新開一個線程來執行業務邏輯,釋放請求線程,避免請求線程被大量耗時的請求沾滿,導致服務不可用。在SpringBoot中,異步請求可以通過Controller的返回值來控制,支持多種類型。
以上不管是任務還是請求都會在一個異步線程中執行,這異步線程是使用的同一個嗎?還是各自都有各自的線程池?接下來將通過源碼的方式分析他們各自使用的線程池及自定義線程池。
2. 源碼分析
2.1 異步任務(@EnableAsync)
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {}
核心類AsyncConfigurationSelector
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
// 向容器注冊ProxyAsyncConfiguration
return new String[] {ProxyAsyncConfiguration.class.getName()};
// ...
}
}
}
ProxyAsyncConfiguration配置類中主要就是注冊了BeanPostProcessor用來處理@Async注解的方法。
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
// 處理器,該處理器獲取父類中的Supplier<Executor>
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
// ...
return bpp;
}
}
AbstractAsyncConfiguration
@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {
@Nullable
protected Supplier<Executor> executor;
// 自動裝配AsyncConfigurer,我們可以通過實現該接口來實現自己的Executor
// 默認情況下系統也沒有提供默認的AsyncConfigurer.
@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
// 如果存在多個將拋出異常(也就相當于容器中有多個Executor)
if (candidates.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});
// 調用AsyncConfigurer#getAsyncExecutor方法,默認返回的null
// 所以最終這里返回的null
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
}
private <T> Supplier<T> adapt(Supplier<AsyncConfigurer> supplier, Function<AsyncConfigurer, T> provider) {
return () -> {
AsyncConfigurer configurer = supplier.get();
return (configurer != null ? provider.apply(configurer) : null);
};
}
}
AsyncConfigurer
public interface AsyncConfigurer {
@Nullable
default Executor getAsyncExecutor() {
return null;
}
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
到此,默認情況下通過ProxyAsyncConfiguration是不能得到Executor執行器,也就是到目前為止AsyncAnnotationBeanPostProcessor中的線程池對象還是null。接下來進入到AsyncAnnotationBeanPostProcessor處理器中。
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
public void setBeanFactory(BeanFactory beanFactory) {
// 創建切面類,而這里的executor根據上文的分析還是null。
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
// ...
}
}
AsyncAnnotationAdvisor
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 構建通知類,這里傳入的executor為null
this.advice = buildAdvice(executor, exceptionHandler);
}
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler)
// 創建攔截器
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
// executor為null, configure是父類(AsyncExecutionAspectSupport )方法
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
// 這個方法非常重要,我們的AnnotationAsyncExecutionInterceptor 并沒有
// 被容器管理,所以內部的BeanFactory對象沒法注入是null,所以進行了設置
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
}
AsyncExecutionAspectSupport
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 這里的defaultExecutor獲取到d餓還是null,所以這里提供了默認的獲取Executor的方法
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
}
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 從容器中查找TaskExecutor類型的Bean對象
// 由于SpringBoot提供了一個自動配置TaskExecutionAutoConfiguration
// 所以這里直接就返回了
return beanFactory.getBean(TaskExecutor.class);
}
// 如果容器中有多個TaskExecutor,那么將會拋出下面這個異常
catch (NoUniqueBeanDefinitionException ex) {
try {
// 獲取beanName=taskExecutor 獲取Executor
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
// 如果沒有這樣的Bean則結束了
catch (NoSuchBeanDefinitionException ex2) {
}
}
// 如果容器中沒有TaskExecutor類型的Bean拋出該異常
catch (NoSuchBeanDefinitionException ex) {
try {
// 這里與上面一樣了
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
}
}
// 最終返回null,那么程序最終也會報錯。
return null;
}
}
如果Executor返回的null,那么最終程序拋出異常
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport {
public Object invoke(final MethodInvocation invocation) throws Throwable {
// ...
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
}
}
到此應該說清楚了系統是如何查找Executor執行器的(線程池對象)。接下來通過實例演示:
@Service
public class AsyncService {
@Async
public void async() {
System.out.printf("%s - 異步任務執行...%n", Thread.currentThread().getName()) ;
}
}
默認情況
taskExecutor-2 - 異步任務執行...
可以通過如下配置修改默認配置
spring:
task:
execution:
pool:
core-size: 2
max-size: 2
allow-core-thread-timeout: false
thread-name-prefix: pack-
輸出
pack-2 - 異步任務執行...
自定義TaskExecutor
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);//核心線程數
pool.setMaxPoolSize(10);//最大線程數
pool.setQueueCapacity(25);//線程隊列
pool.setThreadNamePrefix("pack-custom-") ;
pool.initialize();//線程初始化
return pool;
}
根據上面源碼的分析,也可以自定義AsyncConfigurer
@Component
public class PackAsyncConfigurer implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
return new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)) ;
}
}
以上就是關于異步任務的源碼分析及自定義實現。
2.2 異步請求
在SpringMVC中我們可以將Controller接口的返回值定義為:DeferredResult、Callable、Reactive Types等。只要返回值是這些,我們耗時的代碼就可以放到一個異步的線程中執行。這里我們一Callable為例,先看看效果
@GetMapping("/callable")
public Callable<Map<String, Object>> callable() {
long start = System.currentTimeMillis() ;
System.out.printf("%s - 開始時間:%d%n", Thread.currentThread().getName(), start) ;
Callable<Map<String, Object>> callable = new Callable<Map<String, Object>>() {
public Map<String, Object> call() throws Exception {
Map<String, Object> result = new HashMap<>() ;
try {
// 這里模擬耗時操作
TimeUnit.SECONDS.sleep(1) ;
// 將執行結果保存
result.put("code", 1) ;
result.put("data", "你的業務數據") ;
System.out.println(Thread.currentThread().getName()) ;
} catch (InterruptedException e) {}
return result ;
}
} ;
long end = System.currentTimeMillis() ;
System.out.printf("%s - 結束時間:%d%n", Thread.currentThread().getName(), end) ;
System.out.printf("總耗時:%d毫秒%n", (end - start)) ;
return callable ;
}
執行結果
http-nio-8882-exec-4 - 開始時間:1705560641226
http-nio-8882-exec-4 - 結束時間:1705560641227
總耗時:1毫秒
pack-2
根據最后一行的輸出,我們在配置文件中配置的生效了。也就是異步請求與異步任務這一點是相同的,默認會使用系統默認提供的線程池(上面介紹了默認的自動配置)。
源碼分析
在SpringMVC中具體Controller接口的方法的調用是通過HandlerMapping,而這個具體實現是RequestMappingHandlerAdapter,所以我們就先從這里開始看
public class RequestMappingHandlerAdapter {
// 根據輸出實際并沒有使用這里默認的
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("MvcAsync");
protected ModelAndView handleInternal(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
invokeHandlerMethod(request, response, handlerMethod);
}
protected ModelAndView invokeHandlerMethod(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
// ...
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
// 設置異步任務執行器(線程池)
asyncManager.setTaskExecutor(this.taskExecutor);
// ...
}
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}
既然沒有使用默認的線程池對象,那么這里是如果設置的系統默認TaskExecutor? 這是在容器實例化RequestMappingHandlerAdapter Bean對象時設置。
public class WebMvcConfigurationSupport {
@Bean
public RequestMappingHandlerAdapter requestMappingHandlerAdapter(...) {
RequestMappingHandlerAdapter adapter = createRequestMappingHandlerAdapter();
// ...
AsyncSupportConfigurer configurer = getAsyncSupportConfigurer();
if (configurer.getTaskExecutor() != null) {
adapter.setTaskExecutor(configurer.getTaskExecutor());
}
// ...
return adapter;
}
protected AsyncSupportConfigurer getAsyncSupportConfigurer() {
// 默認為null
if (this.asyncSupportConfigurer == null) {
this.asyncSupportConfigurer = new AsyncSupportConfigurer();
// 調用子類DelegatingWebMvcConfiguration方法(子類重寫了)
configureAsyncSupport(this.asyncSupportConfigurer);
}
return this.asyncSupportConfigurer;
}
}
// 子類
public class DelegatingWebMvcConfiguration extends WebMvcConfigurationSupport {
private final WebMvcConfigurerComposite configurers = new WebMvcConfigurerComposite();
// 獲取當前環境下所有自定義的WebMvcConfigurer
@Autowired(required = false)
public void setConfigurers(List<WebMvcConfigurer> configurers) {
if (!CollectionUtils.isEmpty(configurers)) {
this.configurers.addWebMvcConfigurers(configurers);
}
}
protected void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 在組合器中依次調用自定義的WebMvcConfigurer(如果有重寫對應的configureAsyncSupport方法)
this.configurers.configureAsyncSupport(configurer);
}
}
SpringBoot默認提供了一個自定義的WebMvcConfigurer且重寫了configureAsyncSupport方法。
public static class WebMvcAutoConfigurationAdapter implements WebMvcConfigurer {
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 判斷容器中是否以applicationTaskExecutor為beanName的bean
// SpringBoot自動配置有提供這樣的bean(TaskExecutionAutoConfiguration)
if (this.beanFactory.containsBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)) {
// 獲取bean對象
Object taskExecutor = this.beanFactory
.getBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME);
if (taskExecutor instanceof AsyncTaskExecutor) {
configurer.setTaskExecutor(((AsyncTaskExecutor) taskExecutor));
}
}
// ...
}
}
到此,異步請求使用的線程池應該清楚了使用的是系統默認的線程池可通過配置文件修改默認值。我們也可以通過自定義WebMvcConfigurer來重寫對應的方法實現自己的線程池對象。
總結:在默認的情況下,異步任務與異步請求使用的是同一個線程池對象。
2.3 任務調用
先通過一個示例,查看默認執行情況
@Service
public class SchedueService {
@Scheduled(cron = "*/2 * * * * *")
public void task() {
System.out.printf("%s: %d - 任務調度%n", Thread.currentThread().getName(), System.currentTimeMillis()) ;
}
}
輸出
scheduling-1: 1705564144014 - 任務調度
scheduling-1: 1705564146010 - 任務調度
scheduling-1: 1705564148003 - 任務調度
scheduling-1: 1705564150005 - 任務調度
scheduling-1: 1705564152001 - 任務調度
使用的scheduling相應的線程池,每隔2s執行任務。
源碼分析
@Import(SchedulingConfiguration.class)
public @interface EnableScheduling {}
核心類SchedulingConfiguration
@Configuration(proxyBeanMethods = false)
public class SchedulingConfiguration {
// 該bean專門用來處理@Scheduled注解的核心處理器類
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
ScheduledAnnotationBeanPostProcessor
public class ScheduledAnnotationBeanPostProcessor {
// 看到這個就能猜到,系統默認是獲取的taskScheduler bean對象
public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是同一個容器
if (event.getApplicationContext() == this.applicationContext) {
// 注冊任務
finishRegistration();
}
}
private void finishRegistration() {
if (this.beanFactory instanceof ListableBeanFactory) {
// 獲取所有SchedulingConfigurer,我們可以實現該接口自定義配置線程池
Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
// 在這里可以自定義的不僅僅是線程池了,還有其它的東西
configurer.configureTasks(this.registrar);
}
}
// 在默認情況下,上面是沒有自定義的SchedulingConfigurer。
// 有調度的任務并且當前的調度任務注冊器中沒有線程池對象
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
try {
// 通過類型查找TaskScheduler類型的bean對象。這里就會獲取到系統默認的
// 由TaskSchedulingAutoConfiguration自動配置
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
// 如果查找到了多個則進入這個catch(如你自己也定義了多個)
catch (NoUniqueBeanDefinitionException ex) {
try {
// 通過beanName進行查找
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
}
}
catch (NoSuchBeanDefinitionException ex) {
try {
// 如果不存在則在查找ScheduledExecutorService
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
// 如果有多個則再根據名字查找
catch (NoUniqueBeanDefinitionException ex2) {
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
}
}
}
this.registrar.afterPropertiesSet();
}
}
以上就是任務調用如何查找使用線程池對象的。根據上面的分析我們也可以通過如下的方式進行自定義處理。
自定義SchedulingConfigurer
@Component
public class PackSchedulingConfigurer implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler() ;
taskScheduler.setThreadNamePrefix("pack-schedule-") ;
taskScheduler.afterPropertiesSet() ;
taskRegistrar.setTaskScheduler(taskScheduler ) ;
}
}
執行結果
pack-schedule-1: 1705567234013 - 任務調度
pack-schedule-1: 1705567236011 - 任務調度
自定義ThreadPoolTaskScheduler
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler() ;
scheduler.setThreadNamePrefix("pack-custom-scheduler") ;
return scheduler ;
}
執行結果
pack-custom-scheduler1: 1705567672013 - 任務調度
pack-custom-scheduler1: 1705567674011 - 任務調度
通過配置文件自定義默認配置
spring:
task:
scheduling:
pool:
size: 2
thread-name-prefix: pack-custom-
注意:系統默認的任務調用pool.size=1。所以如果你有多個調度任務要當心了。