成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

實戰與原理:如何基于RocketMQ實現分布式事務?

開發 前端
由于此時MQ中的消息一直處于half狀態,超過一定的超時時間后,MQ會發現這個half消息有問題,然后回調你的訂單系統的接口。此時訂單系統需要根據訂單狀態來決定執行commit請求還是rollback請求。

使用事務消息

在DailyMart系統中,用戶發起支付后,訂單系統需要調用庫存服務執行庫存扣減邏輯。圖片

由于這是跨服務調用,因此會產生分布式事務。在這里,我們使用RocketMQ的事務消息來實現分布式事務。

1、首先,在訂單服務的應用服務層處理支付邏輯,并調用RocketMQ發送事務消息:

@Override
public String payment(String orderSn) {
    // todo 集成支付寶支付
    // 支付流水號
    String outOrderNo = IdUtils.get32UUID();
    TradeOrder tradeOrder = Optional.ofNullable(tradeOrderService.getByOrderSn(orderSn)).orElseThrow(() -> new BusinessException("訂單編號不存在"));

    // 如果訂單處于待支付狀態
    if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.WAITING_PAYMENT.getStatus())) {

        OrderPaidEvent orderPaidEvent = new OrderPaidEvent(orderSn, outOrderNo);

        TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID");

        if (SendStatus.SEND_OK == sendResult.getSendStatus() && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
            return tradeOrder.getOrderSn();
        } else {
            throw new BusinessException("支付失敗...");
        }
    } else {
        throw new BusinessException("訂單已支付,請勿重復提交...");
    }
}

2、在訂單服務的基礎設施層,創建一個類實現 RocketMQLocalTransactionListener 接口:

該接口有兩個方法:

  • executeLocalTransaction:用于執行本地事務。
  • checkLocalTransaction:在RocketMQ執行消息回查時檢查本地事務執行結果,用于確定消息提交還是回滾。
@Component
@Slf4j
public class OrderPaidTransactionConsumer implements RocketMQLocalTransactionListener {
    
