成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ如何保證消息的可靠性投遞?

開發(fā) 架構(gòu)
對于順序消息,當消費者消費消息失敗后,消息隊列RocketMQ版會自動不斷地進行消息重試(每次間隔時間為1秒),這時,應(yīng)用會出現(xiàn)消息消費被阻塞的情況。

[[396087]]

介紹

要想保證消息的可靠型投遞,無非保證如下3個階段的正常執(zhí)行即可。

  1. 生產(chǎn)者將消息成功投遞到broker
  2. broker將投遞過程的消息持久化下來
  3. 消費者能從broker消費到消息

發(fā)送端消息重試

producer向broker發(fā)送消息后,沒有收到broker的ack時,rocketmq會自動重試。重試的次數(shù)可以設(shè)置,默認為2次

  1. DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME); 
  2. // 同步發(fā)送設(shè)置重試次數(shù)為5次 
  3. producer.setRetryTimesWhenSendFailed(5); 
  4. // 異步發(fā)送設(shè)置重試次數(shù)為5次 
  5. producer.setRetryTimesWhenSendAsyncFailed(5); 

消息持久化

我們先來了解一下消息的存儲流程,這個知識對后面分析消費端消息重試非常重要。

和消息相關(guān)的文件有如下幾種

  1. CommitLog:存儲消息的元數(shù)據(jù)
  2. ConsumerQueue:存儲消息在CommitLog的索引
  3. IndexFile:可以通過Message Key,時間區(qū)間快速查找到消息

整個消息的存儲流程如下

  1. Producer將消息順序?qū)懙紺ommitLog中
  2. 有一個線程根據(jù)消息的隊列信息,寫入到相關(guān)的ConsumerQueue中(minOffset為寫入的初始位置,consumerOffset為當前消費到的位置,maxOffset為ConsumerQueue最新寫入的位置)和IndexFile
  3. Consumer從ConsumerQueue的consumerOffset讀取到當前應(yīng)該消費的消息在CommitLog中的偏移量,到CommitLog中找到對應(yīng)的消息,消費成功后移動consumerOffset

刷盤機制

「異步刷盤」:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當內(nèi)存里的消息量積累到一定程度時,統(tǒng)一觸發(fā)寫磁盤操作,快速寫入 。吞吐量高,當磁盤損壞時,會丟失消息

「同步刷盤」:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫成功的狀態(tài)。吞吐量低,但不會造成消息丟失

主從復(fù)制

如果一個broker有master和slave時,就需要將master上的消息復(fù)制到slave上,復(fù)制的方式有兩種

  1. 「同步復(fù)制」:master和slave均寫成功,才返回客戶端成功。maste掛了以后可以保證數(shù)據(jù)不丟失,但是同步復(fù)制會增加數(shù)據(jù)寫入延遲,降低吞吐量
  2. 「異步復(fù)制」:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當master出現(xiàn)故障后,有可能造成數(shù)據(jù)丟失

消費端消息重試

順序消息的重試

對于順序消息,當消費者消費消息失敗后,消息隊列RocketMQ版會自動不斷地進行消息重試(每次間隔時間為1秒),這時,應(yīng)用會出現(xiàn)消息消費被阻塞的情況。所以一定要做好監(jiān)控,避免阻塞現(xiàn)象的發(fā)生

「順序消息消費失敗后不會消費下一條消息而是不斷重試這條消息,應(yīng)該是考慮到如果跨過這條消息消費后面的消息會對業(yè)務(wù)邏輯產(chǎn)生影響」

「順序消息暫時僅支持集群消費模式,不支持廣播消費模式」

無序消息的重試

對于無序消息(普通、定時、延時、事務(wù)消息),當消費者消費消息失敗時,您可以通過設(shè)置返回狀態(tài)達到消息重試的結(jié)果。

「無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續(xù)消費新的消息」

