成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ消息重試機(jī)制解析!

開發(fā) 架構(gòu)
由于網(wǎng)絡(luò)抖動(dòng)、服務(wù)宕機(jī)等一些不確定的因素,RocketMQ在發(fā)送消息的時(shí)候很有可能出現(xiàn)消息發(fā)送或者消費(fèi)失敗的問題。

由于網(wǎng)絡(luò)抖動(dòng)、服務(wù)宕機(jī)等一些不確定的因素,RocketMQ在發(fā)送消息的時(shí)候很有可能出現(xiàn)消息發(fā)送或者消費(fèi)失敗的問題。

所以RocketMQ消息重試分為2種:

?Producer端重試和Consumer端重試。

Producer端重試

?生產(chǎn)者端的消息失敗,也就是Producer往MQ上發(fā)消息沒有發(fā)送成功。

  • 比如網(wǎng)絡(luò)抖動(dòng)致使生產(chǎn)者發(fā)送消息到MQ失敗。

這種消息失敗重試可以手動(dòng)設(shè)置發(fā)送失敗重試的次數(shù)。

producer.setRetryTimesWhenSendFailed(3);

官方說明

?Producer的send方法本身支持內(nèi)部重試。

重試邏輯:

  • 默認(rèn)至多重試2次。
  • 這個(gè)方法的總耗時(shí)時(shí)間不超過sendMsgTimeout設(shè)置的值,默認(rèn)10s。

如果本身向Broker發(fā)送消息產(chǎn)生超時(shí)異常,就不會(huì)再重試。

  • 以上策略也是在一定程度上保證了消息可以發(fā)送成功。

如果業(yè)務(wù)對(duì)消息可靠性要求比較高,建議增加相應(yīng)的重試邏輯:

  • 比如調(diào)用send同步方法發(fā)送失敗時(shí),則嘗試將消息存儲(chǔ)到DB。
  • 然后由后臺(tái)線程定時(shí)重試,確保消息一定到達(dá)Broker。

重試策略

消息發(fā)送重試有三種策略:

?同步發(fā)送失敗策略、異步發(fā)送失敗策略和消息刷盤失敗策略。

同步發(fā)送失敗策略:

?普通消息,消息發(fā)送默認(rèn)采用round-robin策略(輪轉(zhuǎn))來選擇所發(fā)送到的隊(duì)列。

  • 如果發(fā)送失敗,默認(rèn)重試2次。

但在重試時(shí)是不會(huì)選擇上次發(fā)送失敗的Broker,而是選擇其它Broker。

DefaultMQProducer producer = new DefaultMQProducer("pg");
// 設(shè)置同步發(fā)送失敗時(shí)重試發(fā)送的次數(shù),默認(rèn)為2次
producer.setRetryTimesWhenSendFailed(3);
// 設(shè)置發(fā)送超時(shí)時(shí)限為5s,默認(rèn)10s
producer.setSendMsgTimeout(5000);

異步發(fā)送失敗策略:

?異步發(fā)送失敗重試時(shí),異步重試不會(huì)選擇其他Broker,僅在當(dāng)前Broker上做重試。

  • 所以該策略無法保證消息不丟失。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定異步發(fā)送失敗后不進(jìn)行重試發(fā)送
producer.setRetryTimesWhenSendAsyncFailed(0);

消息刷盤失敗策略:

?消息刷盤超時(shí),默認(rèn)是不會(huì)將消息嘗試發(fā)送到其他Broker。

對(duì)于重要消息可以通過在Broker的配置文件設(shè)置retryAnotherBrokerWhenNotStoreOK屬性為true來開啟。

幾種情況

異步發(fā)送在發(fā)送過程中出現(xiàn)異常進(jìn)行重試:

?在解析請(qǐng)求結(jié)果時(shí),發(fā)現(xiàn)響應(yīng)狀態(tài)碼有其它異常(消息可能未正確被Broker處理)會(huì)繼續(xù)進(jìn)行重試。

  • 重試依然選擇當(dāng)前Broker。

但是如果響應(yīng)結(jié)果不為空的話,即使處理響應(yīng)時(shí)發(fā)生異常也不會(huì)進(jìn)行重試。

同步發(fā)送時(shí):

?如果發(fā)送過程中沒有異常,但是發(fā)送結(jié)果不OK,也會(huì)選擇另一個(gè)Broker繼續(xù)進(jìn)行重試。

順序消息發(fā)送失敗不進(jìn)行重試:

?順序消息:指的是同步+指定消息隊(duì)列的方式發(fā)送。

Consumer端重試

消息正常的到了消費(fèi)者,結(jié)果消費(fèi)者發(fā)生異常,處理失敗了。

?例如反序列化失敗,消息數(shù)據(jù)本身無法處理等。

順序消息

順序消息的消費(fèi)重試

