成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

故障現(xiàn)場 | 消息發(fā)送居然有這么大的坑

開發(fā) 前端
RocketMQ事務(wù)消息是一種支持分布式事務(wù)的消息模型,將消息生產(chǎn)和消費與業(yè)務(wù)邏輯綁定在一起,確保消息發(fā)送和事務(wù)執(zhí)行的原子性,保證消息的可靠性。

1. 問題&分析

基于 MQ 進(jìn)行系統(tǒng)間的解耦真的是太香了,小艾還沉浸在喜悅中久久不能自拔。但,打臉的事已經(jīng)在路上了。。。。

1.1. 案例

昨天下班,在電梯里,物流組的晨姐偶遇了小艾,就一個技術(shù)問題向小艾進(jìn)行了反饋。具體來說,物流系統(tǒng)有一項功能,即實時監(jiān)控訂單的支付成功事件,一旦檢測到,便會為顧客準(zhǔn)備物資,進(jìn)而安排快遞發(fā)貨。今天系統(tǒng)出現(xiàn)了幾次空指針異常。查閱日志,似乎是在反查訂單信息時,沒有獲取到預(yù)期的訂單數(shù)據(jù)。但查詢物流系統(tǒng),物流單已經(jīng)成功生成,對業(yè)務(wù)操作并未造成實際影響,但這個問題還是值得注意。由于這個問題并沒有立即影響到業(yè)務(wù)流程,所以晨姐沒有在第一時間聯(lián)系小艾進(jìn)行確認(rèn)。

在小艾正準(zhǔn)備啟動IDEA尋找線索的時候,算法組的負(fù)責(zé)人龍哥急匆匆地走了過來,向小艾反映了他們團(tuán)隊遇到的一個重要問題。為了提升推薦效果,算法組也會實時監(jiān)控訂單支付成功事件,并以此為依據(jù)重新計算用戶的推薦商品。然而,今天早上,他們突然收到了一系列的報警信息,問題同樣是無法查詢到訂單信息,這個現(xiàn)象與物流系統(tǒng)的問題高度相似。

小艾隨口問道:“這個問題會自己修復(fù)嗎?”龍哥愣了一下,回答說:“以前會自動修復(fù),但剛剛那條數(shù)據(jù)還在報錯。”隨后,龍哥提供了報錯的訂單ID,小艾立即去數(shù)據(jù)庫中查詢,卻驚訝地發(fā)現(xiàn),這條數(shù)據(jù)竟然不存在!

看到這種場景,小艾有些慌神,連龍哥什么時候走的都沒有注意到。目光直勾勾的盯著電腦屏幕發(fā)呆:

@Transactional
public void paySuccess(String orderId, String token){
    // 驗證 token,保障有效性
    checkToke(token);

    // 加載訂單信息
    Order order = this.orderRepository.getById(orderId);
    if (order == null){
        throw new RuntimeException("訂單不存在");
    }
    // 支付成功,更新訂單狀態(tài)
    order.paySuccess();

    // 將變更更新到數(shù)據(jù)庫
    this.orderRepository.update(order);

    // 發(fā)送支付成功事件
    this.eventPublisher.publishEvent(new OrderPaidEvent(order));
    // 執(zhí)行其他業(yè)務(wù)邏輯
    doSomething();
}

// 監(jiān)聽變更,發(fā)布 MQ
@EventListener
public void handle(OrderPaidEvent event){
    rocketMQTemplate.convertAndSend("order_event", event);
}

1.2. 問題分析

兩個問題看起來一樣,但又有區(qū)別。當(dāng)下游在收到 MQ 消息時

  1. 無法查不到訂單,但稍后能自我修復(fù)
  2. 一直查不到訂單,數(shù)據(jù)庫里還沒有,無法進(jìn)行自我修復(fù)

小艾無意間看到 paySuccess 方法上的 @Transactional 頓時茅塞頓開。

圖片圖片

正如上圖所示:

  1. 在數(shù)據(jù)庫更新完成后,系統(tǒng)會立即發(fā)送消息隊列(MQ)消息,同時主流程會繼續(xù)執(zhí)行后續(xù)的耗時操作。
  2. 當(dāng)下游接收到MQ消息時,會進(jìn)行數(shù)據(jù)查詢。然而,由于此時主流程尚未完成事務(wù)提交,因此無法查詢到相關(guān)數(shù)據(jù),導(dǎo)致下游出現(xiàn)錯誤。
  3. 如果MQ消息消費失敗,系統(tǒng)會自動進(jìn)行重試。如果在此期間主流程已經(jīng)完成了事務(wù)提交,那么就能夠成功查詢到數(shù)據(jù),從而使得業(yè)務(wù)流程得以恢復(fù)正常。

