成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ 常見問題與深度解析

開發(fā)
本文我們將從RocketMQ的基本概念、核心組件、消息存儲(chǔ)與消費(fèi)、高可用性設(shè)計(jì)以及性能優(yōu)化等方面展開詳細(xì)解析,助你輕松應(yīng)對(duì)面試中的各種挑戰(zhàn)!

在當(dāng)今的分布式系統(tǒng)架構(gòu)中,消息隊(duì)列作為解耦、異步處理和流量削峰的核心組件,扮演著至關(guān)重要的角色。而RocketMQ作為阿里巴巴開源的一款高性能、高可用的分布式消息中間件,憑借其強(qiáng)大的功能和廣泛的應(yīng)用場景,成為眾多企業(yè)和開發(fā)者關(guān)注的焦點(diǎn)。

無論是面試中還是實(shí)際工作中,掌握RocketMQ的核心原理和常見問題都顯得尤為重要。本文將從基礎(chǔ)到高級(jí),全面總結(jié)RocketMQ的面試高頻問題,并結(jié)合實(shí)際應(yīng)用場景和源碼解析,幫助讀者深入理解其設(shè)計(jì)思想和實(shí)現(xiàn)機(jī)制。無論你是準(zhǔn)備面試的技術(shù)人,還是希望提升對(duì)RocketMQ理解的開發(fā)者,相信這篇文章都能為你提供有價(jià)值的參考。

接下來,我們將從RocketMQ的基本概念、核心組件、消息存儲(chǔ)與消費(fèi)、高可用性設(shè)計(jì)以及性能優(yōu)化等方面展開詳細(xì)解析,助你輕松應(yīng)對(duì)面試中的各種挑戰(zhàn)!

RocketMQ的架構(gòu)是怎么樣的

整體來說,RocketMQ是由如下幾個(gè)部分組成:

producer:投遞消息的生產(chǎn)者,主要負(fù)責(zé)將消息投遞到broker上。

  • broker:可以理解為消息中轉(zhuǎn)服務(wù)器,主要負(fù)責(zé)消息存儲(chǔ),以及消息路由的轉(zhuǎn)發(fā),RocketMQ支持多個(gè)broker構(gòu)成集群,每個(gè)broker都有獨(dú)立的存儲(chǔ)空間和隊(duì)列。
  • consumer:訂閱topic并從broker中獲得消息并消費(fèi)。
  • nameserver:提供服務(wù)發(fā)現(xiàn)和路由功能,負(fù)責(zé)維護(hù)broker的數(shù)據(jù)信息,包括broker地址、topic和queue等,對(duì)應(yīng)的producer和consumer在啟動(dòng)時(shí)都需要通過nameserver獲取broker的地址信息。
  • topic:消費(fèi)主題,對(duì)于消息的邏輯分類的單位,producer消息都會(huì)發(fā)送到特定的topic上,對(duì)應(yīng)的consumer就會(huì)從這些topic拿到消費(fèi)的消息。

RocketMQ的事務(wù)消息是如何實(shí)現(xiàn)的

大體是通過half消息完成,大體工作流程為:

  • 生產(chǎn)者即應(yīng)用程序像mq的broker發(fā)送一條half消息,mq收到該消息后在事務(wù)消息日志中將其標(biāo)記為prepared狀態(tài),然后回復(fù)ack確認(rèn)。
  • 生產(chǎn)者執(zhí)行本地事務(wù),將事務(wù)執(zhí)行結(jié)果發(fā)送提交指令告知mq可以提交事務(wù)消息給消費(fèi)者。
  • mq若收到提交通知后,將消息從prepared改為commited,然后將消息提交給消費(fèi)者,當(dāng)然如果mq長時(shí)間沒有收到提交通知?jiǎng)t發(fā)送回查給生產(chǎn)者詢問該事務(wù)的執(zhí)行情況。
  • 基于事務(wù)結(jié)果若成功則將事務(wù)消息提交給消費(fèi)者,反之回滾該消息即將消息設(shè)置為rollback并將該消息從消息日志中刪除,從而保證消息不被消費(fèi)。

RocketMQ如何保證消息的順序性

針對(duì)保證消費(fèi)順序性的問題,我們可以基于下面這樣的一個(gè)場景來分析,假設(shè)我們有一個(gè)下單請求,生產(chǎn)者要求按需投遞下面這些消息讓消費(fèi)者消費(fèi):

  • 創(chuàng)建訂單
  • 用戶付款
  • 庫存扣減

同理消費(fèi)者也得嚴(yán)格按照這個(gè)順序完成消費(fèi),此時(shí)如果按照簡單維度的架構(gòu)來說,我們可以全局設(shè)置一個(gè)topic讓生產(chǎn)者準(zhǔn)確有序的投遞每一個(gè)消息,然后消費(fèi)者準(zhǔn)確依次消費(fèi)消息即可,但是這樣做對(duì)于并發(fā)的場景下性能表現(xiàn)就會(huì)非常差勁:

為了適當(dāng)提升兩端性能比對(duì)消息堆積,我們選擇增加隊(duì)列用多個(gè)隊(duì)列處理這個(gè)原子業(yè)務(wù):

