事務消息應用場景、實現原理與項目實戰
1、活動中心場景介紹
在電商系統上線初期,往往會進行一些“拉新”活動,例如活動部門提出新用戶注冊送積分、送優惠券活動。
基于分布式、微服務的設計理念,通常的架構設計(子系統交互)如下圖所示:
其核心系統介紹如下:
- 賬戶中心
提供用戶登錄、用戶注冊等服務,一個新用戶注冊時,向MQ服務器中的USER_REGISTER主題發送一條消息,主流程結束,與送積分,送優惠券等過程解耦。
- 優惠券(券系統)
提供發放優惠券、使用優惠券等與券相關的基礎服務。
- 積分中心
提供積分相關的服務,例如積分贈送、積分消費、積分查詢等基礎服務。
- 送積分服務(消費者)
訂閱MQ,按照規則決定是否需要贈送積分,如果需要則調用積分相關的基礎接口,完成積分的發放。
- 送優惠券(消費者)
訂閱MQ,按照規則決定是否需要贈送優惠券,如果需要則調用券系統相關的基礎接口,完成優惠券的發放。
上面的架構設計非常優雅,但并不是無懈可擊,讀者們肯定會想到如果新用戶注冊成功,但消息發送到MQ失敗,或者消息成功發送到MQ,但發送完MQ后系統出現異常導致用戶注冊失敗又該如何呢?
上面的問題其實就是典型的分布式事務問題:即如何保證用戶注冊(數據庫操作)與MQ消息發送這兩個分布式操作的一致性。
RocketMQ事務消息閃亮登場。
2、事務消息實現原理
一言以蔽之:RocketMQ事務消息要解決的問題是消息發送與業務的一致性,其解決思路:二階段提交與事務狀態回查,其具體實現流程如下圖所示:
其核心設計理念:
- 應用程序開啟一個數據庫事務,進行數據庫操作,并且在事務中發送一條PREPARE消息,PREPARE消息發送成功后通知應用程序記錄本地事務狀態,然后提交本地事務。
- RocketMQ在收到類型為PREPARE的消息時,首先備份消息的原主題與原消息消費隊列,然后將消息存儲在主題為RMQ_SYS_TRANS_HALF_TOPIC的消息隊列中,故PREPARE的消息是不會被客戶端消費的。
- Broker消息服務器開啟一個定時任務處理RMQ_SYS_TRANS_HALF_TOPIC中的消息,會每隔指定時間向消息發送者發起事務狀態查詢請求 ,詢問消息發送者客戶端本地事務是否成功,然后根據回查狀態決定是提交還是回滾,即對處于PREPARE狀態進行提交或回滾操作。
- 發送者如果明確得知事務成功,則可以返回COMMIT,服務端會提交該條消息,具體操作是恢復原消息的主題與隊列,重新發送到Broker,消費端感知后消費。
- 發送者如果無法明確得知事務狀態,則返回UNOWN,此時服務端會等待一定時間后再次向發送者詢問,默認詢問15次。
- 發送者如果非常明確得知事務失敗,則可以返回ROLLBACK。
在具體實踐中,消息發送者在無法獲取事務狀態時不要武斷的返回ROLLBACK,而是要返回UNOWN,讓服務端定時重試回查,說明如下:
在將PREPARE消息發送到Broker后,服務端發起事務查詢時本地事務可能還未提交,為了避免無效的事務回查機制,RocketMQ通常至少在收到PREPARE消息6s后才會發起第一次事務回查,可通過 transactionTimeOut 配置。故客戶端在實現事務回查時無法證明事務狀態時不應該返回ROLLBACK,而是返回UNOWN。
3、事務消息實戰
光說不練假把式,接下來以一個新用戶注冊送優惠券的場景來詳細介紹如何使用事務消息。
項目模塊職責說明如下:
事務消息的核心代碼組裝在transaction-service,其核心類圖如下:
其中核心要點如下:
- UserServiceImpl
Dubbo接口業務實現類,類似MVC的控制層,在這里做一些參數驗證,但不執行具體的業務邏輯,只是發送一條事務消息到MQ。
- UserRegTransactionListener
事務監聽器,在 executeLocalTransaction 方法中執行業務邏輯,數據庫本地事務加在該方法。
溫馨提示:之所以不在UserServicveImpl中執行本地事務,是因為 executeLocalTransaction 中拋出的異常會被RocketMQ框架捕捉,及異常無法被UserServiceImpl感知,即無法實現其事務的一致性。
接下來展示其核心代碼,全部源碼已上傳到github倉庫。
倉庫地址:https://github.com/dingwpmz/rocketmq-learning
3.1 UserServiceImpl 核心實現
UserServiceImpl 的核心要點如下:
- 首先應該對參數進行校驗、業務邏輯進行校驗,如果不滿足業務條件,會發送一些無效消息到MQ,雖然不會造成業務異常,但會消耗性能
- 發送事務消息,建議對消息設置Key,Key的值可以用業務處理流水號(可唯一表示該業務操作)或者核心業務字段(例如訂單編號)
- 業務入口類可通過事務消息發送狀態來判斷業務是否失敗。
3.2 UserRegTransactionListener 核心實現
事務監聽器需要實現執行本地事務與事務回查兩個接口。
3.2.1 實現 executeLocalTransaction
首先需要實現 executeLocalTransaction 方法,執行本地事務,其代碼如下圖所示:
其中幾個關鍵點說明如下:
- 在該方法上添加數據庫事務標簽。
- 執行業務邏輯,示例Demo只是將用戶數據存儲到數據庫。
- 如果業務執行失敗,可明確告知需要回滾,上層調用方也可根據ROLLBACK_MESSAGE進行相應的處理。
- 如果業務成功,不建議直接返回COMMIT,而是建議返回UNKNOW,因為該方法盡管在方法最后一行,但可能發生斷電等異常情況,數據庫并沒有成功。
3.2.2 實現 checkLocalTransaction
其次需要實現事務狀態回查,用來RocketMQ服務端感知事務是否成功,其實現原理如下圖所示:
其實現關鍵點如下:
- 如果能明確得知本地事務成功,則返回COMMIT_MESSAGE
- 如該不能明確得知本地事務成功,不能返回ROLLBACK_MESSAGE,而是返回UNKNOW,等待服務端下一次事務回查(不會立即觸發),服務端默認回查15次,如果15次都得到UNKNOW,則會回滾該消息。
3.3 代碼獲取
上文只是將事務消息的核心代碼加以解讀,并重點闡述每個步驟的實現關鍵點,筆者基于SpringBoot,嘗試結合場景學習RocketMQ的使用技巧,其代碼上傳到了github倉庫。
https://github.com/dingwpmz/rocketmq-learning
丁威,《RocketMQ技術內幕》作者,RocketMQ社區優秀布道師,主打成體系分享JAVA主流中間件,打造完備的互聯網架構體系,目前涵蓋Java并發、微服務、消息、調度、數據異構等領域,未來繼續關注監控、在線診斷等領域。
本文轉載自微信公眾號「中間件興趣圈」,可以通過以下二維碼關注。轉載本文請聯系中間件興趣圈公眾號。