消費(fèi)者原理分析-RocketMQ知識(shí)體系(四)
前文了解了 RocketMQ消息存儲(chǔ)的相關(guān)原理,本文將講講消息消費(fèi)的過(guò)程及相關(guān)概念。
消息消費(fèi)
關(guān)于消息消費(fèi),消費(fèi)者組這些概念,基本和kafka 是類似的,比如:
一個(gè)消費(fèi)組內(nèi)可以包含多個(gè)消費(fèi)者,1個(gè)消費(fèi)組可訂閱多個(gè)主題。消費(fèi)組之間有集群模式與廣播模式兩種。
集群模式下,主題下的同一消息只允許被消費(fèi)組內(nèi)的一個(gè)消費(fèi)者消費(fèi),消費(fèi)進(jìn)度存儲(chǔ)在 broker 端。廣播模式下,則每個(gè)消費(fèi)者都可以消費(fèi)該消息,消費(fèi)進(jìn)度存儲(chǔ)在消費(fèi)者端。
集群模式下,一個(gè)消費(fèi)隊(duì)列同一時(shí)間,只允許被一個(gè)消費(fèi)者消費(fèi),1個(gè)消費(fèi)者,可以消費(fèi)多個(gè)消息隊(duì)列。具體的可以看我前面的文章。
而且 rocketmq 消息服務(wù)器與消費(fèi)者的消息傳輸有 2 種方式:推模式、拉模式。拉模式,即消費(fèi)者主動(dòng)向消息服務(wù)器發(fā)送請(qǐng)求;推模式,即消息服務(wù)器向消費(fèi)者推送消息。推模式,是基于拉模式實(shí)現(xiàn)的。
消費(fèi)者啟動(dòng)
主要就是初始化了三個(gè)組件,然后啟動(dòng)后臺(tái)定時(shí)任務(wù)。
三個(gè)組件:
- 【RebalanceImpl】均衡消息隊(duì)列服務(wù),負(fù)責(zé)分配當(dāng)前 Consumer 可消費(fèi)的消息隊(duì)列( MessageQueue )。當(dāng)有新的 Consumer 的加入或移除,都會(huì)重新分配消息隊(duì)列。
- 【PullAPIWrapper】拉取消息組件
- 【offsetStore】消費(fèi)進(jìn)度組件
幾個(gè)定時(shí)任務(wù)
- PullMessageService
- 從阻塞隊(duì)列pullRequestQueue中獲取consumer的pull請(qǐng)求
- RebalanceService
- 負(fù)載均衡定時(shí)任務(wù),給 Consumer 分配可消費(fèi)的 MessageQueue
- fetchNameServerAddr
- 定時(shí)獲取 NameSever 地址
- updateTopicRouteInfoFromNameServer
- 定時(shí)更新Topic路由信息
- cleanOfflineBroker
- 定時(shí)清理下線Broker
- sendHeartbeatToAllBrokerWithLock
- 發(fā)送心跳
- persistAllConsumerOffset
- 持久化消費(fèi)進(jìn)度 ConsumerOffset
消息拉取
對(duì)于任何一款消息中間件而言,消費(fèi)者客戶端一般有兩種方式從消息中間件獲取消息并消費(fèi):
Pull
即消費(fèi)者每隔一定時(shí)間主動(dòng)去 Broker 拉取消息
優(yōu)點(diǎn)
消費(fèi)速度、數(shù)量可控
缺點(diǎn)
如果間隔時(shí)間短,可能會(huì)拉空,并且頻繁 RPC 請(qǐng)求增加網(wǎng)絡(luò)開(kāi)銷 如果間隔時(shí)間長(zhǎng),則可能會(huì)有消息延遲 消費(fèi)進(jìn)度offset需要consumer自己來(lái)維護(hù)
Push
即 Broker 主動(dòng)實(shí)時(shí)推送消息給消費(fèi)者
優(yōu)點(diǎn)
消息實(shí)時(shí),保持長(zhǎng)鏈接,不會(huì)頻繁建立鏈接
缺點(diǎn)
如果消息數(shù)量過(guò)大,消費(fèi)者吞吐量小,肯能會(huì)造成消費(fèi)者緩沖區(qū)溢出。
在文章的開(kāi)頭我們也說(shuō)了RocketMQ推模式,是基于拉模式實(shí)現(xiàn)的。
【PullMessageService 消息拉取】
RocketMQ 通過(guò) PullMessageService 拉取消息。
通過(guò)代碼段 PullMessageService#run可以看出:
- public void run() {
- // stopped 是 volidate 修飾的變量,用于線程間通信。
- while (!this.isStopped()) {
- // ..
- // 阻塞隊(duì)列, 如果 pullRequestQueue 沒(méi)有元素,則阻塞
- PullRequest pullRequest = this.pullRequestQueue.take();
- // 消息拉取
- this.pullMessage(pullRequest);
- // ...
- }
- }
關(guān)于PullRequest
- // 消費(fèi)者組
- private String consumerGroup;
- // 消息隊(duì)列
- private MessageQueue messageQueue;
- // 消息處理隊(duì)列,從 Broker 拉取到的消息先存入 ProcessQueue,然后再提交到消費(fèi)者消費(fèi)池消費(fèi)
- private ProcessQueue processQueue;
- // 待拉取的 MessageQueue 偏移量
- private long nextOffset;
- // 是否被鎖定
- private boolean lockedFirst = false;
PullMessageService 添加 PullRequest 有兩種方式:
延時(shí)添加
立即添加
【關(guān)于ProcessQueue】
ProcessQueue 是 MessageQueue 在消費(fèi)端的重現(xiàn)、快照。PullMessageService 從消息服務(wù)器默認(rèn)每次拉取 32 條消息,按消息的隊(duì)列偏移量順序存放在 ProcessQueue 中,PullMessageService 再將消息提交到消費(fèi)者消費(fèi)線程池。消息消費(fèi)成功后,從 ProcessQueue 中移除。
- // 讀寫(xiě)鎖
- private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
- // 消息存儲(chǔ)容器, k:消息偏移量,v:消息實(shí)體
- private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
- // ProcessQueue 中消息總數(shù)
- private final AtomicLong msgCount = new AtomicLong();
- // ProcessQueue 中消息總大小
- private final AtomicLong msgSize = new AtomicLong();
- // 當(dāng)前 ProcessQueue 中包含的最大隊(duì)列偏移量
- private volatile long queueOffsetMax = 0L;
- // 當(dāng)前 ProcessQueue 是否被丟棄
- private volatile boolean dropped = false;
- // 上一次開(kāi)始消息拉取時(shí)間戳
- private volatile long lastPullTimestamp = System.currentTimeMillis();
- // 上一次消息消費(fèi)時(shí)間戳
- private volatile long lastConsumeTimestamp = System.currentTimeMillis();
【對(duì)消息拉取進(jìn)行流量控制】
processQueue 的消息數(shù)量 大于 1000, processQueue 的消息大小 大于 100 MB,將延遲 50 毫秒后拉取消息
processQueue 中偏移量最大的消息與偏移量最小的消息的跨度超過(guò) 2000 則延遲 50 毫秒再拉取消息。
根據(jù)主題拉取訂閱的消息,如果為空,延遲 3 秒,再拉取。
【消息服務(wù)端 broker 組裝消息】
代碼位置:PullMessageProcessor#processRequest
- 根據(jù)訂閱消息,構(gòu)建消息過(guò)濾器
- 調(diào)用 MessageStore.getMessage 查找消息
- 根據(jù)主題名與隊(duì)列編號(hào)獲取消息消費(fèi)隊(duì)列
- 消息偏移量異常情況校對(duì)下一次拉取偏移量
- 根據(jù) PullRequest 填充 responseHeader 的 nextBeginOffset、minOffset、maxOffset
- 根據(jù)主從同步延遲,如果從節(jié)點(diǎn)數(shù)據(jù)包含下一次拉取的偏移量,設(shè)置下一次拉取任務(wù)的 brokerId
- 如果 commitlog 標(biāo)記可用并且當(dāng)前節(jié)點(diǎn)為主節(jié)點(diǎn),則更新消息消費(fèi)進(jìn)度
【消息拉取長(zhǎng)輪詢機(jī)制】
RocketMQ 推模式是循環(huán)向消息服務(wù)端發(fā)送消息拉取請(qǐng)求。
消費(fèi)者向 broker 拉取消息時(shí),如果消息未到達(dá)消費(fèi)隊(duì)列,并且未啟用 長(zhǎng)輪詢機(jī)制,則會(huì)在服務(wù)端等待 shortPollingTimeMills(默認(rèn)1秒) 時(shí)間后再去判斷消息是否已經(jīng)到達(dá)消息隊(duì)列,如果消息未到達(dá),則提示消息拉取客戶端 PULL_NOT_FOUND。
如果開(kāi)啟長(zhǎng)輪詢模式,rocketMQ 會(huì)每 5s 輪詢檢查一次消息是否可達(dá),同時(shí)一有新消息到達(dá)后立馬通知掛起線程再次驗(yàn)證新消息是否是自己感興趣的消息,如果是則從 commitlog 文件提取消息返回給消息拉取客戶端,否則直到掛起超時(shí),超時(shí)時(shí)間由消息拉取方在消息拉取時(shí)封裝在請(qǐng)求參數(shù)中,PUSH 模式默認(rèn) 15s。
PULL 模式通過(guò) DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis 設(shè)置。RocketMQ 通過(guò)在 Broker 端配置 longPollingEnable 為 true 來(lái)開(kāi)啟長(zhǎng)輪詢模式。
RocketMQ 的長(zhǎng)輪詢機(jī)制由 2 個(gè)線程共同完成。PullRequestHoldService、ReputMessageService。
【Push消費(fèi)模式流程簡(jiǎn)析】
后臺(tái)獨(dú)立線程RebalanceServic根據(jù)Topic中消息隊(duì)列個(gè)數(shù)和當(dāng)前消費(fèi)組內(nèi)消費(fèi)者個(gè)數(shù)進(jìn)行負(fù)載均衡,給當(dāng)前消費(fèi)者分配對(duì)應(yīng)的MessageQueue,將其封裝為PullRequest實(shí)例放入隊(duì)列pullRequestQueue中。
Consumer端開(kāi)啟后臺(tái)獨(dú)立的線程PullMessageService不斷地從隊(duì)列pullRequestQueue中獲取PullRequest并通過(guò)網(wǎng)絡(luò)通信模塊異步發(fā)送Pull消息的RPC請(qǐng)求給Broker端。這里算是比較典型的生產(chǎn)者-消費(fèi)者模型,實(shí)現(xiàn)了準(zhǔn)實(shí)時(shí)的自動(dòng)消息拉取。
PullMessageService異步拉取到消息后,通過(guò)PullCallback進(jìn)行回調(diào)處理,如果拉取成功,則更新消費(fèi)進(jìn)度,putPullRequest到阻塞隊(duì)列pullRequestQueue中,接著立即進(jìn)行拉取
監(jiān)聽(tīng)器 ConsumeMessageConcurrentlyService 會(huì)一直監(jiān)聽(tīng)回調(diào)方法 PullCallback,把拉取到的消息交給Consumerrequest進(jìn)行處理,Consumerrequest會(huì)調(diào)用消費(fèi)者業(yè)務(wù)方實(shí)現(xiàn)的consumeMessage()接口處理具體業(yè)務(wù),消費(fèi)者業(yè)務(wù)方處理完成后返回ACK給Consumerrequest,如果消費(fèi)者ACK返回的失敗,則在集群模式下把消息發(fā)回 Broker 進(jìn)行重試(廣播模型重試的成本太高),最后更新消費(fèi)進(jìn)度offsetTable
在Broker端,PullMessageProcessor業(yè)務(wù)處理器收到Pull消息的RPC請(qǐng)求后,通過(guò)MessageStore實(shí)例從commitLog獲取消息。如果第一次嘗試Pull消息失敗(比如Broker端沒(méi)有可以消費(fèi)的消息),則通過(guò)長(zhǎng)輪詢機(jī)制先hold住并且掛起該請(qǐng)求,然后通過(guò)Broker端的后臺(tái)線程PullRequestHoldService重新嘗試和后臺(tái)線程ReputMessageService進(jìn)行二次處理。
【Push消息流程圖】