這就完美的解釋了物流問題,那為什么算法組收到消息里的訂單ID在數(shù)據(jù)庫不存在呢?

圖片圖片

如上圖所示:

  1. 在數(shù)據(jù)庫更新完成后,系統(tǒng)會立即發(fā)送消息隊列(MQ)消息,而主流程將同時繼續(xù)執(zhí)行后續(xù)的耗時操作。
  2. 若主流程在執(zhí)行后續(xù)邏輯時發(fā)生異常,將導(dǎo)致整個事務(wù)回滾,進(jìn)而中斷處理過程。
  3. 下游系統(tǒng)接收到消息后,進(jìn)行數(shù)據(jù)反查,但由于事務(wù)已回滾,因此無法查詢到任何數(shù)據(jù)。
  4. 因為發(fā)生事務(wù)回滾,數(shù)據(jù)庫中根本就沒有這條記錄,所以即使后面有自動重試機(jī)制,也無法恢復(fù)處理邏輯。

小艾終于鎖定了問題所在,深深地吸了一口氣,釋放了緊繃的神經(jīng)。就在這時,晨姐的電話打了進(jìn)來。小艾喃喃自語:“毫無疑問,和算法部門遇到的情況一樣,被XXX訂單給堵住了。”說罷,她信心滿滿地接起了電話…

本質(zhì):該問題根本原因是==沒有保障 更新數(shù)據(jù)庫操作 與 發(fā)送消息操作這兩個業(yè)務(wù)單元之間的一致性。==

2. 解決方案

定位后,解決方案就變的非常清晰。

2.1. 方案1:使用 @TransactionalEventListener

最簡的方案就是將 @EventListener 注解 換成 @TransactionalEventListener。

2.1.1. EventListener 和 TransactionalEventListener

EventListener 和 TransactionalEventListener 都是 Spring 中用于處理事件的監(jiān)聽器。它們之間的主要區(qū)別在于它們處理事件的方式和事務(wù)管理。

  1. EventListener:這是一個通用的事件監(jiān)聽器,當(dāng)事件發(fā)布時,它會立即執(zhí)行相應(yīng)的處理方法。它不會參與到事務(wù)管理中,也就是說,即使在事務(wù)執(zhí)行過程中發(fā)生異常,EventListener 依然會執(zhí)行。
  2. TransactionalEventListener:這是一個具有事務(wù)管理功能的事件監(jiān)聽器。當(dāng)事件發(fā)布時,它會等待當(dāng)前事務(wù)完成后再執(zhí)行相應(yīng)的處理方法。這意味著,如果在事務(wù)執(zhí)行過程中發(fā)生異常,TransactionalEventListener 將不會執(zhí)行,從而確保事務(wù)的一致性。

總之,EventListener 和 TransactionalEventListener 的主要區(qū)別在于它們處理事件的方式和事務(wù)管理。在選擇使用哪種監(jiān)聽器時,需要根據(jù)實際需求和事務(wù)一致性的要求來決定。

2.1.2. 源碼示例

了解兩者的區(qū)別后,只需做一點調(diào)整便可以解決這個問題,調(diào)整如下:

/**
 * 使用 @TransactionalEventListener 替代 @EventListener 監(jiān)聽訂單支付事件,然后發(fā)送消息到 RocketMQ
 * @param event
 */
@TransactionalEventListener
public void handle(OrderPaidEvent event){
    rocketMQTemplate.convertAndSend("order_event", event);
}

如果沒有使用 Spring 的 Event 機(jī)制,但仍想實現(xiàn) @TransactionEventlistner 的效果,可以直接使用 Spring API:

private void doIfCommitted(Runnable task) {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronization transactionSynchronization = new TransactionSynchronizationAdapter(){
                @Override
                public void afterCommit() {
                    task.run();
                }
            };
            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);

        }else {
            task.run();
        }
    }

2.1.3. 問題&挑戰(zhàn)

這個方案確實解決了上述問題,但從一致性角度分析,還是存在設(shè)計缺陷,只是發(fā)生的概率變低而已,沒有從根本上解決問題。