「消費時候后,重試的配置方式有如下三種」

  1. 返回Action.ReconsumeLater(推薦)
  2. 返回Null
  3. 拋出異常
  1. public class MessageListenerImpl implements MessageListener { 
  2.  
  3.     @Override 
  4.     public Action consume(Message message, ConsumeContext context) { 
  5.         //消息處理邏輯拋出異常,消息將重試。 
  6.         doConsumeMessage(message); 
  7.         //方式1:返回Action.ReconsumeLater,消息將重試。 
  8.         return Action.ReconsumeLater; 
  9.         //方式2:返回null,消息將重試。 
  10.         return null
  11.         //方式3:直接拋出異常,消息將重試。 
  12.         throw new RuntimeException("Consumer Message exception"); 
  13.     } 

「消費失敗后,無需重試的配置方式」

集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會再重試。

  1. public class MessageListenerImpl implements MessageListener { 
  2.  
  3.     @Override 
  4.     public Action consume(Message message, ConsumeContext context) { 
  5.         try { 
  6.             doConsumeMessage(message); 
  7.         } catch (Throwable e) { 
  8.             //捕獲消費邏輯中的所有異常,并返回Action.CommitMessage; 
  9.             return Action.CommitMessage; 
  10.         } 
  11.         //消息處理正常,直接返回Action.CommitMessage; 
  12.         return Action.CommitMessage; 
  13.     } 

「消息重試次數(shù)」

「RocketMQ默認允許每條消息最多重試16次,每次消費失敗發(fā)送一條延時消息到重試隊列,同一條消息失敗一次將延時等級提高一次,然后再放到重試隊列。重試16次后如果還沒有消費成功,則將消息放到死信隊列中。」

「注意:重試隊列和死信隊列都是按照Consumer Group劃分的」

重試隊列topic名字:%RETRY% + consumerGroup

死信隊列topic名字:%DLQ% + consumerGroup

「為什么重試隊列和死信隊列要按照Consumer Group來進行劃分?」

「因為在RocketMQ的時候使用一定要保持訂閱關(guān)系一致。即一個Consumer Group訂閱的topic和tag要完全一致,不然可能會導(dǎo)致消費邏輯混亂,消息丟失」

如下任意一種情況都表現(xiàn)為訂閱關(guān)系不一致

  • 相同ConsumerGroup下的Consumer實例訂閱了不同的Topic。
  • 相同ConsumerGroup下的Consumer實例訂閱了相同的Topic,但訂閱的Tag不一致。

我們可以通過控制臺查看各種類型的主題

消息每次重試的間隔時間如下

第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間

第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間
1 10 秒 9 7 分鐘
2 30 秒 10 8 分鐘
3 1 分鐘 11 9 分鐘
4 2 分鐘 12 10 分鐘
5 3 分鐘 13 20 分鐘
6 4 分鐘 14 30 分鐘
7 5 分鐘 15 1 小時
8 6 分鐘 16 2 小時

「前面說到RocketMQ的消息重試是通過往重試隊列發(fā)送定時消息來實現(xiàn)的。」 RocketMQ支持18個級別的定時延時,每個級別定時消息的延時時間如下。

  1. // MessageStoreConfig.java 
  2. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

消息重試只是把定時消息的前2個級別去掉,每次發(fā)送下一個級別的定時消息

我們可以設(shè)置消費端消息重試次數(shù)

  1. 最大重試次數(shù)小于等于16次,則重試時間間隔同上表描述。
  2. 最大重試次數(shù)大于16次,超過16次的重試時間間隔均為每次2小時。
  1. Properties properties = new Properties(); 
  2. // 配置對應(yīng)Group ID的最大消息重試次數(shù)為20次,最大重試次數(shù)為字符串類型。 
  3. properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); 
  4. Consumer consumer =ONSFactory.createConsumer(properties); 

「那么重試隊列中的消息是如何被消費的?」

消息消費者在啟動的時候,會訂閱正常的topic和重試隊列的topic

定時消息的實現(xiàn)邏輯也比較簡單,可以歸納為如下幾步

1.發(fā)送延時消息

1.1 替換topic為SCHEDULE_TOPIC_XXXX,queueId為消息延遲等級(如果不替換topic直接發(fā)到對應(yīng)的consumeQueue中,則消息會被立馬消費)

1.2 將消息原來的topic,queueId放到消息擴展屬性中

1.3 將消息應(yīng)該執(zhí)行的時間放到tagsCode中

將消息順序?qū)懙紺ommitLog中