?順序消息,當(dāng)Consumer消費(fèi)消息失敗后,為了保證消息的順序性,其會(huì)自動(dòng)不斷地進(jìn)行消息重試,直到消費(fèi)成功。

  • 消費(fèi)重試默認(rèn)間隔時(shí)間為1000ms。

重試期間應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 順序消息消費(fèi)失敗的消費(fèi)重試時(shí)間間隔,單位毫秒,默認(rèn)為1000,其取值范圍為[10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);

由于對(duì)順序消息的重試是無休止的,不間斷的,直至消費(fèi)成功。

  • 所以,對(duì)于順序消息的消費(fèi),務(wù)必要保證應(yīng)用能夠及時(shí)監(jiān)控并處理消費(fèi)失敗的情況,避免消費(fèi)被永久性阻塞。

?注意:順序消息沒有發(fā)送失敗重試機(jī)制,但具有消費(fèi)失敗重試機(jī)制。

消費(fèi)狀態(tài)

?順序消費(fèi)目前兩個(gè)狀態(tài):SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。

SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費(fèi)一下:

  • 過SuspendCurrentQueueTimeMillis時(shí)間間隔后再重試一下,而不是放到重試隊(duì)列里。
public enum ConsumeOrderlyStatus {
    SUCCESS,
    
    @Deprecated
    ROLLBACK,
    
    @Deprecated
    COMMIT,
    
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

并發(fā)消息

并發(fā)消息的消費(fèi)重試

?在并發(fā)消費(fèi)中,可能會(huì)有多個(gè)線程同時(shí)消費(fèi)一個(gè)隊(duì)列的消息。

因此即使發(fā)送端通過發(fā)送順序消息保證消息在同一個(gè)隊(duì)列中按照FIFO的順序,也無法保證消息實(shí)際被順序消費(fèi)。

  • 所有并發(fā)消費(fèi)也可以稱之為無序消費(fèi)。

對(duì)于無序消息(普通消息、延時(shí)消息、事務(wù)消息):

  • 當(dāng)Consumer消費(fèi)消息失敗時(shí),可以通過設(shè)置返回狀態(tài)達(dá)到消息重試的效果。

注意:

?無序消息的重試只針對(duì)集群消費(fèi)模式生效。

廣播消費(fèi)模式不提供失敗重試特性:即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息。

消費(fèi)狀態(tài)

Consumer端消息消費(fèi)有兩種狀態(tài):

?一個(gè)是成功(CONSUME_SUCCESS),一個(gè)是失敗&稍后重試(RECONSUME_LATER)。

Consumer為了保證消息消費(fèi)成功,只有使用方明確表示消費(fèi)成功。

  • 返回CONSUME_SUCCESS,RocketMQ才會(huì)認(rèn)為消息消費(fèi)成功。

若是消息消費(fèi)失敗,只要返回:ConsumeConcurrentlyStatus.RECONSUME_LATER。

  • RocketMQ就會(huì)認(rèn)為消息消費(fèi)失敗了,要重新投遞。
public enum ConsumeConcurrentlyStatus {
    CONSUME_SUCCESS,
    RECONSUME_LATER;   
}

重試機(jī)制

?為了保證消息是確定被至少消費(fèi)成功一次,RocketMQ會(huì)把這批消息重發(fā)回Broker。

  • Topic不是原Topic而是一個(gè)RETRY Topic。

在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置)后,再次投遞。

?而若是一直這樣重復(fù)消費(fèi)都持續(xù)失敗到必定次數(shù)(默認(rèn)16次),就會(huì)投遞到死信隊(duì)列

在啟動(dòng)Broker的過程當(dāng)中,能夠觀察到以下輸出:

2024-09-19 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

RECONSUME_LATER策略:

?若是消費(fèi)失敗,那么1S后再次消費(fèi),若是失敗,那么5S后,再次消費(fèi),…… 直至2H后若是消費(fèi)還失敗。

  • 那么該條消息就會(huì)終止發(fā)送給消費(fèi)者了。

消息重試間隔時(shí)間如下:

重試次數(shù)

與上次重試的間隔時(shí)間

重試次數(shù)

與上次重試的間隔時(shí)間

1

10秒

9

7分鐘

2

30秒

10

8分鐘

3

1分鐘

11

9分鐘

4

2分鐘

12

10分鐘

5

3分鐘

13

20分鐘

6

4分鐘

14

30分鐘

7

5分鐘

15

1小時(shí)

8

6分鐘

16

2小時(shí)

?某條消息在一直消費(fèi)失敗的前提下,將會(huì)在接下來的4小時(shí)46分鐘之內(nèi)進(jìn)行16次重試。

  • 超過這個(gè)時(shí)間范圍消息將不再重試投遞,而被投遞至死信隊(duì)列

修改消費(fèi)重試次數(shù):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消費(fèi)重試次數(shù)
consumer.setMaxReconsumeTimes(10);