有了這樣的架構(gòu)基礎(chǔ),我們就需要考慮生產(chǎn)者和消費(fèi)者的有序生產(chǎn)和有序消費(fèi)的落地思路了,先來說說生產(chǎn)者有序投遞,這樣做比較簡單,我們可以直接通過訂單號(hào)進(jìn)行hash并結(jié)合topic隊(duì)列數(shù)進(jìn)行取模的方式將一個(gè)訂單的創(chuàng)建、余額扣減、庫存扣減的消息有序投遞到某個(gè)隊(duì)列中,這樣就能保證單個(gè)訂單的業(yè)務(wù)消息有序性:

對(duì)應(yīng)的我們也給出生產(chǎn)者的代碼使用示例:

//基于訂單號(hào)orderNo進(jìn)行哈希取模發(fā)送訂單消息
Message<Order> message = MessageBuilder.withPayload(order).build();
rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());

這塊哈希取模的實(shí)現(xiàn)可以從底層源碼DefaultMQProducerImpl的sendSelectImpl看到,它會(huì)將arg(也就是我們的orderNo)通過selector的select進(jìn)行運(yùn)算獲得單topic下的某個(gè)隊(duì)列:

private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      //......
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
             //傳入arg也就是我們的orderNo基于selector算法進(jìn)行哈希取模
                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
               //......
            }

          //......
    }

這個(gè)調(diào)用會(huì)來到SelectMessageQueueByHash的select,從源碼可以看出這塊代碼看出,它的算法就是通過參數(shù)哈希運(yùn)算后結(jié)合隊(duì)列數(shù)(默認(rèn)為4)進(jìn)行取模:

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    //數(shù)值哈希運(yùn)算
        int value = arg.hashCode();
      //......
      //結(jié)合隊(duì)列數(shù)取模得到隊(duì)列返回
        value = value % mqs.size();
        return mqs.get(value);
    }
}

消費(fèi)者端就比較簡單了,consumeMode指定為有序消費(fèi)即可:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        consumeMode = ConsumeMode.ORDERLY//同一個(gè)topic下同一個(gè)隊(duì)列只有一個(gè)消費(fèi)者線程消費(fèi)
        
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
    }
}

這里我們也基于源碼解析一下這個(gè)有序消費(fèi)的實(shí)現(xiàn),本質(zhì)上消費(fèi)者啟動(dòng)的時(shí)候會(huì)開啟一個(gè)定時(shí)任務(wù)嘗試獲取分布式上鎖隊(duì)列信息的執(zhí)行如下步驟:

  • 獲取知道的broker及其隊(duì)列。
  • 獲取對(duì)應(yīng)broker的master地址。
  • 發(fā)送請求到服務(wù)端詢問master獲取所有隊(duì)列的分布式鎖。
  • 基于請求結(jié)果獲取查看那些隊(duì)列上鎖成功。
  • 更新本地結(jié)果。

完成后,消費(fèi)者就拉取到全局可唯一消費(fèi)的隊(duì)列信息,因?yàn)槊總€(gè)消費(fèi)者都是基于多線程執(zhí)行,所以為了保證本地多線程消費(fèi)有序性,每一個(gè)線程進(jìn)行消費(fèi)時(shí)都會(huì)以消息隊(duì)列messageQueue作為key用synchronized上鎖后才能消費(fèi)。

代碼如下所示,可以看到上鎖成功后就會(huì)執(zhí)行messageListener.consumeMessage方法,該方法就會(huì)走到我們上文中聲明的監(jiān)聽上了:

public void run() {
            //......
            //消費(fèi)請求線程消費(fèi)前會(huì)獲取消息隊(duì)列鎖
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            //上鎖
            synchronized (objLock) {
                 //......
                                //將消息發(fā)送給實(shí)現(xiàn)有序監(jiān)聽的監(jiān)聽器線程
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                 //......
                            } finally {
                                 //......
                            }
  //......
        }

RocketMQ有幾種集群方式

  • 多master模式:該模式僅由多master構(gòu)成,配置比較簡單,單個(gè)master宕機(jī)或重啟對(duì)于應(yīng)用全局沒有任何影響,尤其在磁盤為RAID10的情況下,即使服務(wù)器不可恢復(fù),只要我們使用同步刷盤策略,基本上消息都不會(huì)丟失,而缺點(diǎn)也非常明顯,單臺(tái)機(jī)器宕機(jī)期間,這臺(tái)機(jī)器未被消費(fèi)的消息在恢復(fù)之前不可訂閱,消息的實(shí)時(shí)性會(huì)受影響。
  • 多master-多slave異步復(fù)制:多個(gè)master和多個(gè)slave構(gòu)成,即使master宕機(jī)后slave依然可以對(duì)外提供服務(wù),所以消息實(shí)時(shí)性不會(huì)受影響,缺點(diǎn)是主從復(fù)制是異步的,如果master宕機(jī)時(shí)同步的消息可能丟失部分,且沒有解決slave自動(dòng)切換為master。
  • 多master-多slave同步復(fù)制:上者的優(yōu)化版即同步策略采用同步的方式,保證在RAID10的情況下消息基本不丟失,但因采用的是同步復(fù)制,所以發(fā)送單個(gè)消息的RT可能略高,且同樣沒有解決slave自動(dòng)切換為master。
  • Dledger模式:該集群模式要求至少由3個(gè)broker構(gòu)成,即一個(gè)master必須對(duì)應(yīng)兩個(gè)slave,一旦某個(gè)master宕機(jī)后通過raft一致性算法選舉新的master對(duì)外提供服務(wù)。具體實(shí)踐可以參考:https://rocketmq.apache.org/zh/docs/bestPractice/02dledger/

