4 張圖,9 個維度告訴你怎么做能確保 RocketMQ 不丟失消息
大家好,我是君哥。
引入消息隊列可以方便地實現系統解耦、削峰填谷等作用。但是消息隊列使用不當,可能會引起消息丟失,在一些消息敏感的業務場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。
1 RocketMQ 簡介
RocketMQ 是阿里巴巴開源的分布式消息中間件,整體架構如下圖:
RocketMQ 主要包括 Producer、Consumer 和 Broker,同時 Name Server 進行集群注冊管理和保存元數據。
2 消息不丟失
要想保證消息不丟失,需要從以下幾個方面考慮:
- Producer 發送消息
- Broker 保存消息
- Consumer 消費消息
- Broker 主從切換
維度 1:同步發送,代碼如下:
public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}
同步發送會返回 4 個狀態碼:
- SEND_OK:消息發送成功。需要注意的是,消息發送到 broker 后,還有兩個操作:消息刷盤和消息同步到 slave 節點,默認這兩個操作都是異步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態才能真正表示發送成功。
- FLUSH_DISK_TIMEOUT:消息發送成功但是消息刷盤超時。
- FLUSH_SLAVE_TIMEOUT:消息發送成功但是消息同步到 slave 節點時超時。
- SLAVE_NOT_AVAILABLE:消息發送成功但是 broker 的 slave 節點不可用。
根據返回的狀態碼,可以做消息重試,這里設置的重試次數是 3。
消息重試時,消費端一定要做好冪等處理。
維度 2:異步發送,代碼如下:
public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
// TODO 可以在這里加入重試邏輯
}
});
}
異步發送,可以重寫回調函數,回調函數捕獲到 Exception 時表示發送失敗,這時可以進行重試,這里設置的重試次數是 3。
維度 3:刷盤策略
- 異步刷盤:默認。消息寫入 CommitLog 時,并不會直接寫入磁盤,而是先寫入 PageCache 緩存后返回成功,然后用后臺線程異步把消息刷入磁盤。異步刷盤提高了消息吞吐量,但是可能會有消息丟失的情況,比如斷點導致機器停機,PageCache 中沒來得及刷盤的消息就會丟失。
- 同步刷盤:消息寫入內存后,立刻請求刷盤線程進行刷盤,如果消息未在約定的時間內(默認 5 s)刷盤成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到這個響應后,可以進行重試。同步刷盤策略保證了消息的可靠性,同時降低了吞吐量,增加了延遲。要開啟同步刷盤,需要增加下面配置:
flushDiskType=SYNC_FLUSH
維度 4:Broker 多副本和高可用
Broker 為了保證高可用,采用一主多從的方式部署。如下圖:
消息發送到 master 節點后,slave 節點會從 master 拉取消息保持跟 master 的一致。這個過程默認是異步的,即 master 收到消息后,不等 slave 節點復制消息就直接給 Producer 返回成功。
這樣會有一個問題,如果 slave 節點還沒有完成消息復制,這時 master 宕機了,進行主備切換后就會有消息丟失。為了避免這個問題,可以采用 slave 節點同步復制消息,即等 slave 節點復制消息成功后再給 Producer 返回發送成功。只需要增加下面的配置:
brokerRole=SYNC_MASTER
改為同步復制后,消息復制流程如下:
- slave 初始化后,跟 master 建立連接并向 master 發送自己的 offset;
- master 收到 slave 發送的 offset 后,將 offset 后面的消息批量發送給 slave;
- slave 把收到的消息寫入 commitLog 文件,并給 master 發送新的 offset;
- master 收到新的 offset 后,如果 offset >= producer 發送消息后的 offset,給 Producer 返回 SEND_OK。
維度 5:消息確認
Consumer 消費消息的代碼如下:
public void consume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try{
System.out.printf("Receive New Messages: %s", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息。
維度 6:Consumer 重試
Consumer 消費失敗,這里有 3 種情況:
- 返回 RECONSUME_LATER
- 返回 null
- 拋出異常
Broker 收到這個響應后,會把這條消息放入重試隊列,重新發送給 Consumer。
注意:
- Broker 默認最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入死信隊列,Consumer 可以訂閱死信隊列進行消費。
- 重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。
- Consumer 端一定要做好冪等處理。
其實重試 3 次都失敗就可以說明代碼有問題,這時 Consumer 可以把消息存入本地,給 Broker 返回CONSUME_SUCCESS 來結束重試。代碼如下:
int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
//TODO 把消息寫入本地存儲
System.out.println("重試次數超過3次");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
維度7:事務消息
RocketMQ支持事務消息,整體流程如下圖:
- Producer 發送 half 消息;
- Broker 先把消息寫入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的隊列,之后給 Producer 返回成功;
- Producer 執行本地事務,成功后給 Broker 發送 commit 命令(本地事務執行失敗則發送 rollback);
- Broker 收到 commit 請求后把消息狀態更改為成功并把消息推到真正的 topic;
- Consumer 拉取消息進行消費。
代碼如下:
public class ProducerTransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
/**
* 這里執行本地事務,執行成功返回LocalTransactionState.COMMIT_MESSAGE,執行失敗返回
* LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
* Broker會回來查詢,所以需要記錄事務執行狀態
*/
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
/**
* 這里查詢事務執行狀態,根據事務狀態返回LocalTransactionState.COMMIT_MESSAGE或
* LocalTransactionState.ROLLBACK_MESSAGE,如果沒有查詢到返回LocalTransactionState.UNKNOW,
* Broker會再次查詢,可以記錄查詢次數,超過次數后返回ROLLBACK_MESSAGE
*/
return LocalTransactionState.UNKNOW;
}
}
維度 8:消息索引
我們知道,RocketMQ 核心的數據文件有 3 個:CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一個索引文件,結構如下圖:
查找消息時,首先根據消息 key 的 hashcode 計算出 Hash 槽的位置,然后讀取 Hash 槽的值計算 Index 條目的位置,從Index 條目位置讀取到消息在 CommitLog 文件中的 offset,從而查找到消息。
在 Producer 發送消息時,可以指定一個 key,代碼如下:
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");
這樣可以通過 RocketMQ 提供的命令或者管理控制臺來查詢消息是否發送成功。
維度 9:極端情況
如果對消息丟失零容忍,我們必須要考慮極端情況,比如整個 RocketMQ 集群掛了,這時 Producer 端發送消息一定會失敗,可以考慮在 Producer 端做降級,把要發送的消息保存到本地數據庫或磁盤,等 RocketMQ 恢復以后再把本地消息推送出去。
3 總結
在一些特殊的業務場景,比如支付、銀行核算等,需要確保消息不丟失,但是同時也要看到,消息不丟失的方案會大大降低 RocketMQ 的吞吐量,需要綜合考慮。