基本原理

?重試的 Message,RocketMQ 的做法并不是將其投遞回原 Topic重試隊(duì)列。

每個(gè) ConsumerGroup 都有自己的重試隊(duì)列:

  • 其名稱是由特定的前綴拼接上 ConsumerGroup 所組成,默認(rèn) %RETRY%+消費(fèi)者組名稱。
  • 所以在 Consumer 啟動(dòng)時(shí),就會(huì)同時(shí)消費(fèi)其 ConsumerGroup 對(duì)應(yīng)的重試隊(duì)列和普通隊(duì)列。

消費(fèi)失敗的 Message,Consumer 會(huì)將其投回 Broker:

  • 相當(dāng)于這條 Message 已經(jīng)被消費(fèi)掉了,之后重試的只是內(nèi)容相同、但實(shí)際不是同一條的 Message。
  • 然后會(huì)校驗(yàn)重試的次數(shù),如果達(dá)到16次則會(huì)進(jìn)入死信隊(duì)列 ,組成為 %DLQ%+消費(fèi)者組名稱。
  • 未達(dá)到最大重試次數(shù),則會(huì)根據(jù)重試間隔時(shí)間等級(jí)將其投遞到延遲隊(duì)列SCHEDULE_TOPIC_XXXX中。
  • 然后等到了延遲等級(jí)對(duì)應(yīng)的時(shí)間之后,再投遞到 ConsumerGroup 所對(duì)應(yīng)的重試隊(duì)列當(dāng)中,供后續(xù)消費(fèi)。

消息重復(fù)

如果消費(fèi)端收到兩條一樣的消息,應(yīng)該怎樣處理?

《RocketMQ 原理簡(jiǎn)介》中講到:

?RocketMQ 無法避免消息重復(fù)。

所以如果業(yè)務(wù)對(duì)消費(fèi)重復(fù)非常敏感,務(wù)必要在業(yè)務(wù)側(cè)去重,有以下幾種去重方式:

?

消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性。

  • 如何保證冪等性,可以看我之前的文章!

保證每條消息都有唯一編號(hào)且保證消息處理成功與去重表的日志同時(shí)出現(xiàn)。

  • 利用一張日志表來記錄已經(jīng)處理成功的消息的ID。
  • 如果新到的消息ID已經(jīng)在日志表中,那么就不再處理這條消息。
責(zé)任編輯:姜華 來源: 月伴飛魚
相關(guān)推薦

2025-01-03 08:44:37

kafka消息發(fā)送策略

2022-11-14 08:19:59

重試機(jī)制Kafka

2022-05-06 07:44:10

微服務(wù)系統(tǒng)設(shè)計(jì)重試機(jī)制

2020-07-19 15:39:37

Python開發(fā)工具

2017-07-02 16:50:21

2025-02-26 10:49:14

2017-06-16 15:16:15

2021-02-20 10:02:22

Spring重試機(jī)制Java

2023-11-27 07:44:59

RabbitMQ機(jī)制

2023-10-27 08:20:12

springboot微服務(wù)

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-11-14 09:10:13

消費(fèi)者RocketMQ負(fù)載均衡

2025-05-28 01:15:00

Golang重試機(jī)制

2023-11-06 08:00:38

接口高可用機(jī)制

2024-08-22 18:49:23

2025-04-18 03:00:00

2024-01-04 18:01:55

高并發(fā)SpringBoot

2022-06-13 11:05:35

RocketMQ消費(fèi)者線程

2017-12-18 11:09:45

消息轉(zhuǎn)發(fā)DemoPerson
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 亚洲一区二区 | 中文字幕 亚洲一区 | 日本男人天堂 | 国产美女精品视频免费观看 | 亚洲精品乱码久久久久久蜜桃91 | 精品国产乱码久久久久久丨区2区 | 中文字幕 欧美 日韩 | 中文字幕免费在线 | 亚洲视频一区 | 二区中文| 国产成人综合亚洲欧美94在线 | 久久久久久久国产精品 | 亚洲成人二区 | 国产精品免费大片 | 青青草av网站| 免费观看黄色片视频 | 精品久久一区二区三区 | 国产色黄| 免费在线观看h片 | 99精品免费久久久久久久久日本 | 欧美一二三四成人免费视频 | 亚洲成人精 | 国产中文字幕亚洲 | 欧美日韩91| 欧美久久国产精品 | 中文字幕一级 | 久久久久亚洲精品 | 在线亚洲欧美 | 男女污网站| 91精品国产综合久久久久久丝袜 | 在线综合视频 | 久久久一二三 | 中国av在线免费观看 | 久久久www | 2020天天操| 91久久久久久久久久久 | 99re在线视频 | 91精品在线看 | 成人视屏在线观看 | 呦呦在线视频 | 精品欧美乱码久久久久久1区2区 |