RocketMQ消息堆積了怎么解決

消息堆積的原因比較多,大體是客戶端隊(duì)列并發(fā)度不夠或者客戶端消費(fèi)能力不足,所以我們可以針對(duì)以下幾個(gè)角度進(jìn)行針對(duì)性的優(yōu)化:

  • 增加消費(fèi)者實(shí)例:如果是消費(fèi)速度過慢導(dǎo)致的消息堆積,則建議增加消費(fèi)者數(shù)量,讓更多的實(shí)例來消費(fèi)這些消息。
  • 提升消費(fèi)者消費(fèi)速度:如果是消息消費(fèi)處理耗時(shí)長,則針對(duì)性的業(yè)務(wù)流程調(diào)優(yōu),例如引入線程池、本地消息存儲(chǔ)后立即返回成功等方式提前消息進(jìn)行批量的預(yù)消化。
  • 降低生產(chǎn)者速度:如果生產(chǎn)端可控且消費(fèi)者已經(jīng)沒有調(diào)優(yōu)的空間時(shí),我們建議降低生產(chǎn)者生產(chǎn)速度。
  • 清理過期消息:對(duì)于一些過期且一直無法處理成功的消息,在進(jìn)行業(yè)務(wù)上的評(píng)估后,我們可以針對(duì)性的進(jìn)行清理。
  • 增加topic隊(duì)列數(shù):如果是因?yàn)殛?duì)列少導(dǎo)致并發(fā)度不夠可以考慮增加一下消費(fèi)者隊(duì)列,來提升消息隊(duì)列的并發(fā)度。
  • 參數(shù)調(diào)優(yōu):我們可以針對(duì)各個(gè)節(jié)點(diǎn)耗時(shí)針對(duì):消費(fèi)模式、消息拉取間隔等參數(shù)進(jìn)行優(yōu)化。

RocketMQ的工作流程詳解

上文已經(jīng)介紹了幾個(gè)基本的概念,我們這里直接將其串聯(lián)起來:

  • 啟動(dòng)nameServer,等待broker、producer和consumer的接入。
  • 啟動(dòng)broker和nameserver建立連接,并定時(shí)發(fā)送心跳包,心跳包中包含broker信息(ip、端口號(hào)等)以及topic以及broker與topic的映射關(guān)系。
  • 啟動(dòng)producer,producer啟動(dòng)時(shí)會(huì)隨機(jī)通過nameserver集群中的一臺(tái)建立長連接,并從中獲取發(fā)送的topic和所有broker地址信息,基于這些信息拿到topic對(duì)應(yīng)的隊(duì)列,與隊(duì)列所在的broker建立長連接,自此開始消息發(fā)送。
  • broker接收producer發(fā)送的消息時(shí),會(huì)根據(jù)配置同步和刷盤策略進(jìn)行狀態(tài)回復(fù):
1. 若為同步復(fù)制則master需要復(fù)制到slave節(jié)點(diǎn)后才能返回寫狀態(tài)成功
2. 若配置同步刷盤,還需要基于上述步驟再將數(shù)據(jù)寫入磁盤才能返回寫成功
3. 若是異步刷盤和異步復(fù)制,則消息一到master就直接回復(fù)成功
  • 啟動(dòng)consumer,和nameserver建立連接然后訂閱信息,然后對(duì)感興趣的broker建立連接,獲取消息并消費(fèi)。

RocketMQ的消息是采用推模式還是采用拉模式

消費(fèi)模式分為3種:

  • push:服務(wù)端主動(dòng)推送消息給客戶端。
  • pull:客戶端主動(dòng)到服務(wù)端輪詢獲取數(shù)據(jù)。
  • pop:5.0之后的新模式,后文會(huì)介紹。

總的來說push模式是性能比較好,但是客戶端沒有做好留空,可能會(huì)出現(xiàn)大量消息把客戶端打死的情況。 而poll模式同理,頻繁拉取服務(wù)端可能會(huì)造成服務(wù)器壓力,若設(shè)置不好輪詢間隔,可能也會(huì)出現(xiàn)消費(fèi)不及時(shí)的情況,

整體來說RocketMQ本質(zhì)上還是采用pull模式,具體后文會(huì)有介紹。

用了RocketMQ一定能做到削峰嗎

削峰本質(zhì)就是將高并發(fā)場景下短時(shí)間涌入的消息平攤通過消息隊(duì)列構(gòu)成緩沖區(qū)然后平攤到各個(gè)時(shí)間點(diǎn)進(jìn)行消費(fèi),從而實(shí)現(xiàn)平滑處理。

這也不意味著用mq就一定可以解決問題,假如用push模式,這就意味著你的消息都是mq立即收到立即推送的,本質(zhì)上只是加了一個(gè)無腦轉(zhuǎn)發(fā)的中間層,并沒有實(shí)際解決問題。 所以要想做到削峰,就必須用拉模式,通過主動(dòng)拉去保證消費(fèi)的速度,讓消息堆積在mq隊(duì)列中作為緩沖。

