成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

4 張圖,9 個維度告訴你怎么做能確保 RocketMQ 不丟失消息

開發 架構
引入消息隊列可以方便地實現系統解耦、削峰填谷等作用。但是消息隊列使用不當,可能會引起消息丟失,在一些消息敏感的業務場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。

大家好,我是君哥。

引入消息隊列可以方便地實現系統解耦、削峰填谷等作用。但是消息隊列使用不當,可能會引起消息丟失,在一些消息敏感的業務場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。

1 RocketMQ 簡介

RocketMQ 是阿里巴巴開源的分布式消息中間件,整體架構如下圖:

RocketMQ 主要包括 Producer、Consumer 和 Broker,同時 Name Server 進行集群注冊管理和保存元數據。

2 消息不丟失

要想保證消息不丟失,需要從以下幾個方面考慮:

  • Producer 發送消息
  • Broker 保存消息
  • Consumer 消費消息
  • Broker 主從切換

維度 1:同步發送,代碼如下:

public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}

同步發送會返回 4 個狀態碼:

  • SEND_OK:消息發送成功。需要注意的是,消息發送到 broker 后,還有兩個操作:消息刷盤和消息同步到 slave 節點,默認這兩個操作都是異步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態才能真正表示發送成功。
  • FLUSH_DISK_TIMEOUT:消息發送成功但是消息刷盤超時。
  • FLUSH_SLAVE_TIMEOUT:消息發送成功但是消息同步到 slave 節點時超時。
  • SLAVE_NOT_AVAILABLE:消息發送成功但是 broker 的 slave 節點不可用。

根據返回的狀態碼,可以做消息重試,這里設置的重試次數是 3。

消息重試時,消費端一定要做好冪等處理。

維度 2:異步發送,代碼如下:

public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {

}

@Override
public void onException(Throwable e) {
// TODO 可以在這里加入重試邏輯
}
});
}

異步發送,可以重寫回調函數,回調函數捕獲到 Exception 時表示發送失敗,這時可以進行重試,這里設置的重試次數是 3。

維度 3:刷盤策略

  • 異步刷盤:默認。消息寫入 CommitLog 時,并不會直接寫入磁盤,而是先寫入 PageCache 緩存后返回成功,然后用后臺線程異步把消息刷入磁盤。異步刷盤提高了消息吞吐量,但是可能會有消息丟失的情況,比如斷點導致機器停機,PageCache 中沒來得及刷盤的消息就會丟失。
  • 同步刷盤:消息寫入內存后,立刻請求刷盤線程進行刷盤,如果消息未在約定的時間內(默認 5 s)刷盤成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到這個響應后,可以進行重試。同步刷盤策略保證了消息的可靠性,同時降低了吞吐量,增加了延遲。要開啟同步刷盤,需要增加下面配置:
flushDiskType=SYNC_FLUSH

維度 4:Broker 多副本和高可用

Broker 為了保證高可用,采用一主多從的方式部署。如下圖:

消息發送到 master 節點后,slave 節點會從 master 拉取消息保持跟 master 的一致。這個過程默認是異步的,即 master 收到消息后,不等 slave 節點復制消息就直接給 Producer 返回成功。

這樣會有一個問題,如果 slave 節點還沒有完成消息復制,這時 master 宕機了,進行主備切換后就會有消息丟失。為了避免這個問題,可以采用 slave 節點同步復制消息,即等 slave 節點復制消息成功后再給 Producer 返回發送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

改為同步復制后,消息復制流程如下:

  • slave 初始化后,跟 master 建立連接并向 master 發送自己的 offset;
  • master 收到 slave 發送的 offset 后,將 offset 后面的消息批量發送給 slave;
  • slave 把收到的消息寫入 commitLog 文件,并給 master 發送新的 offset;
  • master 收到新的 offset 后,如果 offset >= producer 發送消息后的 offset,給 Producer 返回 SEND_OK。

維度 5:消息確認

Consumer 消費消息的代碼如下:

public void consume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try{
System.out.printf("Receive New Messages: %s", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}

如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息。

維度 6:Consumer 重試

Consumer 消費失敗,這里有 3 種情況:

  • 返回 RECONSUME_LATER
  • 返回 null
  • 拋出異常

Broker 收到這個響應后,會把這條消息放入重試隊列,重新發送給 Consumer。

注意:

  • Broker 默認最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入死信隊列,Consumer 可以訂閱死信隊列進行消費。
  • 重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。
  • Consumer 端一定要做好冪等處理。

其實重試 3 次都失敗就可以說明代碼有問題,這時 Consumer 可以把消息存入本地,給 Broker 返回CONSUME_SUCCESS 來結束重試。代碼如下:

int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
//TODO 把消息寫入本地存儲
System.out.println("重試次數超過3次");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

維度7:事務消息

RocketMQ支持事務消息,整體流程如下圖:

  • Producer 發送 half 消息;
  • Broker 先把消息寫入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的隊列,之后給 Producer 返回成功;
  • Producer 執行本地事務,成功后給 Broker 發送 commit 命令(本地事務執行失敗則發送 rollback);
  • Broker 收到 commit 請求后把消息狀態更改為成功并把消息推到真正的 topic;
  • Consumer 拉取消息進行消費。

代碼如下:

public class ProducerTransactionListenerImpl implements TransactionListener {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
/**
* 這里執行本地事務,執行成功返回LocalTransactionState.COMMIT_MESSAGE,執行失敗返回
* LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
* Broker會回來查詢,所以需要記錄事務執行狀態
*/
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
/**
* 這里查詢事務執行狀態,根據事務狀態返回LocalTransactionState.COMMIT_MESSAGE或
* LocalTransactionState.ROLLBACK_MESSAGE,如果沒有查詢到返回LocalTransactionState.UNKNOW,
* Broker會再次查詢,可以記錄查詢次數,超過次數后返回ROLLBACK_MESSAGE
*/
return LocalTransactionState.UNKNOW;
}
}

維度 8:消息索引

我們知道,RocketMQ 核心的數據文件有 3 個:CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一個索引文件,結構如下圖:

查找消息時,首先根據消息 key 的 hashcode 計算出 Hash 槽的位置,然后讀取 Hash 槽的值計算 Index 條目的位置,從Index 條目位置讀取到消息在 CommitLog 文件中的 offset,從而查找到消息。

在 Producer 發送消息時,可以指定一個 key,代碼如下:

Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");

這樣可以通過 RocketMQ 提供的命令或者管理控制臺來查詢消息是否發送成功。

維度 9:極端情況

如果對消息丟失零容忍,我們必須要考慮極端情況,比如整個 RocketMQ 集群掛了,這時 Producer 端發送消息一定會失敗,可以考慮在 Producer 端做降級,把要發送的消息保存到本地數據庫或磁盤,等 RocketMQ 恢復以后再把本地消息推送出去。

3 總結

在一些特殊的業務場景,比如支付、銀行核算等,需要確保消息不丟失,但是同時也要看到,消息不丟失的方案會大大降低 RocketMQ 的吞吐量,需要綜合考慮。


責任編輯:武曉燕 來源: 君哥聊技術
相關推薦

2022-09-26 10:43:13

RocketMQ保存消息

2022-08-01 10:43:11

RocketMQZookeeper注冊中心

2022-08-15 10:45:34

RocketMQ消息隊列

2024-08-06 09:55:25

2020-10-09 06:55:23

監控告警日志

2022-09-26 11:32:14

用戶分層服務業務

2021-03-18 12:16:44

用戶分層業務

2022-03-31 08:26:44

RocketMQ消息排查

2022-07-11 11:06:11

RocketMQ函數.消費端

2022-06-13 11:05:35

RocketMQ消費者線程

2022-12-16 17:15:33

MQRabbitMQ

2022-12-19 17:44:25

MQ技術RabbitMQ

2022-04-25 15:01:07

系統程序員調度

2022-07-04 11:06:02

RocketMQ事務消息實現

2022-06-27 11:04:24

RocketMQ順序消息

2021-04-13 18:16:07

多線程安全代碼

2021-04-13 15:51:46

服務治理流量

2022-09-16 15:42:00

數據Kafka

2012-07-20 17:24:51

HTML5

2020-04-06 14:53:05

MySQL數據庫字符串
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 三级国产三级在线 | 国产日韩91 | 国产福利91精品 | 精品一区二区三区在线观看国产 | 亚洲精品日韩一区二区电影 | 国产高清一区二区三区 | 成人一区av偷拍 | 精品av| 亚洲日本欧美日韩高观看 | 99精品国产一区二区青青牛奶 | 国产精品久久久久久亚洲调教 | 91精品国产综合久久久动漫日韩 | 欧美在线视频网站 | 久久精品国产亚洲一区二区三区 | 国产精品久久久久久久久久久免费看 | 欧美激情在线精品一区二区三区 | 天天爽天天操 | 天天夜碰日日摸日日澡 | 福利在线观看 | 日韩av在线一区 | 国产精品久久久一区二区三区 | 精品日韩欧美一区二区 | 亚洲成人一区二区三区 | 精品国产一区探花在线观看 | 最新中文字幕在线 | 在线免费国产视频 | 国产日韩欧美精品一区二区三区 | 国产精品a久久久久 | 久久精品国产99国产精品 | 91国自产| 久久精品小短片 | 一级特黄在线 | 国产精品久久久久久久久久久免费看 | 暖暖日本在线视频 | 欧美成年视频 | 久久天堂 | 午夜国产 | 97超碰人人 | 狠狠婷婷综合久久久久久妖精 | av天天澡天天爽天天av | 亚洲综合在线视频 |