在事務(wù)提交后發(fā)送 MQ 時,可能會遇到以下幾種情況,導(dǎo)致兩個操作(數(shù)據(jù)庫操作和 MQ 發(fā)送操作)之間的一致性問題:

  1. 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時發(fā)生網(wǎng)絡(luò)故障。此時,數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
  2. 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時發(fā)生 MQ 服務(wù)器故障。此時,數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
  3. 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時發(fā)生應(yīng)用程序故障。此時,數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
  4. 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時發(fā)生消息丟失。此時,數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。

這個方案極為簡單,但大幅降低了錯誤概率,主要應(yīng)用于要求并不嚴(yán)格的業(yè)務(wù)場景。

2.2 方案2:RocketMQ事務(wù)消息

RocketMQ 的事務(wù)消息就是針對這個問題設(shè)計的,可以非常高效的解決這個問題。

2.2.1. 半消息以及工作原理

RocketMQ事務(wù)消息是一種支持分布式事務(wù)的消息模型,將消息生產(chǎn)和消費與業(yè)務(wù)邏輯綁定在一起,確保消息發(fā)送和事務(wù)執(zhí)行的原子性,保證消息的可靠性。

事務(wù)消息分為兩個階段:發(fā)送消息和確認(rèn)消息,確認(rèn)消息分為提交和回滾兩個操作。在提交操作執(zhí)行完畢后,消息才會被消費端消費,而在回滾操作執(zhí)行完畢后,消息會被刪除,從而達(dá)到了事務(wù)的一致性和可靠性。

事務(wù)消息的發(fā)生流程如下:

圖片圖片

  1. 生產(chǎn)者發(fā)送prepare消息到RocketMQ服務(wù)端,RocketMQ將消息存儲到本地并返回結(jié)果;
  2. 生產(chǎn)者開始執(zhí)行本地事務(wù),并根據(jù)本地事務(wù)的結(jié)果將狀態(tài)信息提交給RocketMQ服務(wù)端;
  3. 如果本地事務(wù)執(zhí)行成功,生產(chǎn)者向RocketMQ服務(wù)端發(fā)送commit消息;
  4. 如果本地事務(wù)執(zhí)行失敗,生產(chǎn)者向RocketMQ服務(wù)端發(fā)送rollback消息;
  5. RocketMQ接收到commit或rollback消息后,對消息進(jìn)行投放或刪除;

如果生成者發(fā)送 prepare 消息后,未在規(guī)定時間內(nèi)發(fā)送 commit 或 rollback 消息,RocketMQ 將進(jìn)入恢復(fù)流程,具體如下:

圖片圖片

  1. 如果在回查的時間之前沒有收到相應(yīng)的 commit 或 rollback 消息,則 RocketMQ 會將對該 prepare 消息進(jìn)行回查;
  2. 應(yīng)用程序接收到回查指令,從業(yè)務(wù)庫中獲取數(shù)據(jù),并根據(jù)業(yè)務(wù)邏輯進(jìn)行判斷,最終是 commit 還是 rollback;
  3. RocketMQ 接收到 commit 或 rollback 回復(fù)后,進(jìn)行相應(yīng)動作,從而實現(xiàn)業(yè)務(wù)操作和消息發(fā)送的一致性;

2.2.2. 源碼示例

一個簡單的示例代碼如下:

// 編寫事務(wù)監(jiān)聽器類
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    // 執(zhí)行本地事務(wù)
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        System.out.println("executeLocalTransaction " + value);
        // TODO 執(zhí)行本地事務(wù),并返回事務(wù)狀態(tài)
        // 本例假定 index 為偶數(shù)的消息執(zhí)行成功,奇數(shù)的消息執(zhí)行失敗
        if (value % 2 == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    // 檢查本地事務(wù)狀態(tài)
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("checkLocalTransaction " + msg.getTransactionId());
        // 模擬檢查本地事務(wù)狀態(tài),返回事務(wù)狀態(tài)
        boolean committed = prepare(true);
        if (committed) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }

    // 模擬操作預(yù)處理邏輯
    private boolean prepare(boolean commit) {
        System.out.println("prepare " + (commit ? "commit" : "rollback"));
        return commit;
    }

}