    @Resource
    private TransactionTemplate transactionTemplate;
    @Resource
    private TradeOrderService tradeOrderService;
    
   
  /**
     * 執行本地事務
     * 將訂單狀態修改成已支付
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);
        try {
            // 放到同一個本地事務中
            this.transactionTemplate.executeWithoutResult(status -> {
                String orderSn = orderPaidEvent.getOrderSn();
                // 修改成待發貨
                tradeOrderService.changeOrderStatus(orderSn, OrderStatusEnum.AWAITING_SHIPMENT);
            });
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("修改訂單狀態失敗", e);
            // ROLLBACK 則回滾消息,rocketmq將廢棄這條消息
            return RocketMQLocalTransactionState.ROLLBACK;
            // 如果是UNKNOWN, 則觸發回查
        }

    }

    /**
     * 檢查本地事務執行狀態
     * 消息回查時,對于正在進行中的事務不要返回Rollback或Commit結果,應繼續保持Unknown的狀態。
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);

        String orderSn = orderPaidEvent.getOrderSn();
        TradeOrder tradeOrder = tradeOrderService.getByOrderSn(orderSn);
        // 如果已經修改成待發貨說明本地事務執行成功,此時消費端可以直接消費
        if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.AWAITING_SHIPMENT.getStatus())) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // 這里查不到的時候返回 UNKNOWN在于,有可能事務還沒有提交,回查就開始了
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

3、在庫存服務的基礎設施層,監聽消息以執行庫存扣減邏輯:

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "dailymart_inventory_group", topic = "TRADE-ORDER", selectorExpression = "ORDER-PAID")
public class InventoryDeductionConsumer extends EnhanceMessageHandler<OrderPaidEvent> implements RocketMQListener<OrderPaidEvent> {
    
    @Resource
    private InventoryDomainService inventoryDomainService;
    
    @Override
    public void onMessage(OrderPaidEvent orderPaidEvent) {
        super.dispatchMessage(orderPaidEvent);
    }
    
    @Override
    protected void handleMessage(OrderPaidEvent orderPaidEvent) throws Exception {
        // 執行庫存扣減邏輯
        String orderSn = orderPaidEvent.getOrderSn();
        inventoryDomainService.deductionInventory(orderSn);
    }
}

通過以上步驟,我們完成了RocketMQ事務消息的發送,利用事務消息的特性保證分布式事務的最終一致性。與普通消息相比,事務消息在處理時需要實現 RocketMQLocalTransactionListener 接口,這是事務消息的核心。

介紹完事務消息的使用,接下來我們再來聊聊事務消息的原理。

事務消息的原理

首先,讓我們思考一下,如果不使用事務消息會有什么問題。

很容易想到的一個問題就是消息丟失。當保存訂單后由于網絡問題導致消息丟失,如下圖所示:

圖片圖片

在不使用RocketMQ的情況下,我們往往會通過 本地消息表 + 補償重試 的機制來保證消息一定會發送出去。其原理可以參考上篇文章 [Dailymart26:微服務中躲不過的坑 - 分布式事務]。

那RocketMQ是如何解決這個問題的呢?

1. 發送half消息,探測MQ是否正常

在基于RocketMQ的事務消息中,我們不是先執行自身的訂單支付邏輯,而是先讓訂單系統發送一條 half消息 到MQ去。這個half消息本質上是一個訂單支付成功的消息,只不過此時庫存系統是看不見這個half消息的。然后,我們等待接收這個half消息寫入成功的響應通知。

圖片圖片

發送half消息的本質其實是為了探測MQ是否仍然正常運行。但問題來了,如上所述,消息會發生丟失,那么half消息丟失怎么辦呢?

2. half消息發送失敗

在發送half消息時,由于網絡原因或者MQ直接掛了,就會導致half消息發送失敗。這個時候訂單系統需要執行一系列的回滾操作。在我們的場景中,應該執行退款操作,將錢退還給用戶,并告知用戶交易失敗。

3. half消息成功,訂單系統執行自己的業務邏輯

如果成功收到half消息的正常響應,此時訂單系統應該執行自己的業務邏輯。在我們這個場景中,就是修改訂單數據庫狀態,將其修改為待發貨狀態。這部分邏輯就對應上述代碼中的executeLocalTransaction()方法。

圖片圖片

4. 訂單本地事務執行失敗

如果訂單系統執行本地事務失敗,則需要發送一個rollback請求給MQ,讓其刪除這條half消息。

圖片圖片

5. 訂單本地事務執行成功

如果訂單系統的本地事務執行正常,此時需要發送一個commit請求給MQ,要求MQ對之前的half消息進行commit操作,這樣庫存系統就可以消費這條消息了。

圖片圖片

訂單創建消息處于half狀態時,庫存系統是看不見它的。必須等到訂單系統執行commit請求,消息被commit后,庫存系統才能看到并獲取這條消息進行后續處理。

6. half消息發送成功,但是沒收到half的響應

以上就是RocketMQ事務消息的正向流程。

然而,還有一個問題:如果訂單系統發送half消息成功后卻沒有收到half消息的響應,該如何處理呢?

在這種情況下,訂單系統可能會誤以為是發送half消息到MQ失敗了。訂單系統就會執行回滾流程,退還支付金額,關閉訂單。

圖片圖片

然而,此時MQ系統中已經存在了一條half消息。這條half消息又該如何處理呢?

在RocketMQ中,有一套補償流程。RocketMQ會定期掃描處于half狀態的消息。如果一直沒有對這個消息執行 commit/rollback 操作,超過了一定的時間,RocketMQ就會回調你的訂單系統的一個接口,用以確認你本地事務的情況。

當訂單系統收到MQ的回查請求時,就需要檢索一下數據庫,根據訂單狀態決定執行commit還是rollback。

這部分邏輯就對應上述代碼中checkLocalTransaction()方法。

圖片圖片

7. rollback 或者 commit 失敗怎么辦?

通過上述說明,可以看到,RocketMQ是根據rollback或commit操作來決定half消息的狀態的。如果業務系統執行了commit操作,則將half消息設置為可見,庫存系統可以消費;如果業務系統執行了rollback操作,MQ就會刪除half消息。那么問題來了:如果訂單系統在執行rollback或commit操作時失敗又該如何處理呢?

這時候仍然依賴于前文提到的回查機制。

由于此時MQ中的消息一直處于half狀態,超過一定的超時時間后,MQ會發現這個half消息有問題,然后回調你的訂單系統的接口。此時訂單系統需要根據訂單狀態來決定執行commit請求還是rollback請求。

以上,就是RocketMQ事務消息的原理。結合文章開頭的代碼,是不是已經很清晰了呢?

責任編輯:武曉燕 來源: JAVA日知錄
相關推薦

2025-04-11 09:57:16

2022-06-21 08:27:22

Seata分布式事務

2022-08-26 00:02:03

RocketMQ單體架構MQ

2022-06-27 08:21:05

Seata分布式事務微服務

2024-06-13 09:25:14

2023-01-06 09:19:12

Seata分布式事務

2023-05-12 08:02:43

分布式事務應用

2022-07-10 20:24:48

Seata分布式事務

2019-08-19 10:24:33

分布式事務數據庫

2022-11-06 19:28:02

分布式鎖etcd云原生

2023-09-14 15:44:46

分布式事務數據存儲

2025-01-15 08:34:00

分布式事務服務

2017-07-26 15:08:05

大數據分布式事務

2020-03-31 08:05:23

分布式開發技術

2024-01-05 07:28:50

分布式事務框架

2021-08-06 08:33:27

Springboot分布式Seata

2024-06-28 09:07:19

2024-11-28 15:11:28

2023-09-14 15:38:55

云原生分布式架構

2022-06-14 10:47:00

分布式事務數據
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 最新日韩在线 | 99re在线视频 | 久久99精品久久久久久国产越南 | 亚洲成av人片在线观看无码 | 欧美国产视频 | 免费在线观看一区二区三区 | 亚洲精品观看 | 日韩精品一区二区三区 | 精品日韩在线 | 久久精品一区 | 婷婷久久精品一区二区 | 成人av观看 | 四虎永久免费地址 | 在线视频a | 精品国产乱码久久久久久牛牛 | 国产一级特黄视频 | 午夜一级做a爰片久久毛片 精品综合 | 日韩久久久久 | 成人精品毛片国产亚洲av十九禁 | 国产精品高清一区二区 | 四虎影院一区二区 | 国产99久久久国产精品下药 | 欧美1页 | 亚洲精品一区二区三区蜜桃久 | 男人av在线播放 | 亚洲免费三级 | 久草综合在线视频 | 亚洲一二三在线观看 | 日本午夜精品 | 小早川怜子xxxxaⅴ在线 | 国产精品视频一区二区三区不卡 | 欧美一级淫片007 | 久久国产精品久久久久久久久久 | 国产精品久久久久久久久免费丝袜 | 亚洲精品福利视频 | 干干干日日日 | h视频在线观看免费 | 视频1区| 国产在线中文字幕 | 91动漫在线观看 | 日韩二 |