將消息對應(yīng)的信息分發(fā)到對應(yīng)的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個queue,對應(yīng)18個延遲級別)

定時任務(wù)不斷判斷消息是否到達投遞時間,沒有到達則后續(xù)執(zhí)行投遞

如果到達投遞時間,則從commitLog中拉取消息的內(nèi)容,重新設(shè)置消息topic,queueId為原來的(原來的topic,queueId在消息擴展屬性中),然后將消息投遞到commitLog中,此時消息就會被分發(fā)到對應(yīng)的隊列中,然后被消費。

本文轉(zhuǎn)載自微信公眾號「Java識堂」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系Java識堂公眾號。

 

責任編輯:武曉燕 來源: Java識堂
相關(guān)推薦

2021-02-02 11:01:31

RocketMQ消息分布式

2024-05-09 08:04:23

RabbitMQ消息可靠性

2020-10-14 08:36:10

RabbitMQ消息

2023-03-06 08:16:04

SpringRabbitMQ

2023-10-17 16:30:00

TCP

2010-12-28 19:50:21

可靠性產(chǎn)品可靠性

2017-08-21 08:51:22

CAN網(wǎng)絡(luò)通訊

2018-09-27 14:13:27

云服務(wù)可靠故障

2024-02-28 10:26:04

物聯(lián)網(wǎng)數(shù)據(jù)存儲

2024-07-04 12:36:50

2011-06-20 14:21:01

模塊化數(shù)據(jù)中心IT基礎(chǔ)設(shè)施

2019-07-26 08:00:00

微服務(wù)架構(gòu)

2022-03-07 08:13:06

MQ消息可靠性異步通訊

2021-03-04 06:49:53

RocketMQ事務(wù)

2009-12-17 16:20:20

城域網(wǎng)路由器

2019-08-30 12:10:05

磁盤數(shù)據(jù)可靠性RAID

2010-12-28 19:55:20

軟件架構(gòu)可靠性

2020-12-06 14:51:23

物聯(lián)網(wǎng)可靠性IOT

2010-07-28 18:58:54

東海證券負載均衡Array Netwo

2010-12-28 20:04:10

網(wǎng)絡(luò)的可靠性網(wǎng)絡(luò)解決方案可靠性
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 91精品国产一区二区三区动漫 | 日韩在线成人 | 91视频在线 | 天天拍天天草 | 午夜av电影院 | 精品久久久久国产免费第一页 | 欧美国产日韩在线观看 | 日本欧美大片 | 亚洲精品一区二区三区四区高清 | 亚洲国产一区二区三区 | h片在线观看网站 | 久久91精品久久久久久9鸭 | 精品久久久久久久 | 亚洲精品一区二区三区中文字幕 | 久久久久久www | 免费看国产一级特黄aaaa大片 | 成人99| 国产精久久久久久久妇剪断 | 国产中文字幕在线观看 | 色爱综合网 | 欧美成人一级 | 精品一区二区电影 | 欧美精品一二三 | 成人一区二区三区在线观看 | 日韩av在线免费 | 成人深夜小视频 | 手机av在线| 日韩一区二区三区在线看 | 国产日韩欧美一区二区在线播放 | 99精品一区 | 色五月激情五月 | 99re视频这里只有精品 | 中文字幕在线观看国产 | a级毛片国产 | 日韩精品 电影一区 亚洲 | www.99re5.com | 欧美一区二不卡视频 | 黄色大片在线免费观看 | 亚洲精品一区二区网址 | 天天综合网天天综合 | 欧美黄色一级毛片 |