// 編寫發(fā)送消息的代碼
public class Producer {
    private static final String NAME_SERVER_ADDR = "localhost:9876";

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("MyGroup");
        producer.setNamesrvAddr(NAME_SERVER_ADDR);
        // 注冊事務(wù)監(jiān)聽器
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.start();

        // 發(fā)送事務(wù)消息
        String[] tags = {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TopicTest", tags[i], ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
            // 在消息發(fā)送時傳遞給事務(wù)監(jiān)聽器的參數(shù)
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
        }

        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

單看代碼很難理解,簡單畫了張圖,具體如下:

圖片圖片

2.2.3. 問題&挑戰(zhàn)

事務(wù)消息并不完美,存在一定的問題:

  1. 與 MQ 實現(xiàn)強(qiáng)相關(guān),并不是每個 MQ 實現(xiàn)都對事務(wù)消息提供支持;
  2. API 比較晦澀,存在一定的學(xué)習(xí)成本,同時需要對業(yè)務(wù)邏輯拆分到 Listener 中,增加理解成本;

2.3. 方案3:本地消息表

事務(wù)消息表方案是一種常用的保證消息發(fā)送與業(yè)務(wù)操作一致性的方法。該方案基于數(shù)據(jù)庫事務(wù)和消息隊列,將消息發(fā)送和業(yè)務(wù)操作放入同一個事務(wù)中,并將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)記錄在數(shù)據(jù)庫的消息表中,以實現(xiàn)消息的可靠性和冪等性。

2.3.1. 設(shè)計&核心流程

整體如下圖所示:

圖片圖片

image

核心流程如下:

  1. 應(yīng)用程序開啟一個數(shù)據(jù)庫事務(wù),并在事務(wù)中執(zhí)行業(yè)務(wù)操作和消息發(fā)送;
  2. 在事務(wù)中,將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)記錄到消息表中;
  3. 如果業(yè)務(wù)操作執(zhí)行成功,并且消息發(fā)送成功,提交事務(wù),否則回滾事務(wù);
  4. 定時掃描消息表,并根據(jù)消息狀態(tài)重新發(fā)送未被確認(rèn)的消息。如果消息發(fā)送成功,更新消息狀態(tài);否則根據(jù)重試次數(shù)更新消息狀態(tài)或者丟棄消息;

通過事務(wù)消息表方案,可以保證消息的可靠性。即使在消息發(fā)送失敗或應(yīng)用程序崩潰的情況下,也可以通過重新發(fā)送消息將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)同步。同時,該方案可以避免消息重復(fù)發(fā)送和漏發(fā)的情況。

2.3.2. 功能封裝

清晰的流程為復(fù)用打下了基礎(chǔ),lego 對其做了封裝。

2.3.2.1. 環(huán)境準(zhǔn)備

首先,需要引入 lego 相關(guān)依賴:

<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter</artifactId>
    <version>0.1.12 以上版本</version>
</dependency>

其次,在業(yè)務(wù)數(shù)據(jù)庫上新建一張表用于存儲消息,示例如下:

create table test_message
(
    id           bigint auto_increment primary key,

    orderly      tinyint      not null comment '是否為順序消息',

    topic        varchar(64)  not null comment 'MQ topic',
    sharding_key varchar(128) not null comment 'ShardingKey,用于選擇不同的 partition',
    tag          varchar(128) not null comment 'Message Tag 信息',

    msg_id       varchar(64)  not null comment 'Msg ID 只有發(fā)送成功后才有數(shù)據(jù)',
    msg_key      varchar(64)  not null comment 'MSG Key,用于查詢數(shù)據(jù)',
    msg          longtext     not null comment '要發(fā)送的消息',

    retry_time   tinyint      not null comment '重試次數(shù)',
    status       tinyint      not null comment '發(fā)送狀態(tài):0-初始化,1-發(fā)送成功,2-發(fā)送失敗',

    create_time  datetime     not null,
    update_time  datetime     not null,

    index idx_update_time_status(update_time, status)
);

為了兼容多種MQ類型,對發(fā)送者進(jìn)行了抽象,因此需要實現(xiàn)自己的 MessageSender。

@Component
@Getter
@Slf4j
public class TestMessageSender implements MessageSender {

    @Override
    public String send(Message message) {
        // 發(fā)送消息
    }
}

最后,就是對所有的組件進(jìn)行配置,示例代碼如下:

@Configuration
@Slf4j
public class LocalTableBasedReliableMessageConfiguration
        extends LocalTableBasedReliableMessageConfigurationSupport {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private MessageSender messageSender;

    @Override
    protected DataSource dataSource() {
        return this.dataSource;
    }

    @Override
    protected String messageTable() {
        return "test_message";
    }

    @Override
    protected MessageSender createMessageSend() {
        return this.messageSender;
    }
}

其中,包括:

  1. 繼承自 LocalTableBasedReliableMessageConfigurationSupport,由父類完成基本配置;
  2. 實現(xiàn) DataSource dataSource() 方法,返回業(yè)務(wù)數(shù)據(jù)源(備注:必須與業(yè)務(wù)使用同一個數(shù)據(jù)源)
  3. 實現(xiàn) String messageTable() 方法,配置本地消息表表名;
  4. 實現(xiàn) MessageSender createMessageSend() 方法,返回 MessageSender 實例,執(zhí)行真正的消費發(fā)送;
2.2.3.2. 具體使用

ReliableMessageSender#send 在業(yè)務(wù)方法中使用,執(zhí)行可靠消息發(fā)送;

@Transactional
public void testSuccess(){
    // 業(yè)務(wù)邏輯
    Message message = buildMessage();
    // 業(yè)務(wù)邏輯
    this.reliableMessageSender.send(message);
}

除發(fā)送流程外,還需要配置補(bǔ)充機(jī)制。

ReliableMessageCompensator#compensate 周期性調(diào)度,對未發(fā)送或發(fā)送失敗的消息進(jìn)行補(bǔ)充;

4. 示例&源碼

代碼倉庫:https://gitee.com/litao851025/learnFromBug

代碼地址:https://gitee.com/litao851025/learnFromBug/tree/master/src/main/java/com/geekhalo/demo/mq/sender


責(zé)任編輯:武曉燕 來源: geekhalo
相關(guān)推薦

2021-08-03 22:26:46

Go函數(shù)分頁

2023-05-25 10:03:40

2024-04-02 08:41:10

ArrayListSubList場景

2023-11-06 06:52:51

2015-07-30 09:20:26

微軟Android Lau

2024-04-01 09:46:11

MQ消息亂序

2020-06-01 08:04:18

三目運算符代碼

2023-04-10 07:26:28

UseStateUseReducer

2020-12-17 10:23:41

死鎖LinuxLockdep

2014-03-07 10:46:49

編程語言趣味

2019-08-09 15:07:33

TomcatJaegerSpringBoot

2022-07-06 11:47:27

JAVAfor循環(huán)

2020-11-02 08:35:59

內(nèi)存數(shù)據(jù)庫Redis

2024-03-07 12:54:00

AI模型

2024-01-22 09:16:47

多線程性能優(yōu)化

2009-06-12 16:55:10

VPN客戶端故障

2025-02-28 09:30:00

?DeepSeekDeepGEMMAI

2022-11-11 09:41:04

連接池微服務(wù)數(shù)據(jù)庫

2024-01-29 09:22:59

死鎖線程池服務(wù)

2023-05-07 23:22:24

golang
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 日韩中文一区二区三区 | 亚洲人成网亚洲欧洲无码 | 中文字幕一区二区三区乱码在线 | 精品二三区 | 久久精品视频在线观看 | 久久精品亚洲一区二区三区浴池 | 中文精品视频 | 亚洲精品久久久久久一区二区 | 精品福利视频一区二区三区 | 欧美日韩精品久久久免费观看 | 亚洲欧洲精品一区 | 夜夜操av| a在线免费观看 | 日韩一区二区三区视频 | 欧美国产日韩在线 | 国产片淫级awww | 久久国产婷婷国产香蕉 | www.操com | 羞羞的视频免费在线观看 | www.啪啪.com| 黄色亚洲 | 久久久久亚洲av毛片大全 | 成人aaa视频| 欧美xxxx色视频在线观看免费 | 成人精品久久久 | 亚洲欧美激情四射 | 国产亚洲成av人片在线观看桃 | 亚洲一区在线日韩在线深爱 | 在线免费av电影 | 国产一区二区不卡 | 欧美在线| 一区二区高清 | 国产在线精品一区二区三区 | 秋霞av国产精品一区 | 欧美精品一区二区三区在线 | 日韩在线观看视频一区 | 久久久福利 | www.精品国产 | 99pao成人国产永久免费视频 | 国产小视频在线 | 欧美日韩亚洲视频 |