RocketMQ消息消費(fèi)的長(zhǎng)輪詢機(jī)制
普通輪詢和長(zhǎng)輪詢的區(qū)別:
普通輪詢比較簡(jiǎn)單,就是定時(shí)發(fā)起請(qǐng)求,服務(wù)端收到請(qǐng)求后不論數(shù)據(jù)有沒(méi)有更新都立即返回
優(yōu)點(diǎn)就是實(shí)現(xiàn)簡(jiǎn)單,容易理解。
缺點(diǎn)就是服務(wù)端是被動(dòng)的,服務(wù)端要不斷的處理客戶端連接,并且服務(wù)端無(wú)法控制客戶端pull的頻率以及客戶端數(shù)量.
長(zhǎng)輪詢是對(duì)普通輪詢的優(yōu)化,依然由客戶端發(fā)起請(qǐng)求,服務(wù)端收到后并不立即響應(yīng)而是hold住客戶端連接,等待數(shù)據(jù)產(chǎn)生變更后(或者超過(guò)指定時(shí)間還未產(chǎn)生變更)才回復(fù)客戶端
說(shuō)白了,就是對(duì)普通輪詢加了個(gè)控制,你客戶端可以隨時(shí)請(qǐng)求我,但是回不回復(fù)我說(shuō)了算,這就保證了服務(wù)端不會(huì)被客戶端帶節(jié)奏,導(dǎo)致自己的壓力不可控.
在 RocketMq 中消費(fèi)者主動(dòng)發(fā)起pull請(qǐng)求,broker在處理消息拉取請(qǐng)求時(shí),如果沒(méi)有查詢到消息,將不返回消費(fèi)者任何信息,而是先hold住并且掛起請(qǐng)求,使其不會(huì)立即發(fā)起下一次拉取請(qǐng)求,會(huì)將請(qǐng)求信息pullRequest添加到pullRequestTable中,等待觸發(fā)通知消費(fèi)者的事件。
當(dāng)生產(chǎn)者發(fā)送最新消息過(guò)來(lái)后,首先持久化到commitLog文件,通過(guò)異步方式同時(shí)持久化consumerQueue和index。然后激活consumer發(fā)送來(lái)hold的請(qǐng)求,立即將消息通過(guò)channel寫(xiě)入consumer客戶。
如果沒(méi)有消息到達(dá)且客戶端拉取的偏移量是最新的,會(huì)hold住請(qǐng)求。其中hold請(qǐng)求超時(shí)時(shí)間 < 請(qǐng)求設(shè)定的超時(shí)時(shí)間。同時(shí)Broker端也定時(shí)檢測(cè)是否請(qǐng)求超時(shí),超時(shí)則立即將請(qǐng)求返回,狀態(tài)code為NO_NEW_MESSAGE。
然后在Broker端,通過(guò)后臺(tái)獨(dú)立線程PullRequestHoldService遍歷所有掛起的請(qǐng)求pullRequestTable,如果有消息,則返回響應(yīng)給消費(fèi)者。
同時(shí),另外一個(gè)ReputMessageService線程不斷地構(gòu)建ConsumeQueue/IndexFile數(shù)據(jù),不斷的檢測(cè)是否有新消息產(chǎn)生,如果有新消息,則從pullRequestTable通過(guò)Topic+queueId的key獲取對(duì)應(yīng)hold住的請(qǐng)求pullRequest,再根據(jù)其中的長(zhǎng)鏈接channel進(jìn)行通信響應(yīng)。
通過(guò)這種長(zhǎng)輪詢機(jī)制,即可解決Consumer端需要通過(guò)不斷地發(fā)送無(wú)效的輪詢Pull請(qǐng)求,而導(dǎo)致整個(gè)RocketMQ集群中Broker端負(fù)載很高的問(wèn)題。
流程如下:

消息隊(duì)列負(fù)載與重新分布機(jī)制
當(dāng)一個(gè)業(yè)務(wù)系統(tǒng)部署多臺(tái)機(jī)器時(shí),每臺(tái)機(jī)器都啟動(dòng)了一個(gè)Consumer,并且這些Consumer都在同一個(gè)ConsumerGroup也就是消費(fèi)組中,此時(shí)一個(gè)消費(fèi)組中多個(gè)Consumer消費(fèi)一個(gè)Topic,而一個(gè)Topic中會(huì)有多個(gè)MessageQueue。
比如有2個(gè)Consumer,3個(gè)MessageQueue,那么這3個(gè)MessageQueue怎么分配呢?這就涉及到Consumer的負(fù)載均衡了。
首先 Consumer 在啟動(dòng)時(shí),會(huì)把自己注冊(cè)給所有 Broker ,并保持心跳,讓每一個(gè) Broker 都知道消費(fèi)組中有哪些 Consumer 。
然后 Consumer 在消費(fèi)時(shí),會(huì)隨機(jī)鏈接一臺(tái) Broker ,獲取消費(fèi)組中的所有 Consumer 。
主要流程如下:

RocketMQ 消息隊(duì)列重新分布由 RebalanceService 線程來(lái)實(shí)現(xiàn)的。RebalanceService 隨著 MQClientInstance 的啟動(dòng)而啟動(dòng)。RebalanceService 默認(rèn)每 20 秒,執(zhí)行一次 MQClientInstance#doRebalance
【主題的消息隊(duì)列負(fù)載流程】
- 獲取主題的隊(duì)列,向 broker 發(fā)送請(qǐng)求,獲取主題下,消費(fèi)組所有消費(fèi)者客戶端ID。
- 只有當(dāng) 2 者均不為空時(shí),才有必要進(jìn)行 rebalance。
- 在 rebalance 時(shí),需要對(duì) 隊(duì)列,還有消費(fèi)者客戶端 ID 進(jìn)行排序,以確保同一個(gè)消費(fèi)組下的視圖是一致的。
- 根據(jù) 分配策略 AllocateMessageQueueStrategy 為 消費(fèi)者分配隊(duì)列。
客戶端執(zhí)行期間 伴隨著PullMessageService 與 RebalanceService 線程交互

