成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

分布式事務實現方案:一文詳解RocketMQ事務消息

云計算 分布式
RocketMQ 事務消息是分布式事務中一種常見的實現方案,只是把發送消息和本地事務放在一個事務中,并且只保證最終一致性,無法保證強一致性。

常見的分布式事務實現方案有以下幾種:兩階段提交(2PC)、兩階段提交(2PC)、補償事務(Saga)、MQ事務消息等。今天就講一下 RocketMQ 的事務消息,是一種非常特殊的分布式事務實現方案,基于半消息(Half Message)機制實現的。 看完這篇想一下,RocketMQ事務消息到底能不能保證分布式系統中數據的強一致性?

實現原理

RocketMQ事務消息執行流程如下:

  1. 生產者將消息發送至RocketMQ服務端。
  2. RocketMQ服務端將消息持久化成功之后,向生產者返回Ack確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息(Half Message)。
  3. 生產者開始執行本地事務邏輯。
  4. 生產者根據本地事務執行結果向服務端提交二次確認結果(Commit或是Rollback),服務端收到確認結果后處理邏輯如下:

二次確認結果為Commit:服務端將半事務消息標記為可投遞,并投遞給消費者。

二次確認結果為Rollback:服務端將回滾事務,不會將半事務消息投遞給消費者。

  1. 在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到發送者提交的二次確認結果,或服務端收到的二次確認結果為Unknown未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。
  2. 生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
  3. 生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。

圖片圖片

代碼實現

RocketMQ事務消息示例如下:

//演示demo,模擬訂單表查詢服務,用來確認訂單事務是否提交成功。
private static boolean checkOrderById(String orderId) {
    return true;
}

//演示demo,模擬本地事務的執行結果。
private static boolean doLocalTransaction() {
    return true;
}

public static void main(String[] args) throws ClientException {
    ClientServiceProvider provider = new ClientServiceProvider();
    MessageBuilder messageBuilder = new MessageBuilderImpl();
    //構造事務生產者:事務消息需要生產者構建一個事務檢查器,用于檢查確認異常半事務的中間狀態。
    Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * 事務檢查器一般是根據業務的ID去檢查本地事務是否正確提交還是回滾,此處以訂單ID屬性為例。
                 * 在訂單表找到了這個訂單,說明本地事務插入訂單的操作已經正確提交;如果訂單表沒有訂單,說明本地事務已經回滾。
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // 錯誤的消息,直接返回Rollback。
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            })
            .build();
    //開啟事務
    final Transaction transaction;
    try {
        transaction = producer.beginTransaction();
    } catch (ClientException e) {
        e.printStackTrace();
        //事務開啟失敗,直接退出。
        return;
    }
    Message message = messageBuilder.setTopic("topic")
            //設置消息索引鍵,可根據關鍵字精確查找某條消息。
            .setKeys("messageKey")
            //設置消息Tag,用于消費端根據指定Tag過濾消息。
            .setTag("messageTag")
            //一般事務消息都會設置一個本地事務關聯的唯一ID,用來做本地事務回查的校驗。
            .addProperty("OrderId", "xxx")
            //消息體。
            .setBody("messageBody".getBytes())
            .build();
    //發送半事務消息
    final SendReceipt sendReceipt;
    try {
        sendReceipt = producer.send(message, transaction);
    } catch (ClientException e) {
        //半事務消息發送失敗,事務可以直接退出并回滾。
        return;
    }
    /**
     * 執行本地事務,并確定本地事務結果。
     * 1. 如果本地事務提交成功,則提交消息事務。
     * 2. 如果本地事務提交失敗,則回滾消息事務。
     * 3. 如果本地事務未知異常,則不處理,等待事務消息回查。
     *
     */
    boolean localTransactionOk = doLocalTransaction();
    if (localTransactionOk) {
        try {
            transaction.commit();
        } catch (ClientException e) {
            // 業務可以自身對實時性的要求選擇是否重試,如果放棄重試,可以依賴事務消息回查機制進行事務狀態的提交。
            e.printStackTrace();
        }
    } else {
        try {
            transaction.rollback();
        } catch (ClientException e) {
            // 建議記錄異常信息,回滾異常時可以無需重試,依賴事務消息回查機制進行事務狀態的提交。
            e.printStackTrace();
        }
    }
}

注意事項

  • 冪等性: 消費者處理消息時需要確保業務邏輯的冪等性,以應對消息可能的重復消費。
  • 超時和監控: 設置合理的超時時間,并監控事務消息的性能

總結

RocketMQ 事務消息是分布式事務中一種常見的實現方案,只是把發送消息和本地事務放在一個事務中,并且只保證最終一致性,無法保證強一致性。 原因有兩點:

  1. 執行完成本地事務后,在commit事務消息之前,這段時間內數據是不一致的,所以只是保證了發送消息和本地事務的最終一致性。
  2. 在commit事務消息之后,然后把消息投遞給消費者。至于消費者是否消費消息,什么時候消費?也都是不可控的,所以也只能盡量保證數據最終一致性。
責任編輯:武曉燕 來源: 一燈架構
相關推薦

2025-04-29 04:00:00

分布式事務事務消息

2021-06-28 10:03:44

分布式數據庫架構

2022-06-27 08:21:05

Seata分布式事務微服務

2022-05-30 10:37:35

分布式事務反向補償

2023-11-06 13:15:32

分布式事務Seata

2024-06-11 13:50:43

2024-03-29 13:30:41

分布式事務節點

2019-10-10 09:16:34

Zookeeper架構分布式

2025-06-04 01:00:00

2024-01-26 13:17:00

rollbackMQ訂單系統

2023-01-06 09:19:12

Seata分布式事務

2024-08-19 09:05:00

Seata分布式事務

2022-07-10 20:24:48

Seata分布式事務

2017-07-26 15:08:05

大數據分布式事務

2022-06-21 08:27:22

Seata分布式事務

2021-06-16 08:33:02

分布式事務ACID

2019-11-04 08:38:45

分布式事務主流TCC

2023-09-04 08:00:53

提交事務消息

2019-08-19 10:24:33

分布式事務數據庫

2020-03-31 08:05:23

分布式開發技術
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久久久久av | 日本淫视频 | 一区二区av | 精品国产乱码久久久久久蜜退臀 | 日韩一级黄色片 | 日韩欧美中文 | 国产精品视频在线播放 | 国产精品久久久久9999鸭 | 91精品国产91久久久久久最新 | 精品久久久久久久 | 成人做爰999 | 国产91在线 | 亚洲 | 国产精品视频500部 a久久 | 欧美精产国品一二三区 | 日韩小视频在线 | 蜜桃av人人夜夜澡人人爽 | 免费av播放 | 精品综合 | 日韩精品久久久 | 亚洲国产成人一区二区 | 久久精品91久久久久久再现 | 丁香六月激情 | 一区二区免费看 | 午夜欧美 | 精品久久99| 欧美精品久久 | 青青草久久 | 亚洲国产成人在线视频 | av中文字幕在线 | 国产免费人成xvideos视频 | 午夜专区 | 久久国 | 亚洲精品永久免费 | 久色一区 | 国产精品视屏 | 亚洲欧美一区在线 | 午夜精品一区二区三区在线观看 | 欧美一级欧美一级在线播放 | 欧美精品久久久久久 | 久久久国产精品入口麻豆 | 91精品久久久久久久久中文字幕 |