RocketMQ 常見問題與深度解析
在當(dāng)今的分布式系統(tǒng)架構(gòu)中,消息隊(duì)列作為解耦、異步處理和流量削峰的核心組件,扮演著至關(guān)重要的角色。而RocketMQ作為阿里巴巴開源的一款高性能、高可用的分布式消息中間件,憑借其強(qiáng)大的功能和廣泛的應(yīng)用場景,成為眾多企業(yè)和開發(fā)者關(guān)注的焦點(diǎn)。
無論是面試中還是實(shí)際工作中,掌握RocketMQ的核心原理和常見問題都顯得尤為重要。本文將從基礎(chǔ)到高級(jí),全面總結(jié)RocketMQ的面試高頻問題,并結(jié)合實(shí)際應(yīng)用場景和源碼解析,幫助讀者深入理解其設(shè)計(jì)思想和實(shí)現(xiàn)機(jī)制。無論你是準(zhǔn)備面試的技術(shù)人,還是希望提升對(duì)RocketMQ理解的開發(fā)者,相信這篇文章都能為你提供有價(jià)值的參考。
接下來,我們將從RocketMQ的基本概念、核心組件、消息存儲(chǔ)與消費(fèi)、高可用性設(shè)計(jì)以及性能優(yōu)化等方面展開詳細(xì)解析,助你輕松應(yīng)對(duì)面試中的各種挑戰(zhàn)!
RocketMQ的架構(gòu)是怎么樣的
整體來說,RocketMQ是由如下幾個(gè)部分組成:
producer:投遞消息的生產(chǎn)者,主要負(fù)責(zé)將消息投遞到broker上。
- broker:可以理解為消息中轉(zhuǎn)服務(wù)器,主要負(fù)責(zé)消息存儲(chǔ),以及消息路由的轉(zhuǎn)發(fā),RocketMQ支持多個(gè)broker構(gòu)成集群,每個(gè)broker都有獨(dú)立的存儲(chǔ)空間和隊(duì)列。
- consumer:訂閱topic并從broker中獲得消息并消費(fèi)。
- nameserver:提供服務(wù)發(fā)現(xiàn)和路由功能,負(fù)責(zé)維護(hù)broker的數(shù)據(jù)信息,包括broker地址、topic和queue等,對(duì)應(yīng)的producer和consumer在啟動(dòng)時(shí)都需要通過nameserver獲取broker的地址信息。
- topic:消費(fèi)主題,對(duì)于消息的邏輯分類的單位,producer消息都會(huì)發(fā)送到特定的topic上,對(duì)應(yīng)的consumer就會(huì)從這些topic拿到消費(fèi)的消息。
RocketMQ的事務(wù)消息是如何實(shí)現(xiàn)的
大體是通過half消息完成,大體工作流程為:
- 生產(chǎn)者即應(yīng)用程序像mq的broker發(fā)送一條half消息,mq收到該消息后在事務(wù)消息日志中將其標(biāo)記為prepared狀態(tài),然后回復(fù)ack確認(rèn)。
- 生產(chǎn)者執(zhí)行本地事務(wù),將事務(wù)執(zhí)行結(jié)果發(fā)送提交指令告知mq可以提交事務(wù)消息給消費(fèi)者。
- mq若收到提交通知后,將消息從prepared改為commited,然后將消息提交給消費(fèi)者,當(dāng)然如果mq長時(shí)間沒有收到提交通知?jiǎng)t發(fā)送回查給生產(chǎn)者詢問該事務(wù)的執(zhí)行情況。
- 基于事務(wù)結(jié)果若成功則將事務(wù)消息提交給消費(fèi)者,反之回滾該消息即將消息設(shè)置為rollback并將該消息從消息日志中刪除,從而保證消息不被消費(fèi)。
RocketMQ如何保證消息的順序性
針對(duì)保證消費(fèi)順序性的問題,我們可以基于下面這樣的一個(gè)場景來分析,假設(shè)我們有一個(gè)下單請求,生產(chǎn)者要求按需投遞下面這些消息讓消費(fèi)者消費(fèi):
- 創(chuàng)建訂單
- 用戶付款
- 庫存扣減
同理消費(fèi)者也得嚴(yán)格按照這個(gè)順序完成消費(fèi),此時(shí)如果按照簡單維度的架構(gòu)來說,我們可以全局設(shè)置一個(gè)topic讓生產(chǎn)者準(zhǔn)確有序的投遞每一個(gè)消息,然后消費(fèi)者準(zhǔn)確依次消費(fèi)消息即可,但是這樣做對(duì)于并發(fā)的場景下性能表現(xiàn)就會(huì)非常差勁:
為了適當(dāng)提升兩端性能比對(duì)消息堆積,我們選擇增加隊(duì)列用多個(gè)隊(duì)列處理這個(gè)原子業(yè)務(wù):
有了這樣的架構(gòu)基礎(chǔ),我們就需要考慮生產(chǎn)者和消費(fèi)者的有序生產(chǎn)和有序消費(fèi)的落地思路了,先來說說生產(chǎn)者有序投遞,這樣做比較簡單,我們可以直接通過訂單號(hào)進(jìn)行hash并結(jié)合topic隊(duì)列數(shù)進(jìn)行取模的方式將一個(gè)訂單的創(chuàng)建、余額扣減、庫存扣減的消息有序投遞到某個(gè)隊(duì)列中,這樣就能保證單個(gè)訂單的業(yè)務(wù)消息有序性:
對(duì)應(yīng)的我們也給出生產(chǎn)者的代碼使用示例:
//基于訂單號(hào)orderNo進(jìn)行哈希取模發(fā)送訂單消息
Message<Order> message = MessageBuilder.withPayload(order).build();
rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());
這塊哈希取模的實(shí)現(xiàn)可以從底層源碼DefaultMQProducerImpl的sendSelectImpl看到,它會(huì)將arg(也就是我們的orderNo)通過selector的select進(jìn)行運(yùn)算獲得單topic下的某個(gè)隊(duì)列:
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//......
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//傳入arg也就是我們的orderNo基于selector算法進(jìn)行哈希取模
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
//......
}
//......
}
這個(gè)調(diào)用會(huì)來到SelectMessageQueueByHash的select,從源碼可以看出這塊代碼看出,它的算法就是通過參數(shù)哈希運(yùn)算后結(jié)合隊(duì)列數(shù)(默認(rèn)為4)進(jìn)行取模:
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//數(shù)值哈希運(yùn)算
int value = arg.hashCode();
//......
//結(jié)合隊(duì)列數(shù)取模得到隊(duì)列返回
value = value % mqs.size();
return mqs.get(value);
}
}
消費(fèi)者端就比較簡單了,consumeMode指定為有序消費(fèi)即可:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
topic = "ORDER_ADD",
consumeMode = ConsumeMode.ORDERLY//同一個(gè)topic下同一個(gè)隊(duì)列只有一個(gè)消費(fèi)者線程消費(fèi)
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
}
}
這里我們也基于源碼解析一下這個(gè)有序消費(fèi)的實(shí)現(xiàn),本質(zhì)上消費(fèi)者啟動(dòng)的時(shí)候會(huì)開啟一個(gè)定時(shí)任務(wù)嘗試獲取分布式上鎖隊(duì)列信息的執(zhí)行如下步驟:
- 獲取知道的broker及其隊(duì)列。
- 獲取對(duì)應(yīng)broker的master地址。
- 發(fā)送請求到服務(wù)端詢問master獲取所有隊(duì)列的分布式鎖。
- 基于請求結(jié)果獲取查看那些隊(duì)列上鎖成功。
- 更新本地結(jié)果。
完成后,消費(fèi)者就拉取到全局可唯一消費(fèi)的隊(duì)列信息,因?yàn)槊總€(gè)消費(fèi)者都是基于多線程執(zhí)行,所以為了保證本地多線程消費(fèi)有序性,每一個(gè)線程進(jìn)行消費(fèi)時(shí)都會(huì)以消息隊(duì)列messageQueue作為key用synchronized上鎖后才能消費(fèi)。
代碼如下所示,可以看到上鎖成功后就會(huì)執(zhí)行messageListener.consumeMessage方法,該方法就會(huì)走到我們上文中聲明的監(jiān)聽上了:
public void run() {
//......
//消費(fèi)請求線程消費(fèi)前會(huì)獲取消息隊(duì)列鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
//上鎖
synchronized (objLock) {
//......
//將消息發(fā)送給實(shí)現(xiàn)有序監(jiān)聽的監(jiān)聽器線程
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
//......
} finally {
//......
}
//......
}
RocketMQ有幾種集群方式
- 多master模式:該模式僅由多master構(gòu)成,配置比較簡單,單個(gè)master宕機(jī)或重啟對(duì)于應(yīng)用全局沒有任何影響,尤其在磁盤為RAID10的情況下,即使服務(wù)器不可恢復(fù),只要我們使用同步刷盤策略,基本上消息都不會(huì)丟失,而缺點(diǎn)也非常明顯,單臺(tái)機(jī)器宕機(jī)期間,這臺(tái)機(jī)器未被消費(fèi)的消息在恢復(fù)之前不可訂閱,消息的實(shí)時(shí)性會(huì)受影響。
- 多master-多slave異步復(fù)制:多個(gè)master和多個(gè)slave構(gòu)成,即使master宕機(jī)后slave依然可以對(duì)外提供服務(wù),所以消息實(shí)時(shí)性不會(huì)受影響,缺點(diǎn)是主從復(fù)制是異步的,如果master宕機(jī)時(shí)同步的消息可能丟失部分,且沒有解決slave自動(dòng)切換為master。
- 多master-多slave同步復(fù)制:上者的優(yōu)化版即同步策略采用同步的方式,保證在RAID10的情況下消息基本不丟失,但因采用的是同步復(fù)制,所以發(fā)送單個(gè)消息的RT可能略高,且同樣沒有解決slave自動(dòng)切換為master。
- Dledger模式:該集群模式要求至少由3個(gè)broker構(gòu)成,即一個(gè)master必須對(duì)應(yīng)兩個(gè)slave,一旦某個(gè)master宕機(jī)后通過raft一致性算法選舉新的master對(duì)外提供服務(wù)。具體實(shí)踐可以參考:https://rocketmq.apache.org/zh/docs/bestPractice/02dledger/
RocketMQ消息堆積了怎么解決
消息堆積的原因比較多,大體是客戶端隊(duì)列并發(fā)度不夠或者客戶端消費(fèi)能力不足,所以我們可以針對(duì)以下幾個(gè)角度進(jìn)行針對(duì)性的優(yōu)化:
- 增加消費(fèi)者實(shí)例:如果是消費(fèi)速度過慢導(dǎo)致的消息堆積,則建議增加消費(fèi)者數(shù)量,讓更多的實(shí)例來消費(fèi)這些消息。
- 提升消費(fèi)者消費(fèi)速度:如果是消息消費(fèi)處理耗時(shí)長,則針對(duì)性的業(yè)務(wù)流程調(diào)優(yōu),例如引入線程池、本地消息存儲(chǔ)后立即返回成功等方式提前消息進(jìn)行批量的預(yù)消化。
- 降低生產(chǎn)者速度:如果生產(chǎn)端可控且消費(fèi)者已經(jīng)沒有調(diào)優(yōu)的空間時(shí),我們建議降低生產(chǎn)者生產(chǎn)速度。
- 清理過期消息:對(duì)于一些過期且一直無法處理成功的消息,在進(jìn)行業(yè)務(wù)上的評(píng)估后,我們可以針對(duì)性的進(jìn)行清理。
- 增加topic隊(duì)列數(shù):如果是因?yàn)殛?duì)列少導(dǎo)致并發(fā)度不夠可以考慮增加一下消費(fèi)者隊(duì)列,來提升消息隊(duì)列的并發(fā)度。
- 參數(shù)調(diào)優(yōu):我們可以針對(duì)各個(gè)節(jié)點(diǎn)耗時(shí)針對(duì):消費(fèi)模式、消息拉取間隔等參數(shù)進(jìn)行優(yōu)化。
RocketMQ的工作流程詳解
上文已經(jīng)介紹了幾個(gè)基本的概念,我們這里直接將其串聯(lián)起來:
- 啟動(dòng)nameServer,等待broker、producer和consumer的接入。
- 啟動(dòng)broker和nameserver建立連接,并定時(shí)發(fā)送心跳包,心跳包中包含broker信息(ip、端口號(hào)等)以及topic以及broker與topic的映射關(guān)系。
- 啟動(dòng)producer,producer啟動(dòng)時(shí)會(huì)隨機(jī)通過nameserver集群中的一臺(tái)建立長連接,并從中獲取發(fā)送的topic和所有broker地址信息,基于這些信息拿到topic對(duì)應(yīng)的隊(duì)列,與隊(duì)列所在的broker建立長連接,自此開始消息發(fā)送。
- broker接收producer發(fā)送的消息時(shí),會(huì)根據(jù)配置同步和刷盤策略進(jìn)行狀態(tài)回復(fù):
1. 若為同步復(fù)制則master需要復(fù)制到slave節(jié)點(diǎn)后才能返回寫狀態(tài)成功
2. 若配置同步刷盤,還需要基于上述步驟再將數(shù)據(jù)寫入磁盤才能返回寫成功
3. 若是異步刷盤和異步復(fù)制,則消息一到master就直接回復(fù)成功
- 啟動(dòng)consumer,和nameserver建立連接然后訂閱信息,然后對(duì)感興趣的broker建立連接,獲取消息并消費(fèi)。
RocketMQ的消息是采用推模式還是采用拉模式
消費(fèi)模式分為3種:
- push:服務(wù)端主動(dòng)推送消息給客戶端。
- pull:客戶端主動(dòng)到服務(wù)端輪詢獲取數(shù)據(jù)。
- pop:5.0之后的新模式,后文會(huì)介紹。
總的來說push模式是性能比較好,但是客戶端沒有做好留空,可能會(huì)出現(xiàn)大量消息把客戶端打死的情況。 而poll模式同理,頻繁拉取服務(wù)端可能會(huì)造成服務(wù)器壓力,若設(shè)置不好輪詢間隔,可能也會(huì)出現(xiàn)消費(fèi)不及時(shí)的情況,
整體來說RocketMQ本質(zhì)上還是采用pull模式,具體后文會(huì)有介紹。
用了RocketMQ一定能做到削峰嗎
削峰本質(zhì)就是將高并發(fā)場景下短時(shí)間涌入的消息平攤通過消息隊(duì)列構(gòu)成緩沖區(qū)然后平攤到各個(gè)時(shí)間點(diǎn)進(jìn)行消費(fèi),從而實(shí)現(xiàn)平滑處理。
這也不意味著用mq就一定可以解決問題,假如用push模式,這就意味著你的消息都是mq立即收到立即推送的,本質(zhì)上只是加了一個(gè)無腦轉(zhuǎn)發(fā)的中間層,并沒有實(shí)際解決問題。 所以要想做到削峰,就必須用拉模式,通過主動(dòng)拉去保證消費(fèi)的速度,讓消息堆積在mq隊(duì)列中作為緩沖。
常見消息隊(duì)列的消息模型有哪些?RocketMQ用的是那種消息模型
消息隊(duì)列的消息模型有兩種,一種是隊(duì)列模型,生產(chǎn)者負(fù)責(zé)把消息扔到消息隊(duì)列中,消費(fèi)者去消息隊(duì)列中搶消息,消息只有一個(gè),先到者先得:
還有一種就是發(fā)布/訂閱模型了,發(fā)布訂閱模型的消息只要消費(fèi)者有訂閱就能消費(fèi)消息:
RocketMQ是支持發(fā)布訂閱模式的,如下所示,筆者又新建一個(gè)監(jiān)聽者,用的是不同的消費(fèi)者組consumerGroup,運(yùn)行時(shí)即可看到兩組訂閱的消費(fèi)者消費(fèi)一份消息:
@Component
@RocketMQMessageListener(consumerGroup = "gourp2", topic = "ORDER_ADD")
public class OrderMqListener2 implements RocketMQListener<Order> {
private static Logger logger = LoggerFactory.getLogger(OrderMqListener2.class);
@Override
public void onMessage(Order order) {
logger.info("訂閱者2收到消息,訂單信息:[{}],進(jìn)行新春福利活動(dòng).....", JSON.toJSONString(order));
}
}
RocketMQ消息的消費(fèi)模式有哪些
有兩種消費(fèi)模式:
集群消費(fèi):這種是RocketMQ默認(rèn)模式,一個(gè)主題下的多個(gè)隊(duì)列都會(huì)被消費(fèi)者組中的某個(gè)消費(fèi)者消費(fèi)掉。
廣播消費(fèi):廣播消費(fèi)模式會(huì)讓每個(gè)消費(fèi)者組中的每個(gè)消費(fèi)者都能使用這個(gè)消息。
如何保證消息可用性和可靠性呢?
這個(gè)問題我們要從3個(gè)角度考慮:
對(duì)于生產(chǎn)階段,生產(chǎn)者發(fā)送消息要想確??煽勘仨氉裱韵?點(diǎn):
- 沒有發(fā)送成功就需要進(jìn)行重試:
SendResult result = producer.send(message);
if (!"SEND_OK".equals(result.getSendStatus().name())){
logger.warn("消息發(fā)送失敗,執(zhí)行重試的邏輯");
}
- 如果發(fā)送超時(shí),我們可以從日志相關(guān)API中查看是否存到Broker中。
- 如果是異步消息,則需要到回調(diào)接口中做相應(yīng)處理。
針對(duì)存儲(chǔ)階段,存儲(chǔ)階段要保證可靠性就需要從以下幾個(gè)角度保證:
- 開啟主從復(fù)制模式,使得Master掛了還有Slave可以用。
- 為了確保發(fā)送期間服務(wù)器宕機(jī)的情況,我們建議刷盤機(jī)制改為同步刷盤,確保消息發(fā)送并寫到CommitLog中再返回成功。
這里補(bǔ)充一下同步刷盤和異步刷盤的區(qū)別:
- 同步刷盤,生產(chǎn)者投遞的消息持久化時(shí)必須真正寫到磁盤才會(huì)返回成功,可靠性高,但是因?yàn)镮O問題會(huì)使得組件處理效率下降。
- 異步刷盤,如下圖所示,可以僅僅是存到page cache即可返回成功,至于何時(shí)持久化操磁盤由操作系統(tǒng)后臺(tái)異步的頁緩存置換算法決定。
對(duì)應(yīng)的刷盤策略,我們只需修改broker.conf的配置文件即可:
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
對(duì)于消費(fèi)階段,消費(fèi)者編碼邏輯一定要確保消費(fèi)成功了再返回消費(fèi)成功:
consumer.registerMessageListener((List<MessageExt> msgs,
ConsumeConcurrentlyContext context) -> {
String msg = new String(msgs.stream().findFirst().get().getBody());
logger.info("消費(fèi)收到消息,消息內(nèi)容={}", msg);
//消費(fèi)完全成功再返回成功狀態(tài)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
如果避免消息重復(fù)消費(fèi)問題(重點(diǎn))
這個(gè)我們可以分不同的情況討論,有些場景下,我們只需保證業(yè)務(wù)冪等即可,例如:我們需要給訂單服務(wù)發(fā)送一個(gè)用戶下單成功的消息,無論發(fā)送多少次訂單服務(wù)只是將訂單表狀態(tài)設(shè)置成已完成。
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {
private static Logger logger = LoggerFactory.getLogger(OrderMqListener.class);
@Override
public void onMessage(Order order) {
logger.info("消費(fèi)者收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSON.toJSONString(order));
updateOrderFinish(order);
}
private void updateOrderFinish(Order order){
logger.info("執(zhí)行dao層邏輯,將訂單設(shè)置下單完成,無論多少次,執(zhí)行到這個(gè)消費(fèi)邏輯都是將訂單設(shè)置為處理完成");
}
}
還有一種方式就是業(yè)務(wù)去重,例如我們現(xiàn)在要?jiǎng)?chuàng)建訂單,每次訂單創(chuàng)建完都會(huì)往一張記錄消費(fèi)信息表中插入數(shù)據(jù)。一旦我們收到重復(fù)的消息,只需帶著唯一標(biāo)識(shí)去數(shù)據(jù)庫中查,如果有則直接返回成功即可:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {
//......
@Override
public void onMessage(Order order) {
logger.info("消費(fèi)者收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSON.toJSONString(order));
//消費(fèi)者消費(fèi)時(shí)判斷訂單是否存在,如果存在則直接返回
if (isExist(order)){
return;
}
updateOrderFinish(order);
}
}
延時(shí)消息底層是怎么實(shí)現(xiàn)
我們都知道投遞消息到消息隊(duì)列的時(shí)候,消息都會(huì)寫入到commitLog上,在此之前MQ會(huì)檢查當(dāng)前消息延遲等級(jí)是否大于0,如果是則說明該消息是延遲消息,則會(huì)將其topic設(shè)置為RMQ_SYS_SCHEDULE_TOPIC并基于延遲等級(jí)獲取對(duì)應(yīng)的隊(duì)列,最后基于零拷貝的方式寫入磁盤,注意此時(shí)消息還不可被消費(fèi):
對(duì)此我們也給出這段投遞消息的源碼,即位于CommitLog的asyncPutMessage異步投遞消息的方法:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
//......
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
//如果大于0則說明是延遲消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//設(shè)置topic設(shè)置為SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//基于等級(jí)獲取延遲消息隊(duì)列
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
//......
//基于上述設(shè)置topic和隊(duì)列信息
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
//......
//基于零拷貝的方式添加
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
//......
}
MQ啟動(dòng)的時(shí)候底層的消息調(diào)度服務(wù)會(huì)基于延遲消息的等級(jí)初始化幾個(gè)任務(wù),這些任務(wù)會(huì)基于定時(shí)的間隔檢查是否有到期的消息到來,如果到期則將其投遞到真正topic的隊(duì)列中供消費(fèi)者消費(fèi):
基于此邏輯我們也給出ScheduleMessageService的start方法查看調(diào)度器的初始化邏輯,可以看到初始化階段,它會(huì)遍歷所有延遲級(jí)別并為其初始化一個(gè)定時(shí)任務(wù):
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
//遍歷所有延遲級(jí)別
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
//設(shè)置timer定時(shí)器
if (timeDelay != null) {
//投遞給定時(shí)器對(duì)應(yīng)等級(jí)的定時(shí)任務(wù)
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
//......
}
查看DeliverDelayedMessageTimerTask的核心邏輯即run方法,也就是我們所說的定時(shí)檢查是否有到期消息,若存在則將其存入原本的topic上,消費(fèi)者就可以消費(fèi)了:
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
//......
}
}
public void executeOnTimeup() {
//基于topic和隊(duì)列id獲取延遲隊(duì)列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
//根據(jù)偏移量獲取有效消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
//......
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
//......
long now = System.currentTimeMillis();
//計(jì)算可消費(fèi)時(shí)間
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
//如果小于0說明可消費(fèi)
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//清除延遲級(jí)別恢復(fù)到真正的topic和隊(duì)列id
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
//......
//放到消息隊(duì)列上
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
//......
} else {
//......
} catch (Exception e) {
//......
} else {
//......
}
} // end of for
//......
}
什么是死信隊(duì)列
通俗來說一個(gè)消息消費(fèi)失敗并重試達(dá)到最大次數(shù)后,MQ就會(huì)將其放到死信隊(duì)列中。超過三天該消息就會(huì)被銷毀。 需要補(bǔ)充的時(shí)死信隊(duì)列是針對(duì)一個(gè)group id為單位創(chuàng)建的隊(duì)列,如果一個(gè)gourp中都沒有死信的話,那么MQ就不會(huì)為這個(gè)組創(chuàng)建死信隊(duì)列。
Broker是進(jìn)行消息持久化的
要想了解Broker如何保存數(shù)據(jù),我們必須了解RocketMQ三大文件:
首先是commitlog,producer發(fā)送的消息最終都會(huì)通過刷盤機(jī)制存到commitlog文件夾下。commitlog下一個(gè)文件名為00000000000000000000一旦寫滿,就會(huì)再創(chuàng)建一個(gè)文件寫,一般來說第二個(gè)文件名為00000000001073741824,名稱即是第一個(gè)文件的字節(jié)數(shù)。文件大小一般是1G:
然后是consumequeue文件夾,這個(gè)文件夾下記錄的都是commitlog中每個(gè)topic下的隊(duì)列信息物理偏移量、消息大小、hashCode值,如下圖,consumequeue文件夾下會(huì)為每個(gè)topic創(chuàng)建一個(gè)文件夾:
打開任意一個(gè)文件夾就會(huì)看到這樣一個(gè)名為00000000000000000000的文件:
而這個(gè)文件內(nèi)部最多維護(hù)30w個(gè)條目,注意文件中每個(gè)條目大約20字節(jié),8字節(jié)代表當(dāng)前消息在commitLog中的偏移量,4字節(jié)存放消息大小,8字節(jié)存放tag和hashCode的值。
最后就算index,維護(hù)消息的索引,基于HashMap結(jié)構(gòu),這個(gè)文件使得我們可以通過key或者時(shí)間區(qū)間查詢消息:
文件名基本用時(shí)間戳生成的,大小一般為400M,差不多維護(hù)2000w個(gè)索引:
簡單小結(jié)一下RocketMQ持久化的物理文件:MQ會(huì)為每個(gè)broker維護(hù)一個(gè)commitlog,一旦文件存放到commitlog,消息就不會(huì)丟失。當(dāng)無法拉取消息時(shí),broker允許producer在30s內(nèi)發(fā)送一個(gè)消息,然后直接給消費(fèi)者消費(fèi)。
后兩個(gè)索引文件的維護(hù)是基于一個(gè)線程ReputMessageService進(jìn)行異步維護(hù)consumeQueue(邏輯消費(fèi)隊(duì)列)和IndexFile(索引文件)數(shù)據(jù):
RocketMQ如何進(jìn)行文件讀寫的呢?
對(duì)于讀寫IO處理有以下兩種:
- pageCache:在RocketMQ中,ConsumeQueue存儲(chǔ)數(shù)據(jù)較少,并且是順序讀取,在pageCache預(yù)讀的機(jī)制下讀取速率是非??陀^的(即使有大量的消息堆積)。操作系統(tǒng)會(huì)將一部分內(nèi)存用作pageCache,當(dāng)數(shù)據(jù)寫入磁盤會(huì)先經(jīng)過pageCache然后通過內(nèi)核線程pdflush寫入物理磁盤。 針對(duì)ConsumeQueue下關(guān)于消息索引的數(shù)據(jù)查詢時(shí),會(huì)先去pageCache查詢是否有數(shù)據(jù),若有則直接返回。若沒有則去ConsumeQueue文件中讀取需要的數(shù)據(jù)以及這個(gè)數(shù)據(jù)附近的數(shù)據(jù)一起加載到pageCache中,這樣后續(xù)的讀取就是走緩存,效率自然上去了,這種磁盤預(yù)讀目標(biāo)數(shù)據(jù)的附近數(shù)據(jù)就是我們常說的局部性原理。而commitLog隨機(jī)性比較強(qiáng)特定情況下讀寫性能會(huì)相對(duì)差一些,所以在操作系統(tǒng)層面IO讀寫調(diào)度算法可以改為deadline并選用SSD盤以保證操作系統(tǒng)在指定時(shí)間完成數(shù)據(jù)讀寫保證性能。
- 零拷貝技術(shù):這是MQ基于NIO的FileChannel模型的一種直接將物理文件映射到用戶態(tài)內(nèi)存地址的一種技術(shù),通過MappedByteBuffer,它的工作機(jī)制是直接建立內(nèi)存映射,文件數(shù)據(jù)并沒有經(jīng)過JVM和操作系統(tǒng)直接復(fù)制的過程,相當(dāng)于直接操作內(nèi)存,所以效率就非常高??梢詤⒖? 能不能給我講講零拷貝:https://mp.weixin.qq.com/s/zS2n2a4h3YQifBYKFgcUCA
消息刷盤如何實(shí)現(xiàn)呢?
兩種方式分別是同步刷盤和異步刷盤
- 同步刷盤: producer發(fā)送的消息經(jīng)過broker后必須寫入到物理磁盤commitLog后才會(huì)返回成功。
- 異步刷盤:producer發(fā)送的消息到達(dá)broker之后,直接返回成功,刷盤的邏輯交給一個(gè)異步線程實(shí)現(xiàn)。
而上面說的刷盤都是通過MappedByteBuffer.force() 這個(gè)方法完成的,需要補(bǔ)充異步刷盤是通過一個(gè)異步線程FlushCommitLogService實(shí)現(xiàn)的,其底層通過MappedFileQueue針對(duì)內(nèi)存中的隊(duì)列消息調(diào)用flush進(jìn)行刷盤從而完成消息寫入:
public boolean flush(final int flushLeastPages) {
boolean result = true;
//拉取文件處理偏移量信息
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
//基于mmap零拷貝技術(shù)進(jìn)行刷盤
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
//如果刷盤后的進(jìn)度和預(yù)期一樣說明刷盤成功
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
//維護(hù)處理時(shí)間
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
RocketMQ負(fù)載均衡
MQ中負(fù)載均衡的主要是體現(xiàn)在生產(chǎn)者端和消費(fèi)者端,Producer負(fù)載均衡算法在上述中有序消費(fèi)中的源碼已經(jīng)說明,這里就不多做贅述,本質(zhì)上就是通過底層的selector進(jìn)行輪詢投遞:
Message<Order> message = MessageBuilder.withPayload(order).build();
rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());
再來consumer負(fù)載均衡算法,mq客戶端啟動(dòng)時(shí)會(huì)開啟一個(gè)負(fù)載均衡服務(wù)執(zhí)行負(fù)載均衡隊(duì)列輪詢邏輯,通過負(fù)載均衡算法得出每個(gè)消費(fèi)者應(yīng)該處理的隊(duì)列信息后生產(chǎn)拉取消息的請求,交由有MQ客戶端去拉取消息:
默認(rèn)情況下,負(fù)載均衡算法選定隊(duì)列后拉取消息進(jìn)行消費(fèi),默認(rèn)情況下它會(huì)根據(jù)隊(duì)列數(shù)和消費(fèi)者數(shù)決定如何進(jìn)行負(fù)載分擔(dān),按照平均算法:
- 如果消費(fèi)者數(shù)大于隊(duì)列數(shù),則將隊(duì)列分配給有限的幾個(gè)消費(fèi)者。
- 如果消費(fèi)者數(shù)小于隊(duì)列數(shù),默認(rèn)情況下會(huì)按照隊(duì)列數(shù)/消費(fèi)者數(shù)取下限+1進(jìn)行分配,例如隊(duì)列為4,消費(fèi)者為3,那么每個(gè)消費(fèi)者就會(huì)拿到2個(gè)隊(duì)列,其中第三個(gè)消費(fèi)者則沒有處理任何數(shù)據(jù)。
對(duì)應(yīng)的我們給出MQ客戶端初始化的代碼RebalanceService的run方法,可以看到它會(huì)調(diào)用mqClientFactory執(zhí)行負(fù)載均衡方法doRebalance:
@Override
public void run() {
//.......
while (!this.isStopped()) {
//.......
//客戶端執(zhí)行負(fù)載均衡
this.mqClientFactory.doRebalance();
}
//.......
}
步入其內(nèi)部邏輯會(huì)走到RebalanceImpl的doRebalance,它遍歷每個(gè)topic進(jìn)行負(fù)載均衡運(yùn)算:
public void doRebalance(final boolean isOrder) {
//......
if (subTable != null) {
//遍歷每個(gè)topic
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//計(jì)算該topic中當(dāng)前消費(fèi)者要處理的隊(duì)列
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
//......
}
}
}
//......
}
最終我們來到了核心邏輯rebalanceByTopic方法,可以看到它會(huì)基于我們查到的topic的隊(duì)列和消費(fèi)者通過策略模式找到對(duì)應(yīng)的消息分配策略AllocateMessageQueueStrategy從而算得當(dāng)前消費(fèi)者需要處理的隊(duì)列,然后在基于這份結(jié)果調(diào)用updateProcessQueueTableInRebalance生成pullRequest告知客戶端為該消費(fèi)者拉取消息:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
//......
}
case CLUSTERING: {
//根據(jù)主題獲取消息隊(duì)列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//根據(jù) topic 與 consumerGroup 獲取所有的 consumerId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
//......
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//// 排序后才能保證消費(fèi)者負(fù)載策略相對(duì)穩(wěn)定
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//按負(fù)載策略進(jìn)行分配,返回當(dāng)前消費(fèi)者實(shí)際訂閱的messageQueue集合
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
//......
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//......
break;
}
default:
break;
}
}
對(duì)應(yīng)的我們也給出負(fù)載均衡算法AllocateMessageQueueAveragely的源碼,大體算法和筆者上述說明的基本一致,讀者可以參考上圖講解了解一下:
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
//......
//獲取消費(fèi)者id對(duì)應(yīng)的索引
int index = cidAll.indexOf(currentCID);
//基于隊(duì)列總數(shù)和客戶端總數(shù)進(jìn)行取模
int mod = mqAll.size() % cidAll.size();
/**
*計(jì)算每個(gè)消費(fèi)者的可消費(fèi)的平均值:
* 1. 如果消費(fèi)者多于隊(duì)列就取1
* 2. 如果消費(fèi)者少于隊(duì)列就按照取模結(jié)果來計(jì)算
*/
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
//基于當(dāng)前客戶端的索引定位其處理的隊(duì)列位置
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
//獲取消費(fèi)者的隊(duì)列消費(fèi)范圍
int range = Math.min(averageSize, mqAll.size() - startIndex);
//遍歷隊(duì)列存入結(jié)果集
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
完成后基于這份結(jié)果生成pullRequest存入pullRequestQueue中:
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
//......
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//......
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
//......
} else {
//生成pullRequest
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
//存入pullRequestQueue中
this.dispatchPullRequest(pullRequestList);
return changed;
}
最后消費(fèi)者的PullMessageService這個(gè)線程就會(huì)從隊(duì)列中取出該請求向MQ發(fā)起消息拉取請求:
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//獲取一個(gè)拉消息的請求pullRequest
PullRequest pullRequest = this.pullRequestQueue.take();
//拉消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
RocketMQ消息長輪詢
消費(fèi)者獲取消息大體有兩種方案:
- 消息隊(duì)列主動(dòng)push:由消息隊(duì)列主動(dòng)去推送消息給消費(fèi)者,高并發(fā)場景下,對(duì)于服務(wù)端性能開銷略大。
- 消費(fèi)者定期pull消息:由客戶端主動(dòng)去拉取消息,但是需要客戶端設(shè)置好拉取的間隔,太頻繁對(duì)于消息隊(duì)列開銷還是很大,間隔太長消息實(shí)時(shí)性又無法保證。
對(duì)此RocketMQ采用長輪詢機(jī)制保證了實(shí)時(shí)性同時(shí)又降低了服務(wù)端的開銷,總的來說,它的整體思路為:
- 消費(fèi)者發(fā)起一個(gè)消費(fèi)請求,內(nèi)容傳入topic、queueId和客戶端socket、pullFromThisOffset等數(shù)據(jù)。
- 服務(wù)端收到請求后查看該隊(duì)列是否有數(shù)據(jù),若沒有則掛起。
- 在一個(gè)最大超時(shí)時(shí)間內(nèi)定時(shí)輪詢,如果有則將結(jié)果返回給客戶端。
- 反之處理超時(shí),也直接告知客戶端超時(shí)了也沒有消息。
對(duì)應(yīng)的我們再次給出消費(fèi)者拉取消費(fèi)的源碼PullMessageService的run方法,可以看到其內(nèi)部不斷從阻塞隊(duì)列中拉取請求并發(fā)起消息拉?。?/p>
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//獲取一個(gè)拉消息的請求
PullRequest pullRequest = this.pullRequestQueue.take();
//拉消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
服務(wù)端的PullMessageProcessor的processRequest就是處理請求的入口,可以看到該方法如果發(fā)現(xiàn)broker沒有看到新的消息就會(huì)調(diào)用suspendPullRequest將客戶端連接hold?。?/p>
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
//.......
switch (response.getCode()) {
case ResponseCode.SUCCESS:
//.......
} else {
//.......
break;
case ResponseCode.PULL_NOT_FOUND://沒拉取到消息
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
//定位請求的topic以及offset和隊(duì)列id
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
//基于上述數(shù)據(jù)生成pullRequest并調(diào)用suspendPullRequest將其hold住
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
//.......
break;
default:
assert false;
}
} else {
//.......
}
//.......
return response;
}
然后PullRequestHoldService就會(huì)基于上述一部掛起的數(shù)據(jù)定時(shí)檢查是否有新消息到來,直到超期:
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
//等待
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
//基于超時(shí)時(shí)限內(nèi)定時(shí)查看topic中的隊(duì)列是否有新消息,如果有或者超時(shí)則返回
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
//......
}
}
//......
}
這里我們也給出checkHoldRequest的調(diào)用可以看到,如果查到隊(duì)列offset大于用戶傳的說明就有新消息則返回,超時(shí)則直接返回:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
//......
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
//......
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
//拉取到新消息就返回
if (newestOffset > request.getPullFromThisOffset()) {
//......
if (match) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
//超時(shí)也返回
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
//......
}
}
}