成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

訂單系統中的數據一致性方案及RocketMQ事務消息詳解

開發 架構
生產中存在兩種常用的解決方案:TCC和可靠消息最終一致性。前者要求強一致,后者要求最終一致。強一致主要用于核心模塊,例如交易/訂單等。最終一致一般用于邊緣模塊例如庫存,通過mq去通知,保證最終一致性,也可以業務解耦。

數據一致性是確保業務操作正確執行的基礎,本文將以電商系統為例,詳細分析其分布式系統中的一致性問題。訂單核心流程:

訂單服務 -> 創建訂單 -> 庫存服務 -> 扣減庫存 -> 積分服務 -> 增加積分 -> 倉儲服務 -> 通知發貨

分布式一致性技術方案

生產中存在兩種常用的解決方案:TCC和可靠消息最終一致性。前者要求強一致,后者要求最終一致。

強一致主要用于核心模塊,例如交易/訂單等。最終一致一般用于邊緣模塊例如庫存,通過mq去通知,保證最終一致性,也可以業務解耦。

TCC:

訂單服務、庫存服務、積分服務 -> 綁定為一個TCC事務;

撤銷訂單時,回滾扣減庫存和增加積分。

可靠消息最終一致性:

可以去發送一個請求給消息中間件,由中間件保證一定會把消息交給下游的庫存服務去扣減庫存,倉儲服務去通知發貨等;

如果這個過程中有消息發送失敗,則可靠消息中間件應該保證不停的重試投遞消息。

本文重點分析如何利用RocketMQ的事務消息實現最終一致性,TCC事務將在另外一篇文章分享。

事務消息

RocketMQ的事務消息有兩個核心概念(流程):

  • Half Message,半消息

暫時不能被 Consumer消費的消息。Producer已經把消息發送到 Broker端,但是此消息的狀態被標記為不能投遞,處于這種狀態下的消息稱為半消息。事實上,該狀態下的消息會被放在一個叫做 RMQ_SYS_TRANS_HALF_TOPIC的主題下。

當 Producer端對它二次確認后,也就是 Commit之后,Consumer端才可以消費到;那么如果是Rollback,該消息則會被刪除,永遠不會被消費到。

  • 事務狀態回查

可能會因為網絡原因、應用問題等,導致Producer端一直沒有對這個半消息進行確認,那么這時候 Broker服務器會定時掃描這些半消息,主動找Producer端查詢該消息的狀態。

簡而言之,RocketMQ事務消息的實現原理就是基于兩階段提交和事務狀態回查,來決定消息最終是提交還是回滾的。

核心流程

結合整個訂單接口服務,分為兩個支付鏈路,一個是核心鏈路(訂單業務),一個是非核心鏈路(wms) 整個流程。

先向RocketMQ發送half msg,然后調用核心鏈路。核心鏈路要是返回失敗,就會走失敗的邏輯:退款,更改訂單狀態為取消,再給rocketmq發送callback廢棄掉剛才的消息。

如果成功,就commit msg讓消費者可以消費。如果在等待期間,一直沒有callback/commit那么mq就會走回調查詢具體的狀態。

消費者接收到消息后,消費完成就回復mq一個ack, 如果消費失敗了,mq就會重新投遞或者換一個服務投遞。使用rocketmq的half msg機制,可以實現這一套固定模式的最終一致性。

代碼實現

【核心鏈路-訂單、庫存、積分】

核心業務流程

【步驟一】:發送事務消息(half msg

springboot下,RocketMQ的集成還是很簡單的,引入
rocketmq-spring-boot-starter依賴、添加相關配置后,即可利用RocketMQTemplate的sendMessageInTransaction方法發送消息:

/**
 * 發送事務消息
 *
 * @param topic   topic
 * @param message 消息對象
 */
public void sendMessageInTransaction(String topic, Object message) {
    String transactionId = UUID.randomUUID().toString();
    TransactionSendResult result = this.rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message)
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .build(), message);
}

【步驟二】:broker回調,執行本地事務

消息發送成功之后,系統需要知道RocketMQ的broker是否成功收到了消息,這里主要借助
RocketMQTransactionListener注解實現。在成功收到回調后,會觸發executeLocalTransaction來
執行核心業務(訂單、庫存、積分等)。

@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    /**
     * 執行本地事務,即處理核心鏈路
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
          	// 事務transactionId
            String transactionId = msg.getHeaders().get("rocketmq_TRANSACTION_ID").toString();
            // 本地事務,執行核心鏈路業務
            String payload = new String((byte[]) msg.getPayload());
            OrderTranscationMesageDTO data = JSONObject.parseObject(payload, OrderTranscationMesageDTO.class);
            orderService.executeCoreBusiness(data.getPayMoney(),data.getOrderDO(), data.getTransactionNo(),data.getPayType(),transactionId);
        } catch (Exception e) {
            log.error("本地事務執行異常:{}事務消息回滾", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
      	log.info("提交事務消息");
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 校驗本地事務(broker未收到提交或回滾事務消息時主動回查)
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 事務transactionId
        String transactionId = msg.getHeaders().get("rocketmq_TRANSACTION_ID").toString();
        // 數據庫能查到,說明本地事務執行失敗,需要回滾
        if (Objects.isNull(transcationLogDao.getById(transactionId))){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

}

執行核心業務的同時,還有一個很重要的環節,即記錄事務ID。為什么要記錄事務ID呢?假想這樣一種情況:我們本地事務即核心的業務都成功執行后,需要提交RocketMQ的事務消息,只有提交后,消息才能被消費者(即非核心業務系統,如倉儲)消費,但是如果提交時,網絡出現異常,broker一直未收到怎么辦呢,這時利用transactionId,也是RocketMA的回查機制了。

/**
  * 核心業務,并記錄RocketMQ事務ID
*/
@GlobalTransactional  // seata全局事務
public void executeCoreBusiness(BigDecimal payMoney, AppDerivativeGoodsOrderDO orderDO, String transactionNo, String payType,String transactionId) {
		// 核心業務偽代碼
    orderService.execute();
    storeFeign.execute();
    scoreFeign.execute();
    
    // 數據庫記錄rocket事務消息ID 用于異常情況下的回查
    if (Objects.nonNull(transactionId)){
        //寫入事務日志
        TransactionLogDO log = new TransactionLogDO();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(orderDO.getId()));
        transcationLogDao.save(log);
    }
}

