RocketMQ事務(wù)消息解析!
單體架構(gòu)下的事務(wù)
在單體系統(tǒng)的開(kāi)發(fā)過(guò)程中,假如某個(gè)場(chǎng)景下需要對(duì)數(shù)據(jù)庫(kù)的多張表進(jìn)行操作,為了保證數(shù)據(jù)的一致性,一般會(huì)使用事務(wù),將所有的操作全部提交或者在出錯(cuò)的時(shí)候全部回滾。
以創(chuàng)建訂單為例,假設(shè)下單后需要做兩個(gè)操作:
?在訂單表生成訂單。
在積分表增加本次訂單增加的積分記錄。
在單體架構(gòu)下只需使用@Transactional開(kāi)啟事務(wù),就可以保證數(shù)據(jù)的一致性。
@Transactional
public void order() {
String orderId = UUID.randomUUID().toString();
// 生成訂單
orderService.createOrder(orderId);
// 增加積分
creditService.addCredits(orderId);
}
?
但在分布式架構(gòu)下,訂單系統(tǒng)和積分系統(tǒng)可能是兩個(gè)獨(dú)立的服務(wù),此時(shí)就不能使用上述的方法開(kāi)啟事務(wù)了,因?yàn)樗鼈儾惶幱谕粋€(gè)事務(wù)中。
- 在出錯(cuò)的情況下,無(wú)法進(jìn)行全部回滾,只能對(duì)當(dāng)前服務(wù)的事務(wù)進(jìn)行回滾。
所以就有可能出現(xiàn)訂單生成成功但是積分服務(wù)增加積分失敗的情況(也可能相反),此時(shí)數(shù)據(jù)處于不一致的狀態(tài)。
分布式架構(gòu)下的事務(wù)
以下單流程為例,在分布式架構(gòu)下的處理流程如下:
?訂單服務(wù)生成訂單。
發(fā)送訂單生成的MQ消息,積分服務(wù)訂閱消息,有新的訂單生成之后消費(fèi)消息,增加對(duì)應(yīng)的積分記錄。
普通MQ消息存在的問(wèn)題
?假如訂單創(chuàng)建成功,MQ消息發(fā)送成功,但是order方法在返回的前一刻,服務(wù)突然宕機(jī)。
由于開(kāi)啟了事務(wù),事務(wù)還未提交(方法結(jié)束后才會(huì)正常提交)。
所以訂單表并未生成記錄,但是MQ卻已經(jīng)發(fā)送成功并且被積分服務(wù)消費(fèi),此時(shí)就會(huì)存在訂單未創(chuàng)建但是積分記錄增加的情況。
假如先發(fā)送MQ消息再創(chuàng)建訂單,如果MQ消息發(fā)送成功,創(chuàng)建訂單失敗,那么同樣處于不一致的狀態(tài)。
@Transactional
public void order() {
String orderId = UUID.randomUUID().toString();
// 創(chuàng)建訂單
Order order = orderService.createOrder(orderDTO.getOrderId());
// 發(fā)送訂單創(chuàng)建的MQ消息
sendOrderMessge(order);
return;
}
可以使用RocketMQ事務(wù)消息解決上述問(wèn)題。
RocketMQ事務(wù)消息基礎(chǔ)流程
?Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息。
事務(wù)消息是 RocketMQ 提供的一種消息類(lèi)型,支持在分布式場(chǎng)景下保障消息生產(chǎn)和本地事務(wù)的最終一致性。
RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息。
基本流程
?第一階段:
- 發(fā)送 Message,Half Message,即半事務(wù)消息。
- 此類(lèi)型的 Message 是不會(huì)被 Consumer 消費(fèi)。
第二階段:如果半事務(wù)消息投遞成功,則會(huì)開(kāi)始執(zhí)行本地事務(wù)。
分為如下三種 Case:
- 本地事務(wù)執(zhí)行成功:
- 會(huì)向 Broker 發(fā)送 commit 消息,被 commit 過(guò)后的 Message 才能被 Consumer 消費(fèi)到。
- 本地事務(wù)執(zhí)行失敗:
會(huì)向 Broker 發(fā)送 rollback 消息,Broker 則會(huì)將剛剛投遞的半事務(wù)消息刪除,從而保證上下游數(shù)據(jù)的一致性。
如果 Producer 實(shí)例或者網(wǎng)絡(luò)出現(xiàn)了問(wèn)題,Producer 沒(méi)能及時(shí)地將本地事務(wù)執(zhí)行的結(jié)果通知 Broker。
Broker 會(huì)通過(guò)掃描發(fā)現(xiàn)某條 Message 長(zhǎng)時(shí)間處于半事務(wù)消息狀態(tài)。
Broker 會(huì)主動(dòng)地向 Producer 詢(xún)問(wèn)此 Message 對(duì)應(yīng)的事務(wù)狀態(tài)。
值得注意的是:
?
RocketMQ 并不會(huì)無(wú)休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查 15 次。
如果 15 次回查還是無(wú)法得知事務(wù)狀態(tài),RocketMQ 默認(rèn)回滾該消息。
RocketMQ事務(wù)消息使用限制
?
事務(wù)消息不支持延時(shí)消息和批量消息。
事務(wù)性消息可能不止一次被檢查或消費(fèi),所以消費(fèi)者端需要做好消費(fèi)冪等。
事務(wù)消息的生產(chǎn)者 ID 不能與其他類(lèi)型消息的生產(chǎn)者 ID 共享。
- 與其他類(lèi)型的消息不同,事務(wù)消息允許反向查詢(xún)、MQ服務(wù)器能通過(guò)它們的生產(chǎn)者 ID 查詢(xún)到消費(fèi)者。
RocketMQ事務(wù)消息基本原理
采用2PC兩階段設(shè)計(jì)。
?
將 Message 原本真實(shí)的 Topic 和 MessageQueue 進(jìn)行備份。
- 放入到PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID中保存。
將消息投遞到一個(gè)內(nèi)部Topic中RMQ_SYS_TRANS_HALF_TOPIC,該隊(duì)列專(zhuān)門(mén)存儲(chǔ)事務(wù)消息。
所有的 Half Message 全部都寫(xiě)入到 queueId 為 0 的 MessageQueue。
因?yàn)橐粋€(gè) Topic 下只有 1 個(gè) MessageQueue:
- 這個(gè) Topic 下的所有 Message 就是全局有序的,它們會(huì)按照先來(lái)后到的順序被消費(fèi)。
如果本地事務(wù)執(zhí)行成功進(jìn)行Commit,則將RMQ_SYS_TRANS_HALF_TOPIC 隊(duì)列中的消息投遞到真實(shí)的Topic中,供后續(xù)流程執(zhí)行。
- 并刪除這條 Half Message ,但刪除也是假刪除,只是給 Message 打上一個(gè)刪除的 Tag。
如果本地事務(wù)執(zhí)行失敗進(jìn)行rollback,則直接刪除這條 Half Message ,但刪除也是假刪除。
如果本地事務(wù)遲遲沒(méi)有返回結(jié)果 (默認(rèn)時(shí)間是6s),則會(huì)觸發(fā)事務(wù)回查機(jī)制
- 執(zhí)行回查之前需要校驗(yàn)檢查次數(shù)是否到達(dá)了最大值(需要手動(dòng)設(shè)置,沒(méi)有默認(rèn)值)。
- 或者是當(dāng)前 Half Message 存在是否超過(guò)了 Message 保存的上限,即 3天。
- 如果滿(mǎn)足上面條件中的一種Half Message 會(huì)被放進(jìn) TRANS_CHECK_MAX_TIME_TOPIC Topic 當(dāng)中。
- 一旦判定為需要執(zhí)行事務(wù)回查邏輯,那么當(dāng)前這條 Half Message 就算已經(jīng)被消費(fèi)了。
- 在沒(méi)達(dá)到最大的校驗(yàn)次數(shù)之前,都還需要將其投遞到事務(wù)隊(duì)列當(dāng)中,以便下次重試時(shí)再次執(zhí)行 Check 邏輯。
- 如果回查成功則刪除投遞的 Half Message。
源碼解讀
發(fā)送事務(wù)消息調(diào)用的是TransactionMQProducer的sendMessageInTransaction方法:
主要有以下幾個(gè)步驟:
?獲取事務(wù)監(jiān)聽(tīng)器TransactionListener,如果獲取為空或者本地事務(wù)執(zhí)行器LocalTransactionExecuter為空將拋出異常。
因?yàn)樾枰ㄟ^(guò)TransactionListener或者LocalTransactionExecuter來(lái)執(zhí)行本地事務(wù),所以不能為空。
在消息中設(shè)置prepared屬性,此時(shí)與普通消息(非事務(wù)消息)相比多了PROPERTY_TRANSACTION_PREPARED屬性。
調(diào)用send方法發(fā)送prepared消息也就是half消息,發(fā)送消息的流程與普通消息一致。
根據(jù)消息的發(fā)送結(jié)果判斷:
- 如果發(fā)送成功執(zhí)行本地事務(wù),并返回本地事務(wù)執(zhí)行結(jié)果狀態(tài),如果返回的執(zhí)行狀態(tài)結(jié)果為空,將本地事務(wù)狀態(tài)設(shè)置為UNKNOW。
- 發(fā)送成功之外的其他情況,包括FLUSH_DISK_TIMEOUT刷盤(pán)超時(shí)、FLUSH_SLAVE_TIMEOUT和SLAVE_NOT_AVAILABLE從節(jié)點(diǎn)不可用三種情況。
- 此時(shí)意味著half消息發(fā)送失敗,本地事務(wù)狀態(tài)置為ROLLBACK_MESSAGE回滾消息。
調(diào)用endTransaction方法結(jié)束事務(wù)。
參考
《RocketMQ技術(shù)內(nèi)幕》
https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md。
https://github.com/apache/rocketmq/blob/master/docs/cn/design.md。