分布式事務實現方案:一文詳解RocketMQ事務消息
常見的分布式事務實現方案有以下幾種:兩階段提交(2PC)、兩階段提交(2PC)、補償事務(Saga)、MQ事務消息等。今天就講一下 RocketMQ 的事務消息,是一種非常特殊的分布式事務實現方案,基于半消息(Half Message)機制實現的。 看完這篇想一下,RocketMQ事務消息到底能不能保證分布式系統中數據的強一致性?
實現原理
RocketMQ事務消息執行流程如下:
- 生產者將消息發送至RocketMQ服務端。
- RocketMQ服務端將消息持久化成功之后,向生產者返回Ack確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息(Half Message)。
- 生產者開始執行本地事務邏輯。
- 生產者根據本地事務執行結果向服務端提交二次確認結果(Commit或是Rollback),服務端收到確認結果后處理邏輯如下:
二次確認結果為Commit:服務端將半事務消息標記為可投遞,并投遞給消費者。
二次確認結果為Rollback:服務端將回滾事務,不會將半事務消息投遞給消費者。
- 在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到發送者提交的二次確認結果,或服務端收到的二次確認結果為Unknown未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。
- 生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
- 生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。
圖片
代碼實現
RocketMQ事務消息示例如下:
//演示demo,模擬訂單表查詢服務,用來確認訂單事務是否提交成功。
private static boolean checkOrderById(String orderId) {
return true;
}
//演示demo,模擬本地事務的執行結果。
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilderImpl();
//構造事務生產者:事務消息需要生產者構建一個事務檢查器,用于檢查確認異常半事務的中間狀態。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事務檢查器一般是根據業務的ID去檢查本地事務是否正確提交還是回滾,此處以訂單ID屬性為例。
* 在訂單表找到了這個訂單,說明本地事務插入訂單的操作已經正確提交;如果訂單表沒有訂單,說明本地事務已經回滾。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 錯誤的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
//開啟事務
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事務開啟失敗,直接退出。
return;
}
Message message = messageBuilder.setTopic("topic")
//設置消息索引鍵,可根據關鍵字精確查找某條消息。
.setKeys("messageKey")
//設置消息Tag,用于消費端根據指定Tag過濾消息。
.setTag("messageTag")
//一般事務消息都會設置一個本地事務關聯的唯一ID,用來做本地事務回查的校驗。
.addProperty("OrderId", "xxx")
//消息體。
.setBody("messageBody".getBytes())
.build();
//發送半事務消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事務消息發送失敗,事務可以直接退出并回滾。
return;
}
/**
* 執行本地事務,并確定本地事務結果。
* 1. 如果本地事務提交成功,則提交消息事務。
* 2. 如果本地事務提交失敗,則回滾消息事務。
* 3. 如果本地事務未知異常,則不處理,等待事務消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 業務可以自身對實時性的要求選擇是否重試,如果放棄重試,可以依賴事務消息回查機制進行事務狀態的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建議記錄異常信息,回滾異常時可以無需重試,依賴事務消息回查機制進行事務狀態的提交。
e.printStackTrace();
}
}
}
注意事項
- 冪等性: 消費者處理消息時需要確保業務邏輯的冪等性,以應對消息可能的重復消費。
- 超時和監控: 設置合理的超時時間,并監控事務消息的性能
總結
RocketMQ 事務消息是分布式事務中一種常見的實現方案,只是把發送消息和本地事務放在一個事務中,并且只保證最終一致性,無法保證強一致性。 原因有兩點:
- 執行完成本地事務后,在commit事務消息之前,這段時間內數據是不一致的,所以只是保證了發送消息和本地事務的最終一致性。
- 在commit事務消息之后,然后把消息投遞給消費者。至于消費者是否消費消息,什么時候消費?也都是不可控的,所以也只能盡量保證數據最終一致性。