SpringBoot:Event實現發布/訂閱模式
作者:拔土豆的程序員
不管是否基于spring boot 的發布訂閱模型,最終都是開啟了線程執行任務,和使用第三方的MQ消息組件,問題在于重啟服務器或者未知原因崩潰的時候,消息的恢復機制要自行處理。
如圖所示支付業務中,用戶支付成功之后,后續還有很多的業務流程,但是對于用戶來講是透明的,所以為了提高接口的響應速率,提高用戶體驗,后續操作都會選擇異步執行。
異步執行方式
異步執行主體
@Service
public class OrderService {
public void orderSuccess(){
// 訂單完成異步任務開啟 可以再統一封裝
Order order = new Order();
order.setOrderNo(String.valueOf(System.currentTimeMillis()));
Map<String, OrderSuccessService> orderSuccessServiceMap = SpringContextUtil.getBeansOfType(OrderSuccessService.class);
orderSuccessServiceMap.values().forEach(service -> {
service.orderSuccess(order);
});
}
}
異步執行接口
public interface OrderSuccessService {
/**
* 訂單支付成功
* @param order
*/
public CompletableFuture<Boolean> orderSuccess(Order order);
}
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
@Override
@Async("taskExecutor")
public CompletableFuture<Boolean> orderSuccess(Order order) {
log.info("{}商戶通知:{}",Thread.currentThread(),order);
// 返回異步調用的結果
return CompletableFuture.completedFuture(true);
}
}
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
@Override
@Async("taskExecutor")
public CompletableFuture<Boolean> orderSuccess(Order order) {
log.info("{}商戶通知:{}",Thread.currentThread(),order);
// 返回異步調用的結果
return CompletableFuture.completedFuture(true);
}
}
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
@Override
@Async("taskExecutor")
public CompletableFuture<Boolean> orderSuccess(Order order) {
log.info("{}商戶通知:{}",Thread.currentThread(),order);
// 返回異步調用的結果
return CompletableFuture.completedFuture(true);
}
}
自定義線程池,線程池隔離,開啟異步任務執行
@Configuration // 配置類
@EnableAsync // @Async注解能夠生效
public class TaskConfiguration {
@Bean("taskExecutor")
public Executor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 線程池創建時候初始化的線程數
executor.setCorePoolSize(5);
// 線程池最大的線程數,只有在緩沖隊列滿了之后,才會申請超過核心線程數的線程
executor.setMaxPoolSize(10);
// 用來緩沖執行任務的隊列
executor.setQueueCapacity(200);
// 當超過了核心線程之外的線程,在空閑時間到達之后會被銷毀
executor.setKeepAliveSeconds(60);
// 可以用于定位處理任務所在的線程池
executor.setThreadNamePrefix("taskExecutor-orderSuccess-");
// 這里采用CallerRunsPolicy策略,當線程池沒有處理能力的時候,該策略會直接在execute方法的調用線程中運行被拒絕的任務;
// 如果執行程序已關閉,則會丟棄該任務
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 設置 線程池關閉 的時候 等待 所有任務都完成后,再繼續 銷毀 其他的 Bean,
// 這樣這些 異步任務 的 銷毀 就會先于 數據庫連接池對象 的銷毀。
executor.setWaitForTasksToCompleteOnShutdown(true);
// 該方法用來設置線程池中 任務的等待時間,如果超過這個時間還沒有銷毀就 強制銷毀,以確保應用最后能夠被關閉,而不是阻塞住。
executor.setAwaitTerminationSeconds(60);
return executor;
}
}
Spring Event實現發布/訂閱模式
自定義事件:通過繼承ApplicationEve?
nt,并重寫構造函數,實現事件擴展。
public class OrderApplicationEvent extends ApplicationEvent {
public OrderApplicationEvent(OrderData orderData){
super(orderData);
}
}
定義事件的消息體
@Data
public class OrderData {
/**
* 訂單號
*/
private String orderNo;
}
事件監聽
@Slf4j
@Service
public class MerchantNoticeListener {
@Async("asyncEventTaskExecutor")
@EventListener
public CompletableFuture<Boolean> orderSuccess(OrderApplicationEvent event) {
log.info("{}商戶通知:{}",Thread.currentThread(),event);
// 返回異步調用的結果
return CompletableFuture.completedFuture(true);
}
}
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener<OrderApplicationEvent> {
@Override
@Async("asyncEventTaskExecutor")
public void onApplicationEvent(OrderApplicationEvent event) {
log.info("{}用戶通知:{}",Thread.currentThread(),event);
}
}
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener<OrderApplicationEvent> {
@Override
@Async("asyncEventTaskExecutor")
public void onApplicationEvent(OrderApplicationEvent event) {
log.info("{}用戶通知:{}",Thread.currentThread(),event);
}
}
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener<OrderApplicationEvent> {
@Override
@Async("asyncEventTaskExecutor")
public void onApplicationEvent(OrderApplicationEvent event) {
log.info("{}用戶通知:{}",Thread.currentThread(),event);
}
}
自定義線程池
@Configuration
@Slf4j
@EnableAsync // @Async注解能夠生效
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("asyncEventTaskExecutor")
public ThreadPoolTaskExecutor executor(){
//Spring封裝的一個線程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(30);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("asyncEventTaskExecutor--orderSuccess-");
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor(){
return executor();
}
/**
* 異常處理
* @return
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
}
}
事件發布
@Service
@Slf4j
public class OrderEventService {
private final ApplicationEventPublisher applicationEventPublisher;
public OrderEventService(ApplicationEventPublisher applicationEventPublisher){
this.applicationEventPublisher = applicationEventPublisher;
}
public void success(){
OrderData orderData = new OrderData();
orderData.setOrderNo(String.valueOf(System.currentTimeMillis()));
// 消息
OrderApplicationEvent orderApplicationEvent = new OrderApplicationEvent(orderData);
// 發布事件
applicationEventPublisher.publishEvent(orderApplicationEvent);
}
}
寫在最后:不管是否基于spring boot 的發布訂閱模型,最終都是開啟了線程執行任務,和使用第三方的MQ消息組件,問題在于重啟服務器或者未知原因崩潰的時候,消息的恢復機制要自行處理。
建議使用在一些邊緣業務,比如記錄日志,這些要求沒有那么高的業務。
責任編輯:武曉燕
來源:
今日頭條