成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

面試官:RocketMQ 長輪詢是怎么實現的?

數據庫 其他數據庫
長輪詢可以降低無效的輪詢請求,提升請求效率。RocketMQ 消費者長輪詢支持配置,當消息量不太大,消費者沒有必要頻繁地請求,這時可以設置成長輪詢機制。需要注意的是,消費端設置的請求超時時間必須大于 Broker 輪詢時間。

大家好,我是君哥。

我們知道,消息隊列消費端獲取消息的方式包括推模式和拉模式,RocketMQ 并沒有實現推模式,RocketMQ 的推模式本質上也是拉模式。他們在實現上有下面的不同:

  • 拉模式需要開發在代碼里調用拉取消息的方法,拉取到消息后直接進行消息處理;
  • 推模式是消費者客戶端初始化時利用重平衡線程去拉取消息,拉取消息的方法會注冊回調函數,拉取到消息后,由回調函數觸發監聽器(定義處理邏輯)進行消息處理。

RocketMQ 為了提供拉取消息的效率,采用了長輪詢機制,避免消費端無效的輪詢請求。當消費者發送長輪詢請求后,如果 Broker 上沒有新消息,則不會立刻返回,而是掛起請求,等待新消息到來或者請求超時。

今天來聊一聊 RocketMQ 的長輪詢是怎么實現的。

1 長輪詢

長輪詢的流程如下圖:

圖片圖片

客戶端建立連接后,發送消息拉取請求,如果服務端有新消息,則返回消息。如果服務端沒有新消息,則掛起連接,等待新消息到來后給客戶端返回。客戶端如果連接超時,則斷開連接。

2 RocketMQ 實現

2.1 消費端

RocketMQ 消費端長輪詢有 2 個超時設置:

  • brokerSuspendMaxTimeMillis:長輪詢,Consumer 拉消息請求在 Broker 掛起超過這個時間,就會給消費端返回響應,無論有沒有新消息,單位毫秒。這個參數消費端發送拉取請求時會發給 Broker,Broker 用來判斷這個長連接是否超時。
  • consumerTimeoutMillisWhenSuspend:消費端發送拉取請求的超時時間,這個時間要大于 brokerSuspendMaxTimeMillis,客戶端初始化時會有校驗。

注意,這 2 個超時時間官方都不推薦修改。

if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
 throw new MQClientException(
  "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
   + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
  null);
}

2.2 Broker

RocketMQ 在 Broker 端通過設置 longPollingEnable 來開啟長輪詢,默認是開啟。

Broker 長輪詢掛起時間使用 suspendTimeoutMillis 來進行控制,前面提到過,這個時間由消費者發送的 brokerSuspendMaxTimeMillis 參數來賦值。

2.2.1 掛起消息

Broker 收到客戶端拉取消息請求后,如果沒有新消息,則將請求掛起,也就是將請求放到 pullRequestTable。

//PullMessageProcessor#processRequest
case ResponseCode.PULL_NOT_FOUND:

if (brokerAllowSuspend && hasSuspendFlag) {
//suspendTimeoutMillisLong 這個參數就是消費端發來的 consumerTimeoutMillisWhenSuspend
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
   pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
  }

  String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
  PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
   this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//這里掛起消息
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
  response = null;
break;
 }

上面的 suspendPullRequest 調用了 PullRequestHoldService#suspendPullRequest,將請求保存在 pullRequestTable。

2.2.2 處理掛起

消息掛起后,后面怎么恢復呢?這里總需要一個線程去循環處理掛起的消息,這個處理邏輯也在 PullRequestHoldService,看下面代碼:

public void run() {
 log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
   //長輪詢模式,等待 5s 后處理
   if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
    this.waitForRunning(5 * 1000);
   } //...
   //這里處理被掛起的請求
   this.checkHoldRequest();
  } catch (Throwable e) {
   log.warn(this.getServiceName() + " service has exception. ", e);
  }
 }//...
}

處理請求的邏輯參考下面代碼:

protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
  String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
   String topic = kArray[0];
   int queueId = Integer.parseInt(kArray[1]);
   finallong offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
   try {
    this.notifyMessageArriving(topic, queueId, offset);
   } catch (Throwable e) {
    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
   }
  }
 }
}

notifyMessageArriving 方法邏輯如下:

  1. 如果當前請求有新消息到來,則給消費者返回響應;
  2. 如果當前請求沒有新消息,但是掛起請求已經超時,則給消費者返回響應;
  3. 否則, 繼續掛起,等待 5s 后重復執行上面邏輯。

3 總結

長輪詢可以降低無效的輪詢請求,提升請求效率。RocketMQ 消費者長輪詢支持配置,當消息量不太大,消費者沒有必要頻繁地請求,這時可以設置成長輪詢機制。需要注意的是,消費端設置的請求超時時間必須大于 Broker 輪詢時間。


責任編輯:武曉燕 來源: 君哥聊技術
相關推薦

2022-12-05 10:47:08

RocketMQ灰度消息

2024-02-04 10:08:34

2024-12-25 15:44:15

2023-02-08 07:04:20

死鎖面試官單元

2024-10-15 10:00:06

2025-02-26 12:19:52

2021-09-27 07:11:18

MySQLACID特性

2021-09-07 10:44:33

Java 注解開發

2025-04-08 00:00:00

@AsyncSpring異步

2021-02-19 10:02:57

HTTPSJava安全

2024-03-05 10:33:39

AOPSpring編程

2021-11-02 09:05:25

Redis

2025-03-07 00:00:10

2024-08-22 10:39:50

@Async注解代理

2024-02-20 14:10:55

系統緩存冗余

2024-09-11 22:51:19

線程通訊Object

2020-12-09 10:29:53

SSH加密數據安全

2023-11-20 10:09:59

2023-02-08 08:32:41

輪詢鎖

2024-02-22 15:36:23

Java內存模型線程
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产成人精品免高潮在线观看 | 欧美中文字幕一区 | 99re热精品视频| 黄视频免费在线 | 午夜亚洲 | 国产日韩精品久久 | 久色激情 | 三级成人片| 日本成人三级电影 | 久久中文字幕av | 午夜视频一区二区 | 精品欧美乱码久久久久久 | 国产精品自产av一区二区三区 | 久久久久国产一区二区三区 | 日韩淫片免费看 | 亚洲免费人成在线视频观看 | 色婷婷综合久久久中字幕精品久久 | 国产一区不卡在线观看 | 久久久久国产一级毛片 | 99精品免费 | 天天拍天天操 | 毛片网站在线观看 | 欧美日韩精品一区二区三区视频 | 亚洲国产中文字幕 | 亚洲免费视频一区二区 | 午夜a级理论片915影院 | 国产91成人| 国产精品99久久久久久动医院 | 狠狠爱免费视频 | 韩国成人在线视频 | 日韩在线视频一区二区三区 | 亚洲欧美高清 | 日本黄色片免费在线观看 | 日韩中文字幕网 | 国产色在线 | 中文字幕国 | 久久国产99| 欧美黄在线观看 | 最新黄色在线观看 | 91精品国产91久久久久久最新 | 国产一区二区 |