常見消息隊(duì)列的消息模型有哪些?RocketMQ用的是那種消息模型

消息隊(duì)列的消息模型有兩種,一種是隊(duì)列模型,生產(chǎn)者負(fù)責(zé)把消息扔到消息隊(duì)列中,消費(fèi)者去消息隊(duì)列中搶消息,消息只有一個(gè),先到者先得:

還有一種就是發(fā)布/訂閱模型了,發(fā)布訂閱模型的消息只要消費(fèi)者有訂閱就能消費(fèi)消息:

RocketMQ是支持發(fā)布訂閱模式的,如下所示,筆者又新建一個(gè)監(jiān)聽者,用的是不同的消費(fèi)者組consumerGroup,運(yùn)行時(shí)即可看到兩組訂閱的消費(fèi)者消費(fèi)一份消息:

@Component
@RocketMQMessageListener(consumerGroup = "gourp2", topic = "ORDER_ADD")
public class OrderMqListener2 implements RocketMQListener<Order> {

    private static Logger logger = LoggerFactory.getLogger(OrderMqListener2.class);


    @Override
    public void onMessage(Order order) {

        logger.info("訂閱者2收到消息,訂單信息:[{}],進(jìn)行新春福利活動(dòng).....", JSON.toJSONString(order));

    }
}

RocketMQ消息的消費(fèi)模式有哪些

有兩種消費(fèi)模式:

集群消費(fèi):這種是RocketMQ默認(rèn)模式,一個(gè)主題下的多個(gè)隊(duì)列都會(huì)被消費(fèi)者組中的某個(gè)消費(fèi)者消費(fèi)掉。

廣播消費(fèi):廣播消費(fèi)模式會(huì)讓每個(gè)消費(fèi)者組中的每個(gè)消費(fèi)者都能使用這個(gè)消息。

如何保證消息可用性和可靠性呢?

這個(gè)問題我們要從3個(gè)角度考慮:

對(duì)于生產(chǎn)階段,生產(chǎn)者發(fā)送消息要想確??煽勘仨氉裱韵?點(diǎn):

  • 沒有發(fā)送成功就需要進(jìn)行重試:
SendResult result = producer.send(message);
                if (!"SEND_OK".equals(result.getSendStatus().name())){
                    logger.warn("消息發(fā)送失敗,執(zhí)行重試的邏輯");
                }
  • 如果發(fā)送超時(shí),我們可以從日志相關(guān)API中查看是否存到Broker中。
  • 如果是異步消息,則需要到回調(diào)接口中做相應(yīng)處理。

針對(duì)存儲(chǔ)階段,存儲(chǔ)階段要保證可靠性就需要從以下幾個(gè)角度保證:

  • 開啟主從復(fù)制模式,使得Master掛了還有Slave可以用。
  • 為了確保發(fā)送期間服務(wù)器宕機(jī)的情況,我們建議刷盤機(jī)制改為同步刷盤,確保消息發(fā)送并寫到CommitLog中再返回成功。

這里補(bǔ)充一下同步刷盤和異步刷盤的區(qū)別:

  • 同步刷盤,生產(chǎn)者投遞的消息持久化時(shí)必須真正寫到磁盤才會(huì)返回成功,可靠性高,但是因?yàn)镮O問題會(huì)使得組件處理效率下降。
  • 異步刷盤,如下圖所示,可以僅僅是存到page cache即可返回成功,至于何時(shí)持久化操磁盤由操作系統(tǒng)后臺(tái)異步的頁緩存置換算法決定。

對(duì)應(yīng)的刷盤策略,我們只需修改broker.conf的配置文件即可:

#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH

對(duì)于消費(fèi)階段,消費(fèi)者編碼邏輯一定要確保消費(fèi)成功了再返回消費(fèi)成功:

