實戰與原理:如何基于RocketMQ實現分布式事務?
使用事務消息
在DailyMart系統中,用戶發起支付后,訂單系統需要調用庫存服務執行庫存扣減邏輯。
由于這是跨服務調用,因此會產生分布式事務。在這里,我們使用RocketMQ的事務消息來實現分布式事務。
1、首先,在訂單服務的應用服務層處理支付邏輯,并調用RocketMQ發送事務消息:
@Override
public String payment(String orderSn) {
// todo 集成支付寶支付
// 支付流水號
String outOrderNo = IdUtils.get32UUID();
TradeOrder tradeOrder = Optional.ofNullable(tradeOrderService.getByOrderSn(orderSn)).orElseThrow(() -> new BusinessException("訂單編號不存在"));
// 如果訂單處于待支付狀態
if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.WAITING_PAYMENT.getStatus())) {
OrderPaidEvent orderPaidEvent = new OrderPaidEvent(orderSn, outOrderNo);
TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID");
if (SendStatus.SEND_OK == sendResult.getSendStatus() && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
return tradeOrder.getOrderSn();
} else {
throw new BusinessException("支付失敗...");
}
} else {
throw new BusinessException("訂單已支付,請勿重復提交...");
}
}
2、在訂單服務的基礎設施層,創建一個類實現 RocketMQLocalTransactionListener 接口:
該接口有兩個方法:
- executeLocalTransaction:用于執行本地事務。
- checkLocalTransaction:在RocketMQ執行消息回查時檢查本地事務執行結果,用于確定消息提交還是回滾。
@Component
@Slf4j
public class OrderPaidTransactionConsumer implements RocketMQLocalTransactionListener {
@Resource
private TransactionTemplate transactionTemplate;
@Resource
private TradeOrderService tradeOrderService;
/**
* 執行本地事務
* 將訂單狀態修改成已支付
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);
try {
// 放到同一個本地事務中
this.transactionTemplate.executeWithoutResult(status -> {
String orderSn = orderPaidEvent.getOrderSn();
// 修改成待發貨
tradeOrderService.changeOrderStatus(orderSn, OrderStatusEnum.AWAITING_SHIPMENT);
});
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("修改訂單狀態失敗", e);
// ROLLBACK 則回滾消息,rocketmq將廢棄這條消息
return RocketMQLocalTransactionState.ROLLBACK;
// 如果是UNKNOWN, 則觸發回查
}
}
/**
* 檢查本地事務執行狀態
* 消息回查時,對于正在進行中的事務不要返回Rollback或Commit結果,應繼續保持Unknown的狀態。
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);
String orderSn = orderPaidEvent.getOrderSn();
TradeOrder tradeOrder = tradeOrderService.getByOrderSn(orderSn);
// 如果已經修改成待發貨說明本地事務執行成功,此時消費端可以直接消費
if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.AWAITING_SHIPMENT.getStatus())) {
return RocketMQLocalTransactionState.COMMIT;
} else {
// 這里查不到的時候返回 UNKNOWN在于,有可能事務還沒有提交,回查就開始了
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
3、在庫存服務的基礎設施層,監聽消息以執行庫存扣減邏輯:
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "dailymart_inventory_group", topic = "TRADE-ORDER", selectorExpression = "ORDER-PAID")
public class InventoryDeductionConsumer extends EnhanceMessageHandler<OrderPaidEvent> implements RocketMQListener<OrderPaidEvent> {
@Resource
private InventoryDomainService inventoryDomainService;
@Override
public void onMessage(OrderPaidEvent orderPaidEvent) {
super.dispatchMessage(orderPaidEvent);
}
@Override
protected void handleMessage(OrderPaidEvent orderPaidEvent) throws Exception {
// 執行庫存扣減邏輯
String orderSn = orderPaidEvent.getOrderSn();
inventoryDomainService.deductionInventory(orderSn);
}
}
通過以上步驟,我們完成了RocketMQ事務消息的發送,利用事務消息的特性保證分布式事務的最終一致性。與普通消息相比,事務消息在處理時需要實現 RocketMQLocalTransactionListener 接口,這是事務消息的核心。
介紹完事務消息的使用,接下來我們再來聊聊事務消息的原理。
事務消息的原理
首先,讓我們思考一下,如果不使用事務消息會有什么問題。
很容易想到的一個問題就是消息丟失。當保存訂單后由于網絡問題導致消息丟失,如下圖所示:
圖片
在不使用RocketMQ的情況下,我們往往會通過 本地消息表 + 補償重試 的機制來保證消息一定會發送出去。其原理可以參考上篇文章 [Dailymart26:微服務中躲不過的坑 - 分布式事務]。
那RocketMQ是如何解決這個問題的呢?
1. 發送half消息,探測MQ是否正常
在基于RocketMQ的事務消息中,我們不是先執行自身的訂單支付邏輯,而是先讓訂單系統發送一條 half消息 到MQ去。這個half消息本質上是一個訂單支付成功的消息,只不過此時庫存系統是看不見這個half消息的。然后,我們等待接收這個half消息寫入成功的響應通知。
圖片
發送half消息的本質其實是為了探測MQ是否仍然正常運行。但問題來了,如上所述,消息會發生丟失,那么half消息丟失怎么辦呢?
2. half消息發送失敗
在發送half消息時,由于網絡原因或者MQ直接掛了,就會導致half消息發送失敗。這個時候訂單系統需要執行一系列的回滾操作。在我們的場景中,應該執行退款操作,將錢退還給用戶,并告知用戶交易失敗。
3. half消息成功,訂單系統執行自己的業務邏輯
如果成功收到half消息的正常響應,此時訂單系統應該執行自己的業務邏輯。在我們這個場景中,就是修改訂單數據庫狀態,將其修改為待發貨狀態。這部分邏輯就對應上述代碼中的executeLocalTransaction()方法。
圖片
4. 訂單本地事務執行失敗
如果訂單系統執行本地事務失敗,則需要發送一個rollback請求給MQ,讓其刪除這條half消息。
圖片
5. 訂單本地事務執行成功
如果訂單系統的本地事務執行正常,此時需要發送一個commit請求給MQ,要求MQ對之前的half消息進行commit操作,這樣庫存系統就可以消費這條消息了。
圖片
訂單創建消息處于half狀態時,庫存系統是看不見它的。必須等到訂單系統執行commit請求,消息被commit后,庫存系統才能看到并獲取這條消息進行后續處理。
6. half消息發送成功,但是沒收到half的響應
以上就是RocketMQ事務消息的正向流程。
然而,還有一個問題:如果訂單系統發送half消息成功后卻沒有收到half消息的響應,該如何處理呢?
在這種情況下,訂單系統可能會誤以為是發送half消息到MQ失敗了。訂單系統就會執行回滾流程,退還支付金額,關閉訂單。
圖片
然而,此時MQ系統中已經存在了一條half消息。這條half消息又該如何處理呢?
在RocketMQ中,有一套補償流程。RocketMQ會定期掃描處于half狀態的消息。如果一直沒有對這個消息執行 commit/rollback 操作,超過了一定的時間,RocketMQ就會回調你的訂單系統的一個接口,用以確認你本地事務的情況。
當訂單系統收到MQ的回查請求時,就需要檢索一下數據庫,根據訂單狀態決定執行commit還是rollback。
這部分邏輯就對應上述代碼中checkLocalTransaction()方法。
圖片
7. rollback 或者 commit 失敗怎么辦?
通過上述說明,可以看到,RocketMQ是根據rollback或commit操作來決定half消息的狀態的。如果業務系統執行了commit操作,則將half消息設置為可見,庫存系統可以消費;如果業務系統執行了rollback操作,MQ就會刪除half消息。那么問題來了:如果訂單系統在執行rollback或commit操作時失敗又該如何處理呢?
這時候仍然依賴于前文提到的回查機制。
由于此時MQ中的消息一直處于half狀態,超過一定的超時時間后,MQ會發現這個half消息有問題,然后回調你的訂單系統的接口。此時訂單系統需要根據訂單狀態來決定執行commit請求還是rollback請求。
以上,就是RocketMQ事務消息的原理。結合文章開頭的代碼,是不是已經很清晰了呢?