消息消費(fèi)過(guò)程
【消費(fèi)過(guò)程】
- 默認(rèn)拉取32條消息,如果消息數(shù)量大于 32 則分頁(yè)處理。
- 每次進(jìn)行消費(fèi)時(shí),都會(huì)判斷 processQueue 是否被刪除,阻止消費(fèi)者 消費(fèi) 不屬于自己的 隊(duì)列
- 恢復(fù)重試消息主題名, rocketMQ 消息重試機(jī)制,決定了,如果發(fā)現(xiàn)消息的延時(shí)級(jí)別 delayTimeLevel 大于 0,會(huì)首先將重試主題存入消息的屬性中,然后設(shè)置主題名稱為 SCHEDULE_TOPIC ,以便時(shí)間到后重新參與消息消費(fèi)。
- 在消費(fèi)之前,執(zhí)行 hock
- 執(zhí)行,我們編寫(xiě)的消費(fèi)代碼
- 在消費(fèi)之后,執(zhí)行 hock
- 消費(fèi)完畢后,再次驗(yàn)證 processQueue 是否被刪除,如果被刪除,不處理結(jié)果。
- 對(duì)消費(fèi)者返回的結(jié)果,進(jìn)行處理
- 如果消費(fèi)成功,那么 ack = consumeRequest.getMsgs().size() - 1。會(huì)直接更新消費(fèi)進(jìn)度。如果消費(fèi)失敗,那么 ack = -1,重新發(fā)送消息。如果在重新發(fā)送消息時(shí),又失敗了,那么會(huì)延遲 5 秒在繼續(xù)消費(fèi)。
- 不管是消費(fèi)成功,還是失敗,都會(huì)更新消費(fèi)進(jìn)度
【消息確認(rèn)】
客戶端在發(fā)送重試消息時(shí),封裝了 ConsumerSendMsgBackRequestHeader。
- // 消息物理偏移量
- private Long offset;
- // 消費(fèi)組
- private String group;
- // 延遲等級(jí)
- private Integer delayLevel;
- // 消息ID
- private String originMsgId;
- // 消息主題
- private String originTopic;
- // 最大重新消費(fèi)次數(shù),默認(rèn) 16 次 SubscriptionGroupConfig.retryMaxTimes 中定義
- private Integer maxReconsumeTimes;
服務(wù)端的接收邏輯
- 先獲取消費(fèi)組訂閱配置信息,不存在則直接返回
- 創(chuàng)建主題:%RETRY% + group,并隨機(jī)選擇一個(gè)隊(duì)列
- 用原來(lái)的消息,創(chuàng)建一個(gè)新的消息
- 如果重試消息的最大重試次數(shù)超過(guò) 16 次(默認(rèn)),則將消息放入 %DLQ% 隊(duì)列(死信隊(duì)列)。等待人工處理
- 由 Commitlog.putMessage 存入消息。
小結(jié)
從消息消費(fèi)者和消費(fèi)者組的基本概念,到消息消費(fèi)的流程。我們了解了RocetMQ消息消費(fèi)的相關(guān)原理。消費(fèi)者客戶端的啟動(dòng)后,會(huì)后臺(tái)運(yùn)行幾個(gè)定時(shí)任務(wù)來(lái)處理相關(guān)的邏輯。也知道了RocetMQ消息獲取有推拉兩種模式,而且推模式也是建立在拉模式的基礎(chǔ)之上。知道了普通輪詢和長(zhǎng)輪詢的區(qū)別,并且了解了長(zhǎng)輪詢的實(shí)現(xiàn)邏輯。對(duì)消息消費(fèi)和確認(rèn)流程有了了解。