consumer.registerMessageListener((List<MessageExt> msgs,
                                          ConsumeConcurrentlyContext context) -> {
            String msg = new String(msgs.stream().findFirst().get().getBody());
            logger.info("消費(fèi)收到消息,消息內(nèi)容={}", msg);
            
            //消費(fèi)完全成功再返回成功狀態(tài)
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

如果避免消息重復(fù)消費(fèi)問題(重點(diǎn))

這個(gè)我們可以分不同的情況討論,有些場景下,我們只需保證業(yè)務(wù)冪等即可,例如:我們需要給訂單服務(wù)發(fā)送一個(gè)用戶下單成功的消息,無論發(fā)送多少次訂單服務(wù)只是將訂單表狀態(tài)設(shè)置成已完成。

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {

    private static Logger logger = LoggerFactory.getLogger(OrderMqListener.class);


    @Override
    public void onMessage(Order order) {
        logger.info("消費(fèi)者收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSON.toJSONString(order));
        updateOrderFinish(order);
    }

    private void updateOrderFinish(Order order){
        logger.info("執(zhí)行dao層邏輯,將訂單設(shè)置下單完成,無論多少次,執(zhí)行到這個(gè)消費(fèi)邏輯都是將訂單設(shè)置為處理完成");
    }

}

還有一種方式就是業(yè)務(wù)去重,例如我們現(xiàn)在要?jiǎng)?chuàng)建訂單,每次訂單創(chuàng)建完都會(huì)往一張記錄消費(fèi)信息表中插入數(shù)據(jù)。一旦我們收到重復(fù)的消息,只需帶著唯一標(biāo)識(shí)去數(shù)據(jù)庫中查,如果有則直接返回成功即可:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {

 //......
    @Override
    public void onMessage(Order order) {
        logger.info("消費(fèi)者收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSON.toJSONString(order));
        //消費(fèi)者消費(fèi)時(shí)判斷訂單是否存在,如果存在則直接返回
        if (isExist(order)){
            return;
        }
        updateOrderFinish(order);
    }

   

}

延時(shí)消息底層是怎么實(shí)現(xiàn)

我們都知道投遞消息到消息隊(duì)列的時(shí)候,消息都會(huì)寫入到commitLog上,在此之前MQ會(huì)檢查當(dāng)前消息延遲等級(jí)是否大于0,如果是則說明該消息是延遲消息,則會(huì)將其topic設(shè)置為RMQ_SYS_SCHEDULE_TOPIC并基于延遲等級(jí)獲取對(duì)應(yīng)的隊(duì)列,最后基于零拷貝的方式寫入磁盤,注意此時(shí)消息還不可被消費(fèi):

對(duì)此我們也給出這段投遞消息的源碼,即位于CommitLog的asyncPutMessage異步投遞消息的方法:

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
       //......

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
           
            //如果大于0則說明是延遲消息
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //設(shè)置topic設(shè)置為SCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //基于等級(jí)獲取延遲消息隊(duì)列
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

               //......
                //基于上述設(shè)置topic和隊(duì)列信息
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

         //......
            //基于零拷貝的方式添加
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            //......
    }

MQ啟動(dòng)的時(shí)候底層的消息調(diào)度服務(wù)會(huì)基于延遲消息的等級(jí)初始化幾個(gè)任務(wù),這些任務(wù)會(huì)基于定時(shí)的間隔檢查是否有到期的消息到來,如果到期則將其投遞到真正topic的隊(duì)列中供消費(fèi)者消費(fèi):

基于此邏輯我們也給出ScheduleMessageService的start方法查看調(diào)度器的初始化邏輯,可以看到初始化階段,它會(huì)遍歷所有延遲級(jí)別并為其初始化一個(gè)定時(shí)任務(wù):

public void start() {
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            //遍歷所有延遲級(jí)別
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
                //設(shè)置timer定時(shí)器
                if (timeDelay != null) {
                    //投遞給定時(shí)器對(duì)應(yīng)等級(jí)的定時(shí)任務(wù)
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
          //......
    }

查看DeliverDelayedMessageTimerTask的核心邏輯即run方法,也就是我們所說的定時(shí)檢查是否有到期消息,若存在則將其存入原本的topic上,消費(fèi)者就可以消費(fèi)了:

@Override
        public void run() {
            try {
                if (isStarted()) {
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
              //......
            }
        }


public void executeOnTimeup() {

            //基于topic和隊(duì)列id獲取延遲隊(duì)列
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                //根據(jù)偏移量獲取有效消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                      //......
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                              //......

                            long now = System.currentTimeMillis();
                            //計(jì)算可消費(fèi)時(shí)間
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;
                            //如果小于0說明可消費(fèi)
                            if (countdown <= 0) {
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        //清除延遲級(jí)別恢復(fù)到真正的topic和隊(duì)列id
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                      //......
                                        //放到消息隊(duì)列上
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                         //......
                                        } else {
                                           //......
                                    } catch (Exception e) {
                                       //......
                               
                            } else {
                                //......
                            }
                        } // end of for

                         //......
        }

什么是死信隊(duì)列

通俗來說一個(gè)消息消費(fèi)失敗并重試達(dá)到最大次數(shù)后,MQ就會(huì)將其放到死信隊(duì)列中。超過三天該消息就會(huì)被銷毀。 需要補(bǔ)充的時(shí)死信隊(duì)列是針對(duì)一個(gè)group id為單位創(chuàng)建的隊(duì)列,如果一個(gè)gourp中都沒有死信的話,那么MQ就不會(huì)為這個(gè)組創(chuàng)建死信隊(duì)列。

Broker是進(jìn)行消息持久化的

要想了解Broker如何保存數(shù)據(jù),我們必須了解RocketMQ三大文件:

首先是commitlog,producer發(fā)送的消息最終都會(huì)通過刷盤機(jī)制存到commitlog文件夾下。commitlog下一個(gè)文件名為00000000000000000000一旦寫滿,就會(huì)再創(chuàng)建一個(gè)文件寫,一般來說第二個(gè)文件名為00000000001073741824,名稱即是第一個(gè)文件的字節(jié)數(shù)。文件大小一般是1G:

然后是consumequeue文件夾,這個(gè)文件夾下記錄的都是commitlog中每個(gè)topic下的隊(duì)列信息物理偏移量、消息大小、hashCode值,如下圖,consumequeue文件夾下會(huì)為每個(gè)topic創(chuàng)建一個(gè)文件夾:

打開任意一個(gè)文件夾就會(huì)看到這樣一個(gè)名為00000000000000000000的文件:

而這個(gè)文件內(nèi)部最多維護(hù)30w個(gè)條目,注意文件中每個(gè)條目大約20字節(jié),8字節(jié)代表當(dāng)前消息在commitLog中的偏移量,4字節(jié)存放消息大小,8字節(jié)存放tag和hashCode的值。

最后就算index,維護(hù)消息的索引,基于HashMap結(jié)構(gòu),這個(gè)文件使得我們可以通過key或者時(shí)間區(qū)間查詢消息:

文件名基本用時(shí)間戳生成的,大小一般為400M,差不多維護(hù)2000w個(gè)索引:

簡單小結(jié)一下RocketMQ持久化的物理文件:MQ會(huì)為每個(gè)broker維護(hù)一個(gè)commitlog,一旦文件存放到commitlog,消息就不會(huì)丟失。當(dāng)無法拉取消息時(shí),broker允許producer在30s內(nèi)發(fā)送一個(gè)消息,然后直接給消費(fèi)者消費(fèi)。

后兩個(gè)索引文件的維護(hù)是基于一個(gè)線程ReputMessageService進(jìn)行異步維護(hù)consumeQueue(邏輯消費(fèi)隊(duì)列)和IndexFile(索引文件)數(shù)據(jù):

RocketMQ如何進(jìn)行文件讀寫的呢?

對(duì)于讀寫IO處理有以下兩種:

  • pageCache:在RocketMQ中,ConsumeQueue存儲(chǔ)數(shù)據(jù)較少,并且是順序讀取,在pageCache預(yù)讀的機(jī)制下讀取速率是非??陀^的(即使有大量的消息堆積)。操作系統(tǒng)會(huì)將一部分內(nèi)存用作pageCache,當(dāng)數(shù)據(jù)寫入磁盤會(huì)先經(jīng)過pageCache然后通過內(nèi)核線程pdflush寫入物理磁盤。 針對(duì)ConsumeQueue下關(guān)于消息索引的數(shù)據(jù)查詢時(shí),會(huì)先去pageCache查詢是否有數(shù)據(jù),若有則直接返回。若沒有則去ConsumeQueue文件中讀取需要的數(shù)據(jù)以及這個(gè)數(shù)據(jù)附近的數(shù)據(jù)一起加載到pageCache中,這樣后續(xù)的讀取就是走緩存,效率自然上去了,這種磁盤預(yù)讀目標(biāo)數(shù)據(jù)的附近數(shù)據(jù)就是我們常說的局部性原理。而commitLog隨機(jī)性比較強(qiáng)特定情況下讀寫性能會(huì)相對(duì)差一些,所以在操作系統(tǒng)層面IO讀寫調(diào)度算法可以改為deadline并選用SSD盤以保證操作系統(tǒng)在指定時(shí)間完成數(shù)據(jù)讀寫保證性能。
  • 零拷貝技術(shù):這是MQ基于NIO的FileChannel模型的一種直接將物理文件映射到用戶態(tài)內(nèi)存地址的一種技術(shù),通過MappedByteBuffer,它的工作機(jī)制是直接建立內(nèi)存映射,文件數(shù)據(jù)并沒有經(jīng)過JVM和操作系統(tǒng)直接復(fù)制的過程,相當(dāng)于直接操作內(nèi)存,所以效率就非常高??梢詤⒖? 能不能給我講講零拷貝:https://mp.weixin.qq.com/s/zS2n2a4h3YQifBYKFgcUCA

消息刷盤如何實(shí)現(xiàn)呢?

兩種方式分別是同步刷盤和異步刷盤

  • 同步刷盤: producer發(fā)送的消息經(jīng)過broker后必須寫入到物理磁盤commitLog后才會(huì)返回成功。
  • 異步刷盤:producer發(fā)送的消息到達(dá)broker之后,直接返回成功,刷盤的邏輯交給一個(gè)異步線程實(shí)現(xiàn)。

而上面說的刷盤都是通過MappedByteBuffer.force() 這個(gè)方法完成的,需要補(bǔ)充異步刷盤是通過一個(gè)異步線程FlushCommitLogService實(shí)現(xiàn)的,其底層通過MappedFileQueue針對(duì)內(nèi)存中的隊(duì)列消息調(diào)用flush進(jìn)行刷盤從而完成消息寫入:

public boolean flush(final int flushLeastPages) {
        boolean result = true;
        //拉取文件處理偏移量信息
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            //基于mmap零拷貝技術(shù)進(jìn)行刷盤
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            //如果刷盤后的進(jìn)度和預(yù)期一樣說明刷盤成功
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                //維護(hù)處理時(shí)間
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

RocketMQ負(fù)載均衡

MQ中負(fù)載均衡的主要是體現(xiàn)在生產(chǎn)者端和消費(fèi)者端,Producer負(fù)載均衡算法在上述中有序消費(fèi)中的源碼已經(jīng)說明,這里就不多做贅述,本質(zhì)上就是通過底層的selector進(jìn)行輪詢投遞:

Message<Order> message = MessageBuilder.withPayload(order).build();
        rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());

再來consumer負(fù)載均衡算法,mq客戶端啟動(dòng)時(shí)會(huì)開啟一個(gè)負(fù)載均衡服務(wù)執(zhí)行負(fù)載均衡隊(duì)列輪詢邏輯,通過負(fù)載均衡算法得出每個(gè)消費(fèi)者應(yīng)該處理的隊(duì)列信息后生產(chǎn)拉取消息的請求,交由有MQ客戶端去拉取消息:

默認(rèn)情況下,負(fù)載均衡算法選定隊(duì)列后拉取消息進(jìn)行消費(fèi),默認(rèn)情況下它會(huì)根據(jù)隊(duì)列數(shù)和消費(fèi)者數(shù)決定如何進(jìn)行負(fù)載分擔(dān),按照平均算法:

  • 如果消費(fèi)者數(shù)大于隊(duì)列數(shù),則將隊(duì)列分配給有限的幾個(gè)消費(fèi)者。
  • 如果消費(fèi)者數(shù)小于隊(duì)列數(shù),默認(rèn)情況下會(huì)按照隊(duì)列數(shù)/消費(fèi)者數(shù)取下限+1進(jìn)行分配,例如隊(duì)列為4,消費(fèi)者為3,那么每個(gè)消費(fèi)者就會(huì)拿到2個(gè)隊(duì)列,其中第三個(gè)消費(fèi)者則沒有處理任何數(shù)據(jù)。

對(duì)應(yīng)的我們給出MQ客戶端初始化的代碼RebalanceService的run方法,可以看到它會(huì)調(diào)用mqClientFactory執(zhí)行負(fù)載均衡方法doRebalance:

@Override
    public void run() {
     //.......

        while (!this.isStopped()) {
           //.......
            //客戶端執(zhí)行負(fù)載均衡
            this.mqClientFactory.doRebalance();
        }

       //.......
    }

步入其內(nèi)部邏輯會(huì)走到RebalanceImpl的doRebalance,它遍歷每個(gè)topic進(jìn)行負(fù)載均衡運(yùn)算:

public void doRebalance(final boolean isOrder) {
       //......
        if (subTable != null) {
            //遍歷每個(gè)topic
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    //計(jì)算該topic中當(dāng)前消費(fèi)者要處理的隊(duì)列
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                   //......
                }
            }
        }

       //......
    }

