成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

警惕!這八個場景下 RocketMQ 會發生流量控制

開發 前端
本文介紹了 RocketMQ 發生流量控制的 8 個場景,其中 Broker 4 個場景,Consumer 4 個場景。Broker 的流量控制,本質是對 Producer 的流量控制,最好的解決方法就是給 Broker 擴容,增加 Broker 寫入能力。

大家好,我是君哥。

在使用 RocketMQ 的過程中,有時候我們會看到下面的日志:

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

這是因為 RocketMQ 觸發了流量控制。今天我們來聊一聊哪些場景下 RocketMQ 會觸發流量控制。

如上圖,生產者把消息寫入 Broker,Consumer 從 Broker 拉取消息。Broker 是 RocketMQ 的核心 ,觸發流量控制主要就是為了防止 Broker 壓力過大而宕機。

一、 Broker 流控

1、 broker busy

RockerMQ 默認采用異步刷盤策略,Producer 把消息發送到 Broker 后,Broker 會先把消息寫入 Page Cache,刷盤線程定時地把數據從 Page Cache 刷到磁盤上,如下圖:

那 broker busy 是怎么導致的呢?

Broker 默認是開啟快速失敗的,處理邏輯類是 BrokerFastFailure,這個類中有一個定時任務用來清理過期的請求,每 10 ms 執行一次,代碼如下:

public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}

(1)Page Cache 繁忙

清理過期請求之前首先會判斷 Page Cache 是否繁忙,如果繁忙,就會給 Producer 返回一個系統繁忙的狀態碼(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),也就是本文開頭的異常日志。那怎么判斷 Page Cache 繁忙呢?Broker 收到一條消息后會追加到 Page Cache 或者內存映射文件,這個過程首先獲取一個 CommitLog 寫入鎖,如果持有鎖的時間大于 osPageCacheBusyTimeOutMills(默認 1s,可以配置),就認為 Page Cache 繁忙。具體代碼見 DefaultMessageStore 類 isOSPageCacheBusy 方法。

(2)清理過期請求

清理過期請求時,如果請求線程的創建時間到當前系統時間間隔大于 waitTimeMillsInSendQueue(默認 200ms,可以配置)就會清理這個請求,然后給 Producer 返回一個系統繁忙的狀態碼(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")。

system busy

這個異常在 NettyRemotingAbstract#processRequestCommand 方法。

拒絕請求

如果 NettyRequestProcessor 拒絕了請求,就會給 Producer 返回一個系統繁忙的狀態碼(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")。那什么情況下請求會被拒絕呢?看下面這段代碼:

//SendMessageProcessor類
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}

從代碼中可以看到,請求被拒絕的情況有兩種可能,一個是 Page Cache 繁忙,另一個是 TransientStorePoolDeficient。

跟蹤 isTransientStorePoolDeficient 方法,發現判斷依據是在開啟 transientStorePoolEnable 配置的情況下,是否還有可用的 ByteBuffer。

注意:在開啟 transientStorePoolEnable 的情況下,寫入消息時會先寫入堆外內存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盤。而讀取消息是從 Page Cache,這樣可以實現讀寫分離,避免讀寫都在 Page Cache 帶來的問題。如下圖:

線程池拒絕

Broker 收到請求后,會把處理邏輯封裝成到 Runnable 中,由線程池來提交執行,如果線程池滿了就會拒絕請求(這里線程池中隊列的大小默認是 10000,可以通過參數 sendThreadPoolQueueCapacity 進行配置),線程池拒絕后會拋出異常 RejectedExecutionException,程序捕獲到異常后,會判斷是不是單向請求(OnewayRPC),如果不是,就會給 Producer 返回一個系統繁忙的狀態碼(code=2,remark="[OVERLOAD]system busy, start flow control for a while")。

判斷 OnewayRPC 的代碼如下,flag = 2 或者 3 時是單向請求:

public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}

(3) 消息重試

Broker 發生流量控制的情況下,返回給 Producer 系統繁忙的狀態碼(code=2),Producer 收到這個狀態碼是不會進行重試的。下面是會進行重試的響應碼:

//DefaultMQProducer類
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
ResponseCode.TOPIC_NOT_EXIST,
ResponseCode.SERVICE_NOT_AVAILABLE,
ResponseCode.SYSTEM_ERROR,
ResponseCode.NO_PERMISSION,
ResponseCode.NO_BUYER_ID,
ResponseCode.NOT_IN_CURRENT_UNIT
));

二、 Consumer 流控

DefaultMQPushConsumerImpl 類中有 Consumer 流控的邏輯 。

1、 緩存消息數量超過閾值

ProcessQueue 保存的消息數量超過閾值(默認 1000,可以配置),源碼如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

2、緩存消息大小超過閾值

ProcessQueue 保存的消息大小超過閾值(默認 100M,可以配置),源碼如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

3、 緩存消息跨度超過閾值

對于非順序消費的場景,ProcessQueue 中保存的最后一條和第一條消息偏移量之差超過閾值(默認 2000,可以配置)。源代碼如下:

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}

4、獲取鎖失敗

對于順序消費的情況,ProcessQueue 加鎖失敗,也會延遲拉取,這個延遲時間默認是 3s,可以配置。

三、總結

本文介紹了 RocketMQ 發生流量控制的 8 個場景,其中 Broker 4 個場景,Consumer 4 個場景。Broker 的流量控制,本質是對 Producer 的流量控制,最好的解決方法就是給 Broker 擴容,增加 Broker 寫入能力。而對于 Consumer 端的流量控制,需要解決 Consumer 端消費慢的問題,比如有第三方接口響應慢或者有慢 SQL。

在使用的時候,根據打印的日志可以分析具體是哪種情況的流量控制,并采用相應的措施。

責任編輯:姜華 來源: 君哥聊技術
相關推薦

2010-02-03 23:04:31

流量控制P2P華夏創新

2023-08-07 09:12:51

權限SpringSecurity

2023-10-08 12:14:42

Sentinel流量控制

2025-02-10 10:38:24

2022-05-02 16:18:22

RocketMQBrokertopic

2022-05-06 17:12:35

區塊鏈元宇宙

2022-05-26 00:33:29

權限TienChin項目

2013-07-22 14:25:29

iOS開發ASIHTTPRequ

2011-06-23 09:09:37

流量控制

2018-04-09 12:44:45

Docker使用場景開發

2015-01-06 09:48:34

Docker多租戶docker應用

2021-03-09 07:38:15

Percona Xtr流量控制運維

2010-06-04 10:49:58

Linux流量控制

2010-06-17 17:00:07

Linux流量控制

2024-05-13 18:33:08

SQL日期函數

2022-03-02 11:39:53

物聯網科技

2021-03-22 08:06:59

SpringBootSentinel項目

2019-07-02 10:22:15

TCP流量數據

2010-05-27 11:03:44

Linux流量控制

2009-02-05 10:13:00

局域網流量控制數據流量
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 91在线精品视频 | 日韩欧美国产一区二区 | 在线午夜 | 国产精品国产精品 | 日韩免费网站 | 欧美一区二区三区在线观看 | 国产欧美一区二区三区在线看 | 成人免费在线观看 | 日韩综合网 | 午夜激情免费视频 | 国产玖玖 | 成人免费视频观看 | 精精国产xxxx视频在线野外 | 久久国产精品一区二区 | 精品乱人伦一区二区三区 | 精品视频久久久 | 亚洲女人天堂网 | 欧美色综合一区二区三区 | 黄色日本视频 | 精品蜜桃一区二区三区 | 色婷婷综合久久久中字幕精品久久 | 国产一区二区麻豆 | av大片在线观看 | 国产一二区免费视频 | 日韩欧美一级片 | 欧美影院 | 国产精品99999 | av大全在线 | 男女免费网站 | 亚洲国产高清在线 | 午夜小电影 | 日日操日日干 | 国产在线精品一区 | 国产精品久久久久久一区二区三区 | 久久出精品 | 久久蜜桃资源一区二区老牛 | 欧美日韩一区在线播放 | 国产成人亚洲精品 | 日韩视频区 | 久久91精品国产一区二区 | 久久亚洲一区二区 |