事物消息的實現-RocketMQ知識體系6
分布式事務是指事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位于不同的分布式系統的不同節點之上。例如在大型電商系統中,下單接口通常會扣減庫存、減去優惠、生成訂單 id, 而訂單服務與庫存、優惠、訂單 id 都是不同的服務,下單接口的成功與否,不僅取決于本地的 db 操作,而且依賴第三方系統的結果,這時候分布式事務就保證這些操作要么全部成功,要么全部失敗。本質上來說,分布式事務就是為了保證不同數據庫的數據一致性。
目前解決分布式事物的解決方案有seata,lcn 等。
RocketMQ 分布式事物實現
RocketMQ提供了事務消息的功能,采用2PC(兩段式協議)+補償機制(事務回查)的分布式事務功能,通過消息隊列 RocketMQ 版事務消息能達到分布式事務的最終一致。
首先,我們要知道什么是半事物消息和消息回查:
- 半事務消息:
暫不能投遞的消息,發送方已經成功地將消息發送到了消息隊列 RocketMQ 版服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處于該種狀態下的消息即半事務消息。
- 消息回查:
由于網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列 RocketMQ 版服務端通過掃描發現某條消息長期處于“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該詢問過程即消息回查。
【交互流程】
事務消息發送步驟如下:
- 發送方將半事務消息發送至消息隊列 RocketMQ 版服務端。
- 消息隊列 RocketMQ 版服務端將消息持久化成功之后,向發送方返回 Ack。確認消息已經發送成功,此時消息為半事務消息。
- 發送方開始執行本地事務邏輯。
- 發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半事務消息,訂閱方將不會接受該消息。
事務消息回查步驟如下:
- 在斷網或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發起消息回查。
- 發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
- 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 4 對半事務消息進行操作。
總體而言RocketMQ事務消息分為兩條主線:
- 發送流程:發送half message(半消息),執行本地事務,發送事務執行結果
- 定時任務回查流程:MQ定時任務掃描半消息,回查本地事務,發送事務執行結果
源碼相關
Producer發送事務半消息的(prepare)
在本地應用發送事務消息的核心類是TransactionMQProducer,該類通過繼承DefaultMQProducer來復用大部分發送消息相關的邏輯,這個類的代碼量非常少只有100來行,下面是這個類的sendMessageTransaction方法
這里的transactionListener就是上面所說的消息回查的類,它提供了2個方法:
- executeLocalTransaction
執行本地事務
- checkLocalTransaction
回查本地事務
接著看DefaultMQProducer.sendMessageInTransaction()方法:
該方法主要做了以下事情
- 給消息打上事務消息相關的tag,用于broker區分普通消息和事務消息
- 發送半消息(half message)
- 發送成功則由transactionListener執行本地事務
- 執行endTransaction方法,告訴 broker 執行 commit/rollback。
執行本地事務
Producer 半事務消息發送成功后,會調用transactionListener.executeLocalTransaction方法執行本地事務。只有半消息發送成功后,才會執行本地事務,如果半消息發送失敗,則設置回滾。
結束事務(commit/rollback)
本地事務執行后,則調用this.endTransaction()方法,根據本地事務執行狀態,去提交事務或者回滾事務。
如果半消息發送失敗或本地事務執行失敗告訴服務端是刪除半消息,半消息發送成功且本地事務執行成功則告訴服務端生效半消息
Broker端處理事務消息
Broker端通過SendMessageProcessor.processRequest()方法接收處理 Producer 發送的消息 最后會調用到SendMessageProcessor.sendMessage(),判斷消息類型,進行消息存儲。
存儲半消息
代碼 prepareMessage(msgInner) :
在這一步,備份消息的原主題名稱與原隊列ID,然后取消事務消息的消息標簽,重新設置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定為0。與其他普通消息區分開,然后完成消息持久化。
到這里,Broker 就初步處理完了 Producer 發送的事務半消息。
半消息事務回查
兩段式協議發送與提交回滾消息,執行完本地事務消息的狀態為UNKNOW時,結束事務不做任何操作。通過事務狀態定時回查得到發送端的事務狀態是rollback或commit。
通過TransactionalMessageCheckService線程定時去檢測RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務狀態。
- RMQ_SYS_TRANS_HALF_TOPIC
prepare消息的主題,事務消息首先先進入到該主題。
- RMQ_SYS_TRANS_OP_HALF_TOPIC
當消息服務器收到事務消息的提交或回滾請求后,會將消息存儲在該主題下。
Broker處理END_TRANSACTION
當Producer或者回查定時任務提交/回滾事務的時候,Broker如何處理事務消息提交、回滾命令的?其核心實現如下:
- 根據commitlogOffset找到消息
- 如果是提交動作,就恢復原消息的主題與隊列,再次存入commitlog文件進而轉到消息消費隊列,供消費者消費,然后將原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
- 回滾消息,則直接將原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理。
整體實現流程
如果消費端消費失敗了怎么辦?
如果有消息消費失敗了,則將失敗的消息回傳給broker,即重新寫入commitLog文件,消費者將重新消費;如果消息回傳的時候,consumer和broker之間網絡斷開,則consumer會調用submitConsumeRequestLater()方法,在consumer端進行重新消費,如果仍然消費失敗,會不斷重試直到達到默認的16次,你可以使用msg.getReconsumeTimes()方法來獲取當前重試次數,如果重試次數足夠多之后仍然無法消費成功,必須通過工單、日志等方式進行人工干預以讓producer事務進行回退處理。
Producer發送半消息失敗
可能由于網絡或者mq故障,導致 Producer 訂單系統 發送半消息(prepare)失敗。
這時訂單系統可以執行回滾操作,比如“訂單關閉”等,走逆向流程退款給用戶。
半消息發送成功,本地事務執行失敗
如果訂單系統發送的半消息成功了,但是執行本地事務失敗了,如更新訂單狀態為“已完成”。
這種情況下,執行本地事務失敗后,會返回rollback給 MQ,MQ會刪除之前發送的半消息。 也就不會調用優惠券系統了。
半消息發送成功,沒收到MQ返回的響應
假如訂單系統發送半消息成功后,沒有收到MQ返回的響應。
這個時候可能是因為網絡問題,或者其他異常報錯,訂單系統誤以為發送MQ半消息失敗,執行了逆向回滾流程。
但這個時候其實mq已經保存半消息成功了,那這個消息怎么處理?
這個時候MQ的后臺消息回查定時任務TransactionalMessageCheckService會每隔1分鐘掃描一次半消息隊列,判斷是否需要消息回查,然后回查訂單系統的本地事務,這時MQ就會發現訂單已經變成“已關閉”,此時就要發送rollback請求給mq,刪除之前的半消息。
如果commit/rollback失敗了
這個其實也是通過定時任務TransactionalMessageCheckService,它會發現這個消息超過一定時間還沒有進行二階段處理,就會回查本地事務。
小結
消息隊列RocketMQ分布式事務消息不僅可以實現應用之間的解耦,又能保證數據的最終一致性。同時,傳統的大事務可以被拆分為小事務,不僅能提升效率,還不會因為某一個關聯應用的不可用導致整體回滾,從而最大限度保證核心系統的可用性。在極端情況下,如果關聯的某一個應用始終無法處理成功,也只需對當前應用進行補償或數據訂正處理,而無需對整體業務進行回滾。
從RocketMQ事務型消息鏈路體現了面向失敗的設計思路,也體現了事務型系統的嚴謹性,在第二階段的消息沒有送達的時候,broker會主動請求producer端去做check,producer做完check后會將事務的狀態再次返回。雖然說實現最終一致的方案有很多,但是事務型消息是比較優雅實現方式之一。
本文轉載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關注。轉載本文請聯系小汪哥寫代碼公眾號。