最終我們來到了核心邏輯rebalanceByTopic方法,可以看到它會(huì)基于我們查到的topic的隊(duì)列和消費(fèi)者通過策略模式找到對(duì)應(yīng)的消息分配策略AllocateMessageQueueStrategy從而算得當(dāng)前消費(fèi)者需要處理的隊(duì)列,然后在基于這份結(jié)果調(diào)用updateProcessQueueTableInRebalance生成pullRequest告知客戶端為該消費(fèi)者拉取消息:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
               //......
            }
            case CLUSTERING: {
                //根據(jù)主題獲取消息隊(duì)列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //根據(jù) topic 與 consumerGroup 獲取所有的 consumerId
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                //......

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    //// 排序后才能保證消費(fèi)者負(fù)載策略相對(duì)穩(wěn)定
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        //按負(fù)載策略進(jìn)行分配,返回當(dāng)前消費(fèi)者實(shí)際訂閱的messageQueue集合
                        allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                e);
                        return;
                    }

                  //......

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    //......
                break;
            }
            default:
                break;
        }
    }

對(duì)應(yīng)的我們也給出負(fù)載均衡算法AllocateMessageQueueAveragely的源碼,大體算法和筆者上述說明的基本一致,讀者可以參考上圖講解了解一下:

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
       //......
        //獲取消費(fèi)者id對(duì)應(yīng)的索引
        int index = cidAll.indexOf(currentCID);
        //基于隊(duì)列總數(shù)和客戶端總數(shù)進(jìn)行取模
        int mod = mqAll.size() % cidAll.size();
        /**
         *計(jì)算每個(gè)消費(fèi)者的可消費(fèi)的平均值:
         * 1. 如果消費(fèi)者多于隊(duì)列就取1
         * 2. 如果消費(fèi)者少于隊(duì)列就按照取模結(jié)果來計(jì)算
         */
        
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        //基于當(dāng)前客戶端的索引定位其處理的隊(duì)列位置
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        //獲取消費(fèi)者的隊(duì)列消費(fèi)范圍
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        //遍歷隊(duì)列存入結(jié)果集
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

