一套萬能的異步處理方案!
良好的系統(tǒng)設(shè)計必須要做到開閉原則,隨著業(yè)務(wù)的不斷迭代更新,核心代碼也會被不斷改動,出錯的概率也會大大增加。但是大部分增加的功能都是在擴展原有的功能,既要保證性能又要保證質(zhì)量,我們往往都會使用異步線程池來處理,然而卻增加了很多不確定性因素。由此我設(shè)計了一套通用的異步處理SDK,可以很輕松的實現(xiàn)各種異步處理。
目的
通過異步處理不僅能夠保證方法能夠得到有效的執(zhí)行而且不影響主流程。
更重要的是各種兜底方法保證數(shù)據(jù)不丟失,從而達到最終一致性。
優(yōu)點
無侵入設(shè)計,獨立數(shù)據(jù)庫,獨立定時任務(wù),獨立消息隊列,獨立人工執(zhí)行界面(統(tǒng)一登錄認證)。
使用spring事務(wù)事件機制,即使異步策略解析失敗也不會影響業(yè)務(wù)。
如果你的方法正在運行事務(wù),會等事務(wù)提交后或回滾后再處理事件。
就算事務(wù)提交了,異步策略解析失敗了,我們還有兜底方案執(zhí)行(除非數(shù)據(jù)庫有問題,消息隊列有問題,方法有bug)。
原理
容器初始化bean完成后遍歷所有方法,把有@AsyncExec注解的方法緩存起來。
方法運行時通過AOP切面發(fā)布事件。
事務(wù)事件監(jiān)聽處理異步執(zhí)行策略:
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
fallbackExecution=true 沒有事務(wù)正在運行,依然處理事件。
TransactionPhase.AFTER_COMPLETION 事務(wù)提交后和事務(wù)回滾后都處理事件。
組件
- kafka 消息隊列
- xxl job 定時任務(wù)
- mysql 數(shù)據(jù)庫
- spring 切面
- vue 界面
設(shè)計模式
- 策略
- 模板方法
- 動態(tài)代理
- 流程圖
圖片
數(shù)據(jù)庫腳本
CREATE TABLE `async_scene` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`application_name` varchar(100) NOT NULL DEFAULT '' COMMENT '應(yīng)用名稱',
`method_sign` varchar(50) NOT NULL DEFAULT '' COMMENT '方法簽名',
`scene_name` varchar(200) NOT NULL DEFAULT '' COMMENT '業(yè)務(wù)場景描述',
`async_type` varchar(50) NOT NULL DEFAULT '' COMMENT '異步策略類型',
`queue_name` varchar(200) NOT NULL DEFAULT '' COMMENT '隊列名稱',
`theme_value` varchar(100) NOT NULL DEFAULT '' COMMENT '消費主題',
`exec_count` int NOT NULL DEFAULT '0' COMMENT '失敗重試次數(shù)',
`exec_deleted` int NOT NULL DEFAULT '0' COMMENT '執(zhí)行后是否刪除',
`async_version` varchar(50) NOT NULL DEFAULT '' COMMENT '組件版本號',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
`cdc_crt_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '記錄新增時間',
`cdc_upd_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '記錄修改時間',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `uk_application_sign` (`application_name`,`method_sign`) USING BTREE,
KEY `idx_cdc_upd_time` (`cdc_upd_time`)
) ENGINE=InnoDB COMMENT='異步場景表';
CREATE TABLE `async_req` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`application_name` varchar(100) NOT NULL DEFAULT '' COMMENT '應(yīng)用名稱',
`sign` varchar(50) NOT NULL DEFAULT '' COMMENT '方法簽名',
`class_name` varchar(200) NOT NULL DEFAULT '' COMMENT '全路徑類名稱',
`method_name` varchar(100) NOT NULL DEFAULT '' COMMENT '方法名稱',
`async_type` varchar(50) NOT NULL DEFAULT '' COMMENT '異步策略類型',
`exec_status` tinyint NOT NULL DEFAULT '0' COMMENT '執(zhí)行狀態(tài) 0:初始化 1:執(zhí)行失敗 2:執(zhí)行成功',
`exec_count` int NOT NULL DEFAULT '0' COMMENT '執(zhí)行次數(shù)',
`param_json` longtext COMMENT '請求參數(shù)',
`remark` varchar(200) NOT NULL DEFAULT '' COMMENT '業(yè)務(wù)描述',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_applocation_name` (`application_name`) USING BTREE,
KEY `idx_exec_status` (`exec_status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='異步處理請求';
CREATE TABLE `async_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`async_id` bigint NOT NULL DEFAULT '0' COMMENT '異步請求ID',
`error_data` longtext COMMENT '執(zhí)行錯誤信息',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_async_id` (`async_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='異步處理日志';
異步策略
圖片
安全級別
圖片
執(zhí)行狀態(tài)
圖片
流程圖
圖片
圖片
圖片
apollo 配置
# 開關(guān):默認關(guān)閉
async.enabled=true
# 應(yīng)用名稱
spring.application.name=xxx
# 數(shù)據(jù)源 druid
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true
spring.datasource.username=user
spring.datasource.password=xxxx
spring.datasource.filters=config
spring.datasource.cnotallow=config.decrypt=true;config.decrypt.key=yyy
#靜態(tài)地址
spring.resources.add-mappings=true
spring.resources.static-locatinotallow=classpath:/static/
# 以下配置都有默認值
# 核心線程數(shù)
async.executor.thread.corePoolSize=10
# 最大線程數(shù)
async.executor.thread.maxPoolSize=50
# 隊列容量
async.executor.thread.queueCapacity=10000
# 活躍時間
async.executor.thread.keepAliveSecnotallow=600
# 執(zhí)行成功是否刪除記錄:默認刪除
async.exec.deleted=true
# 自定義隊列名稱前綴:默認應(yīng)用名稱
async.topic=${spring.application.name}
# 重試執(zhí)行次數(shù):默認5次
async.exec.count=5
# 重試最大查詢數(shù)量
async.retry.limit=100
# 補償最大查詢數(shù)量
async.comp.limit=100
# 登錄攔截:默認false
async.login=false
用法
1.異步開關(guān)
scm.async.enabled=true
2.在需要異步執(zhí)行的方法加注解 (必須是spring代理方法)
@AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "數(shù)據(jù)字典")
3.人工處理地址
http://localhost:8004/async/index.html
注意
1.應(yīng)用名稱
spring.application.name
2.隊列名稱
${async.topic:${spring.application.name}}_async_queue
3.自己業(yè)務(wù)要做冪等
4.一個應(yīng)用公用一個隊列
自產(chǎn)自消
5.定時任務(wù)
- 異步重試定時任務(wù)(2分鐘重試一次,可配置重試次數(shù))。
- 異步補償定時任務(wù)(一小時補償一次,創(chuàng)建時間在一小時之前的)。
效果展示
圖片
圖片