看我如何教我同事處理SpringEvent接口超時問題
最近線上的一個接口前端訪問竟然超時了,前端設置的超時時間是10s,也就是說這個接口在10s內都還沒有返回數據,同事去看了說這個接口就是一個很簡單的新增數據接口,不應該導致超時的呀?
我看了下這個接口的邏輯,在結合日志初步定位是因為Spring Event中調用其他服務接口導致的超時。
問題追蹤
首先看下這個接口調用的主要邏輯
public void saveData(MergeDataParam param) {
MergeData data = convert(param);
dataMapper.insert(data);
applicationContext.publishEvent(new MergeDataEvent(new MergeDataMessage(param.getDataId())));
}
@EventListener
public void notifyOnboardServiceWhenMergeData(MergeDataEvent event) {
MergeDataMessage msg = (MergeDataMessage)event.getSource();
//記錄日志
log.info("notify onbaord service when merge data, data id is {}", msg.getDataId());
NotifyData notifyData = getData(msg.getDataId());
Response<Result> response = remoteOnboardService.notify(notifyData);
// 省略其他業(yè)務代碼
log.info("notify result is {}", JSON.toJSONString(response));
}
確實邏輯比較簡單,主要就是保存數據,然后通知下游系統(tǒng),但是從日志上來看,日志只打印了 notify onbaord service when merge data, data id is 這里的內容,然后前端就超時了。
接著我又去看了下游服務的實現,發(fā)現確實有一些超時業(yè)務邏輯,但是下游服務是別人寫的,我們改不了。
同事還一直以為這個事件是異步的,我告訴他不是的,如果要想變成異步的需要自己配置。
要想配置成異步的有兩種方式,一種是通過@Async注解,另一種方式是在EventMulticaster中指定線程池。
@Async注解使用
首先我們需要開啟異步支持,然后指定一個異步執(zhí)行器,不然使用的線程池配置是默認的
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("NotifyAsyncThread-");
executor.initialize();
return executor;
}
}
然后在我們事件監(jiān)聽器上加上@Async注解
@Async("asyncExecutor")
@EventListener
public void notifyOnboardServiceWhenMergeData(MergeDataEvent event) {
MergeDataMessage msg = (MergeDataMessage)event.getSource();
//記錄日志
log.info("notify onbaord service when merge data, data id is {}", msg.getDataId());
NotifyData notifyData = getData(msg.getDataId());
Response<Result> response = remoteOnboardService.notify(notifyData);
// 省略其他業(yè)務代碼
log.info("notify result is {}", JSON.toJSONString(response));
}
然后其他代碼不用改變,此時我們的監(jiān)聽器就可以異步執(zhí)行了。需要記住使用這個注解有一些注意事項
- 異步方法不能返回值。如果需要返回值,可以考慮使用 Future<T> 或 CompletableFuture<T>。
- 異步方法中拋出的異常不會被調用者直接捕獲。確保適當處理異常,或考慮使用 AsyncUncaughtExceptionHandler。
- 如果在同一個類中調用異步方法,直接調用不會觸發(fā)異步行為。這是因為Spring的AOP代理機制導致的。
- 確保你的 MyEvent 類是線程安全的,因為它可能被多個線程同時訪問。
如果對@Async默認線程池感興趣的同學可以通過這幾個類進行進一步查看
- 處理基礎類AsyncExecutionAspectSupport,定義了默認執(zhí)行器
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
// 省略其他代碼
return (Executor)beanFactory.getBean(TaskExecutor.class);
}
- 異步執(zhí)行器配置類TaskExecutionAutoConfiguration
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
ObjectProvider<TaskDecorator> taskDecorator) {
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
// 省略其他代碼
return builder;
}
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
配置時間廣播器multicaster
首選創(chuàng)建一個配置類。
@Configuration
public class AsynchronousSpringEventsConfig {
@Bean(name = "applicationEventMulticaster")
public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
ThreadPoolExecutor e = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadFactoryBuilder().setNameFormat("notify-onboard-thread-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
eventMulticaster.setTaskExecutor(e);
// 設置錯誤處理器
eventMulticaster.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
log.error("get error when notify onboard service", t);
}
});
return eventMulticaster;
}
}
然后事件監(jiān)聽器的代碼不用修改,發(fā)布時間方式也不用修改。
使用這種方式有幾個注意點
- 所有有的事件監(jiān)聽器都變?yōu)楫惒綀?zhí)行。如果某些監(jiān)聽器需要同步執(zhí)行,你可能需要額外的配置或使用不同的多播器。
- 這里廣播器的名字一定要叫applicationEventMulticaster
// APPLICATION_EVENT_MULTICASTER_BEAN_NAME = applicationEventMulticaster
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
}
}
這里為什么要指定errorHandler呢? 是因為如果不指定的話,然后執(zhí)行的業(yè)務邏輯又拋出了異常,這個時候異常是會往外拋的,所以我們就會在執(zhí)行的方法上加上try-catch, 而有了handler,我們就可以統(tǒng)一處理這個異常問題。
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
從 SimpleApplicationEventMulticaster中可以看出來,如果不指定Executor, 則最終事件的執(zhí)行是由同一 個線程按順序來完成的,同時任何一個報錯,都會導致后續(xù)的監(jiān)聽器執(zhí)行不了(如果存在多個監(jiān)聽器)。
總結
至此,我總結了兩種事件異步執(zhí)行的方式,我讓我同事選擇一種,他選了第二種,如果是你你會選擇哪一種?