完成后基于這份結(jié)果生成pullRequest存入pullRequestQueue中:

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                       final boolean isOrder) {
       //......

        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                 //......
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                     //......
                    } else {
                    //生成pullRequest
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
  //存入pullRequestQueue中
        this.dispatchPullRequest(pullRequestList);

        return changed;
    }

最后消費(fèi)者的PullMessageService這個(gè)線程就會(huì)從隊(duì)列中取出該請求向MQ發(fā)起消息拉取請求:

@Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                //獲取一個(gè)拉消息的請求pullRequest 
                PullRequest pullRequest = this.pullRequestQueue.take();
                //拉消息
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

RocketMQ消息長輪詢

消費(fèi)者獲取消息大體有兩種方案:

  • 消息隊(duì)列主動(dòng)push:由消息隊(duì)列主動(dòng)去推送消息給消費(fèi)者,高并發(fā)場景下,對(duì)于服務(wù)端性能開銷略大。
  • 消費(fèi)者定期pull消息:由客戶端主動(dòng)去拉取消息,但是需要客戶端設(shè)置好拉取的間隔,太頻繁對(duì)于消息隊(duì)列開銷還是很大,間隔太長消息實(shí)時(shí)性又無法保證。

對(duì)此RocketMQ采用長輪詢機(jī)制保證了實(shí)時(shí)性同時(shí)又降低了服務(wù)端的開銷,總的來說,它的整體思路為:

  • 消費(fèi)者發(fā)起一個(gè)消費(fèi)請求,內(nèi)容傳入topic、queueId和客戶端socket、pullFromThisOffset等數(shù)據(jù)。
  • 服務(wù)端收到請求后查看該隊(duì)列是否有數(shù)據(jù),若沒有則掛起。
  • 在一個(gè)最大超時(shí)時(shí)間內(nèi)定時(shí)輪詢,如果有則將結(jié)果返回給客戶端。
  • 反之處理超時(shí),也直接告知客戶端超時(shí)了也沒有消息。

