RocketMQ如何保證消息的可靠性投遞?
介紹
要想保證消息的可靠型投遞,無非保證如下3個階段的正常執(zhí)行即可。
- 生產(chǎn)者將消息成功投遞到broker
- broker將投遞過程的消息持久化下來
- 消費者能從broker消費到消息
發(fā)送端消息重試
producer向broker發(fā)送消息后,沒有收到broker的ack時,rocketmq會自動重試。重試的次數(shù)可以設(shè)置,默認為2次
- DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
- // 同步發(fā)送設(shè)置重試次數(shù)為5次
- producer.setRetryTimesWhenSendFailed(5);
- // 異步發(fā)送設(shè)置重試次數(shù)為5次
- producer.setRetryTimesWhenSendAsyncFailed(5);
消息持久化
我們先來了解一下消息的存儲流程,這個知識對后面分析消費端消息重試非常重要。
和消息相關(guān)的文件有如下幾種
- CommitLog:存儲消息的元數(shù)據(jù)
- ConsumerQueue:存儲消息在CommitLog的索引
- IndexFile:可以通過Message Key,時間區(qū)間快速查找到消息
整個消息的存儲流程如下
- Producer將消息順序?qū)懙紺ommitLog中
- 有一個線程根據(jù)消息的隊列信息,寫入到相關(guān)的ConsumerQueue中(minOffset為寫入的初始位置,consumerOffset為當前消費到的位置,maxOffset為ConsumerQueue最新寫入的位置)和IndexFile
- Consumer從ConsumerQueue的consumerOffset讀取到當前應(yīng)該消費的消息在CommitLog中的偏移量,到CommitLog中找到對應(yīng)的消息,消費成功后移動consumerOffset
刷盤機制
「異步刷盤」:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當內(nèi)存里的消息量積累到一定程度時,統(tǒng)一觸發(fā)寫磁盤操作,快速寫入 。吞吐量高,當磁盤損壞時,會丟失消息
「同步刷盤」:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫成功的狀態(tài)。吞吐量低,但不會造成消息丟失
主從復(fù)制
如果一個broker有master和slave時,就需要將master上的消息復(fù)制到slave上,復(fù)制的方式有兩種
- 「同步復(fù)制」:master和slave均寫成功,才返回客戶端成功。maste掛了以后可以保證數(shù)據(jù)不丟失,但是同步復(fù)制會增加數(shù)據(jù)寫入延遲,降低吞吐量
- 「異步復(fù)制」:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當master出現(xiàn)故障后,有可能造成數(shù)據(jù)丟失
消費端消息重試
順序消息的重試
對于順序消息,當消費者消費消息失敗后,消息隊列RocketMQ版會自動不斷地進行消息重試(每次間隔時間為1秒),這時,應(yīng)用會出現(xiàn)消息消費被阻塞的情況。所以一定要做好監(jiān)控,避免阻塞現(xiàn)象的發(fā)生
「順序消息消費失敗后不會消費下一條消息而是不斷重試這條消息,應(yīng)該是考慮到如果跨過這條消息消費后面的消息會對業(yè)務(wù)邏輯產(chǎn)生影響」
「順序消息暫時僅支持集群消費模式,不支持廣播消費模式」
無序消息的重試
對于無序消息(普通、定時、延時、事務(wù)消息),當消費者消費消息失敗時,您可以通過設(shè)置返回狀態(tài)達到消息重試的結(jié)果。
「無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續(xù)消費新的消息」
「消費時候后,重試的配置方式有如下三種」
- 返回Action.ReconsumeLater(推薦)
- 返回Null
- 拋出異常
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- //消息處理邏輯拋出異常,消息將重試。
- doConsumeMessage(message);
- //方式1:返回Action.ReconsumeLater,消息將重試。
- return Action.ReconsumeLater;
- //方式2:返回null,消息將重試。
- return null;
- //方式3:直接拋出異常,消息將重試。
- throw new RuntimeException("Consumer Message exception");
- }
- }
「消費失敗后,無需重試的配置方式」
集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會再重試。
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- try {
- doConsumeMessage(message);
- } catch (Throwable e) {
- //捕獲消費邏輯中的所有異常,并返回Action.CommitMessage;
- return Action.CommitMessage;
- }
- //消息處理正常,直接返回Action.CommitMessage;
- return Action.CommitMessage;
- }
- }
「消息重試次數(shù)」
「RocketMQ默認允許每條消息最多重試16次,每次消費失敗發(fā)送一條延時消息到重試隊列,同一條消息失敗一次將延時等級提高一次,然后再放到重試隊列。重試16次后如果還沒有消費成功,則將消息放到死信隊列中。」
「注意:重試隊列和死信隊列都是按照Consumer Group劃分的」
重試隊列topic名字:%RETRY% + consumerGroup
死信隊列topic名字:%DLQ% + consumerGroup
「為什么重試隊列和死信隊列要按照Consumer Group來進行劃分?」
「因為在RocketMQ的時候使用一定要保持訂閱關(guān)系一致。即一個Consumer Group訂閱的topic和tag要完全一致,不然可能會導(dǎo)致消費邏輯混亂,消息丟失」
如下任意一種情況都表現(xiàn)為訂閱關(guān)系不一致
- 相同ConsumerGroup下的Consumer實例訂閱了不同的Topic。
- 相同ConsumerGroup下的Consumer實例訂閱了相同的Topic,但訂閱的Tag不一致。
我們可以通過控制臺查看各種類型的主題
消息每次重試的間隔時間如下
第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時 |
8 | 6 分鐘 | 16 | 2 小時 |
「前面說到RocketMQ的消息重試是通過往重試隊列發(fā)送定時消息來實現(xiàn)的。」 RocketMQ支持18個級別的定時延時,每個級別定時消息的延時時間如下。
- // MessageStoreConfig.java
- private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
消息重試只是把定時消息的前2個級別去掉,每次發(fā)送下一個級別的定時消息
我們可以設(shè)置消費端消息重試次數(shù)
- 最大重試次數(shù)小于等于16次,則重試時間間隔同上表描述。
- 最大重試次數(shù)大于16次,超過16次的重試時間間隔均為每次2小時。
- Properties properties = new Properties();
- // 配置對應(yīng)Group ID的最大消息重試次數(shù)為20次,最大重試次數(shù)為字符串類型。
- properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
- Consumer consumer =ONSFactory.createConsumer(properties);
「那么重試隊列中的消息是如何被消費的?」
消息消費者在啟動的時候,會訂閱正常的topic和重試隊列的topic
定時消息的實現(xiàn)邏輯也比較簡單,可以歸納為如下幾步
1.發(fā)送延時消息
1.1 替換topic為SCHEDULE_TOPIC_XXXX,queueId為消息延遲等級(如果不替換topic直接發(fā)到對應(yīng)的consumeQueue中,則消息會被立馬消費)
1.2 將消息原來的topic,queueId放到消息擴展屬性中
1.3 將消息應(yīng)該執(zhí)行的時間放到tagsCode中
將消息順序?qū)懙紺ommitLog中
將消息對應(yīng)的信息分發(fā)到對應(yīng)的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個queue,對應(yīng)18個延遲級別)
定時任務(wù)不斷判斷消息是否到達投遞時間,沒有到達則后續(xù)執(zhí)行投遞
如果到達投遞時間,則從commitLog中拉取消息的內(nèi)容,重新設(shè)置消息topic,queueId為原來的(原來的topic,queueId在消息擴展屬性中),然后將消息投遞到commitLog中,此時消息就會被分發(fā)到對應(yīng)的隊列中,然后被消費。
本文轉(zhuǎn)載自微信公眾號「Java識堂」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系Java識堂公眾號。