本地事務執行成功之后,記錄事務ID,即便提交時,出現網絡異常,broker遲遲未收到,也可以利用回查機制,即checkLocalTransaction方法,得知本地事務是否執行成功。

用于記錄事務的表結構:

CREATE TABLE `transaction_log` (
  `id` varchar(32) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '事務ID',
  `business` varchar(32) COLLATE utf8mb4_bin NOT NULL COMMENT '業務標識',
  `foreign_key` varchar(32) COLLATE utf8mb4_bin NOT NULL COMMENT '對應業務表中的主鍵',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

【非核心鏈路-wms倉儲】

【步驟三】:消費消息,處理其他業務

上述的步驟保證了核心業務與RocketMQ消息的一致性,即核心業務成功,消息就一定會被發送到broker。接下來就是非核心業務(如倉儲物流)監聽消息,通過@RocketMQMessageListener實現:

@RocketMQMessageListener(topic = "order_topic",consumerGroup = "order_group")
public class TestListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        if(messageExt.getReconsumeTimes() >= 3){
            log.error("消息重試已達最大次數,將通知業務人員排查問題。{}",messageExt.getMsgId());
            //消息處理,第3次處理失敗后,發送郵件通知人工介入
            sendMail(messageExt.getMsgId());
        }
        // 倉儲物流相關業務 
        wmsService.execute(messageExt.getBody());
    }
}

非核心業務,接受不了消息后,再處理相關業務,其實,此時已經與核心業務脫離了關聯,因此,不管它成功與否,核心業務都已經完成了,這也為何是最終一致性,而非強一致性。

最終一致性主要依賴的是RocketMQ的重試機制以及補償處理(比人工干預)。如上述代碼中,假若wmsService執行業務過程拋出了異常,即消息消費失敗,RocketMQ則會自動重發。默認16次,可以通過配置修改。另外,可以在重試一定次數后,做補償處理,例如,將執行失敗的任務記錄在數據庫,后續定時任務補償處理,抑或是像上述代碼,發送郵件通知相關人員。

冪等性消費

消息的重發,有可能帶來另外一個問題,重復消費。不做處理,就可能導致數據重復插入,倉儲系統就可能重復發貨。

冪等性:就是用戶對于同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點擊而產生了副作用。

實現冪等性消費的方式有很多種,具體怎么做,根據自己的情況來看。一種常用的方式就是利用redis緩存,在執行操作之前,先到緩存中查詢,該操作是否已執行過。

總結

本文重點闡述了基于RocketMQ來實現最終一致性的分布式事務案例。

責任編輯:姜華 來源: 今日頭條
相關推薦

2021-03-04 06:49:53

RocketMQ事務

2023-12-01 13:51:21

數據一致性數據庫

2019-01-15 17:58:03

微服務架構數據

2019-12-17 08:40:33

微服務架構數據

2009-06-18 09:18:08

Oracle檢索數據數據一致性事務恢復

2021-11-01 21:15:54

微服務系統數據

2025-03-27 08:20:54

2023-08-22 09:32:44

邊緣計算管理

2023-11-22 12:55:59

微服務架構數據庫

2022-07-21 06:54:28

微服務系統RocketMQ

2018-09-11 10:46:10

緩存數據庫一致性

2023-09-07 08:11:24

Redis管道機制

2024-12-26 15:01:29

2021-10-18 10:30:59

流計算阿里云

2021-10-13 09:55:11

流計算引擎數據

2021-12-05 21:06:27

軟件

2021-06-16 08:33:02

分布式事務ACID

2023-06-07 08:10:29

2023-12-27 14:23:10

微服務數據存儲

2023-05-26 07:34:50

RedisMySQL緩存
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕日韩欧美一区二区三区 | 免费a在线 | 成人欧美一区二区三区在线观看 | 在线午夜 | 中文字幕亚洲一区二区三区 | 九九热在线免费视频 | 韩国精品一区二区三区 | 一级a爱片久久毛片 | 日本精品视频在线观看 | 国产精品高清在线 | 日韩欧美在线一区 | 91色视频在线 | 国产精品久久久久无码av | 成人国产免费视频 | 2021天天躁夜夜看 | 日本a视频 | 久久99久久99精品免视看婷婷 | 欧美不卡一区二区三区 | 午夜影院| 日韩精品视频在线免费观看 | 精品在线观看一区二区 | 亚洲高清视频在线观看 | 日本超碰 | 国产一区黄色 | 91麻豆精品国产91久久久久久久久 | 午夜精品视频在线观看 | 91手机精品视频 | 亚洲成人av | 久久三级av | 91网站在线看 | 国产精品精品视频一区二区三区 | 黄网免费 | 亚洲精品在线视频 | 日韩一区二区三区视频 | av看片网站| 午夜激情在线 | 国产乱码久久久久久 | 中文字幕日本一区二区 | 2021狠狠干| 天天色官网 | 日韩久久中文字幕 |