對(duì)應(yīng)的我們再次給出消費(fèi)者拉取消費(fèi)的源碼PullMessageService的run方法,可以看到其內(nèi)部不斷從阻塞隊(duì)列中拉取請求并發(fā)起消息拉?。?/p>

@Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                //獲取一個(gè)拉消息的請求
                PullRequest pullRequest = this.pullRequestQueue.take();
                //拉消息
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

服務(wù)端的PullMessageProcessor的processRequest就是處理請求的入口,可以看到該方法如果發(fā)現(xiàn)broker沒有看到新的消息就會(huì)調(diào)用suspendPullRequest將客戶端連接hold?。?/p>

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
          //.......

            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
     //.......
                    } else {
                        //.......
                    break;
                case ResponseCode.PULL_NOT_FOUND://沒拉取到消息

                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }
                        //定位請求的topic以及offset和隊(duì)列id
                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        //基于上述數(shù)據(jù)生成pullRequest并調(diào)用suspendPullRequest將其hold住
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }

                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    break;
                case ResponseCode.PULL_OFFSET_MOVED:
                    //.......
                    break;
                default:
                    assert false;
            }
        } else {
           //.......
        }

         //.......
        return response;
    }

然后PullRequestHoldService就會(huì)基于上述一部掛起的數(shù)據(jù)定時(shí)檢查是否有新消息到來,直到超期:

@Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                //等待
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                //基于超時(shí)時(shí)限內(nèi)定時(shí)查看topic中的隊(duì)列是否有新消息,如果有或者超時(shí)則返回
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                //......
            }
        }

       //......
    }

這里我們也給出checkHoldRequest的調(diào)用可以看到,如果查到隊(duì)列offset大于用戶傳的說明就有新消息則返回,超時(shí)則直接返回:

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        //......
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
              //......

                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
                    //拉取到新消息就返回
                    if (newestOffset > request.getPullFromThisOffset()) {
                        //......

                        if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }
                    //超時(shí)也返回
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }

                    replayList.add(request);
                }

                //......
            }
        }
    }


責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2011-03-29 13:23:54

CACTI

2009-10-15 16:55:05

綜合布線系統(tǒng)測試

2009-09-23 17:52:16

Hibernate概念Hibernate常見

2009-10-26 11:11:22

接入網(wǎng)常見問題

2010-05-13 10:22:45

綜合布線系統(tǒng)測試

2009-07-07 10:13:57

Servlet學(xué)習(xí)

2010-07-21 09:10:02

Perl常見問題

2013-11-14 15:47:29

SDN問題答疑

2011-04-01 13:55:24

Java

2011-05-06 15:39:55

硒鼓

2010-07-01 17:18:02

UML包圖

2010-08-06 09:30:03

思科IOS升級(jí)

2018-03-08 14:00:02

2010-05-19 11:35:13

SVN

2010-03-25 09:08:43

CentOS配置

2010-05-13 13:27:23

2009-11-02 17:25:04

ADSL常見問題

2011-02-22 14:00:16

vsftpd

2009-12-31 09:58:51

Ubuntu常見問題

2009-09-22 09:22:03

.NET常見問題
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 亚洲福利一区二区 | 精品欧美乱码久久久久久1区2区 | 2019精品手机国产品在线 | 亚洲国产片 | 久久国产综合 | 国产午夜精品一区二区三区在线观看 | 男女啪啪高潮无遮挡免费动态 | 毛片大全 | 日韩在线观看 | 一区欧美 | 久久九七| 中文字幕av中文字幕 | 在线观看免费毛片 | 久久男人 | 天天草草草 | 欧美久久久久久 | 国产精品一二三区在线观看 | www在线| 国产精品久久久久无码av | 看av电影| 成人高清在线视频 | 男人的天堂久久 | 亚洲精品18 | 日韩免费网站 | 亚洲精品二区 | 成人网在线 | 91精品国产色综合久久不卡蜜臀 | 精品久久久久久久久久久久 | 真人一级毛片 | 91精品国产91久久久久久最新 | h在线免费观看 | 久久一及片 | 99这里只有精品视频 | 亚洲国产成人精品久久 | 国产一区中文 | 久久久妇女国产精品影视 | 久久网站黄| 99综合网| 久久国产精品亚洲 | 亚洲日本乱码在线观看 | 粉嫩一区二区三区国产精品 |