成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

萬字聊一聊RocketMQ一條消息短暫而又精彩的一生

開發 前端
當Broker在啟動的過程中,Broker就會往NameServer注冊自己這個Broker的信息,這些信息就包括自身所在服務器的ip和端口,還有就是自己這個Broker有哪些topic和對應的隊列信息,這些信息就是路由信息,后面就統一稱為路由表。

大家好,我是三友~~

這篇文章我準備來聊一聊RocketMQ消息的一生。

不知你是否跟我一樣,在使用RocketMQ的時候也有很多的疑惑:

  • 消息是如何發送的,隊列是如何選擇的?
  • 消息是如何存儲的,是如何保證讀寫的高性能?
  • RocketMQ是如何實現消息的快速查找的?
  • RocketMQ是如何實現高可用的?
  • 消息是在什么時候會被清除?
  • ...

本文就通過探討上述問題來探秘消息在RocketMQ中短暫而又精彩的一生。

核心概念

  • NameServer:可以理解為是一個注冊中心,主要是用來保存topic路由信息,管理Broker。在NameServer的集群中,NameServer與NameServer之間是沒有任何通信的。
  • Broker:核心的一個角色,主要是用來保存消息的,在啟動時會向NameServer進行注冊。Broker實例可以有很多個,相同的BrokerName可以稱為一個Broker組,每個Broker組只保存一部分消息。
  • topic:可以理解為一個消息的集合的名字,一個topic可以分布在不同的Broker組下。
  • 隊列(queue):一個topic可以有很多隊列,默認是一個topic在同一個Broker組中是4個。如果一個topic現在在2個Broker組中,那么就有可能有8個隊列。
  • 生產者:生產消息的一方就是生產者。
  • 生產者組:一個生產者組可以有很多生產者,只需要在創建生產者的時候指定生產者組,那么這個生產者就在那個生產者組。
  • 消費者:用來消費生產者消息的一方。
  • 消費者組:跟生產者一樣,每個消費者都有所在的消費者組,一個消費者組可以有很多的消費者,不同的消費者組消費消息是互不影響的。

消息誕生與發送

我們都知道,消息是由業務系統在運行過程產生的,當我們的業務系統產生了消息,我們就可以調用RocketMQ提供的API向RocketMQ發送消息,就像下面這樣:

DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
//指定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
//啟動生產者
producer.start();
//省略代碼。。
Message msg = new Message("sanyouTopic", "TagA", "三友的java日記 ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發送消息并得到消息的發送結果,然后打印
SendResult sendResult = producer.send(msg);

雖然代碼很簡單,我們不經意間可能會思考如下問題:

  • 代碼中只設置了NameServer的地址,那么生產者是如何知道Broker所在機器的地址,然后向Broker發送消息的?
  • 一個topic會有很多隊列,那么生產者是如何選擇哪個隊列發送消息?
  • 消息一旦發送失敗了怎么辦?

路由表

當Broker在啟動的過程中,Broker就會往NameServer注冊自己這個Broker的信息,這些信息就包括自身所在服務器的ip和端口,還有就是自己這個Broker有哪些topic和對應的隊列信息,這些信息就是路由信息,后面就統一稱為路由表。

Broker向NameServer注冊Broker向NameServer注冊

當生產者啟動的時候,會從NameServer中拉取到路由表,緩存到本地,同時會開啟一個定時任務,默認是每隔30s從NameServer中重新拉取路由信息,更新本地緩存。

隊列的選擇

好了通過上一節我們就明白了,原來生產者會從NameServer拉取到Broker的路由表的信息,這樣生產者就知道了topic對應的隊列的信息了。

但是由于一個topic可能會有很多的隊列,那么應該將消息發送到哪個隊列上呢?

圖片圖片

面對這種情況,RocketMQ提供了兩種消息隊列的選擇算法。

  • 輪詢算法
  • 最小投遞延遲算法

輪詢算法 就是一個隊列一個隊列發送消息,這些就能保證消息能夠均勻分布在不同的隊列底下,這也是RocketMQ默認的隊列選擇算法。

但是由于機器性能或者其它情況可能會出現某些Broker上的Queue可能投遞延遲較嚴重,這樣就會導致生產者不能及時發消息,造成生產者壓力過大的問題。所以RocketMQ提供了最小投遞延遲算法。

最小投遞延遲算法 每次消息投遞的時候會統計投遞的時間延遲,在選擇隊列的時候會優先選擇投遞延遲時間小的隊列。這種算法可能會導致消息分布不均勻的問題。

如果你想啟用最小投遞延遲算法,只需要按如下方法設置一下即可。

producer.setSendLatencyFaultEnable(true);

當然除了上述兩種隊列選擇算法之外,你也可以自定義隊列選擇算法,只需要實現MessageQueueSelector接口,在發送消息的時候指定即可。

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        //從mqs中選擇一個隊列
        return null;
    }
}, new Object());

MessageQueueSelector RocketMQ也提供了三種實現:

  • 隨機算法
  • Hash算法
  • 根據機房選擇算法(空實現)

其它特殊情況處理

發送異常處理

終于,不論是通過RocketMQ默認的隊列選擇算法也好,又或是自定義隊列選擇算法也罷,終于選擇到了一個隊列,那么此時就可以跟這個隊列所在的Broker機器建立網絡連接,然后通過網絡請求將消息發送到Broker上。

但是不幸的事發生了,Broker掛了,又或者是機器負載太高了,發送消息超時了,那么此時RockerMQ就會進行重試。

RockerMQ重試其實很簡單,就是重新選擇其它Broker機器中的一個隊列進行消息發送,默認會重試兩次。

當然如果你的機器比較多,可以將設置重試次數設置大點。

producer.setRetryTimesWhenSendFailed(10);
消息過大的處理

一般情況下,消息的內容都不會太大,但是在一些特殊的場景中,消息內容可能會出現很大的情況。

遇到這種消息過大的情況,比如在默認情況下消息大小超過4k的時候,RocketMQ是會對消息進行壓縮之后再發送到Broker上,這樣在消息發送的時候就可以減少網絡資源的占用。

消息存儲

好了,經過以上環節Broker終于成功接收到了生產者發送的消息了,但是為了能夠保證Broker重啟之后消息也不丟失,此時就需要將消息持久化到磁盤。

如何保證高性能讀寫

由于涉及到消息持久化操作,就涉及到磁盤數據的讀寫操作,那么如何實現文件的高性能讀寫呢?這里就不得不提到的一個叫零拷貝的技術。

傳統IO讀寫方式

說零拷貝之前,先說一下傳統的IO讀寫方式。

比如現在需要將磁盤文件通過網絡傳輸出去,那么整個傳統的IO讀寫模型如下圖所示:

圖片圖片

傳統的IO讀寫其實就是read + write的操作,整個過程會分為如下幾步:

  • 用戶調用read()方法,開始讀取數據,此時發生一次上下文從用戶態到內核態的切換,也就是圖示的切換1。
  • 將磁盤數據通過DMA拷貝到內核緩存區。
  • 將內核緩存區的數據拷貝到用戶緩沖區,這樣用戶,也就是我們寫的代碼就能拿到文件的數據。
  • read()方法返回,此時就會從內核態切換到用戶態,也就是圖示的切換2。
  • 當我們拿到數據之后,就可以調用write()方法,此時上下文會從用戶態切換到內核態,即圖示切換3。
  • CPU將用戶緩沖區的數據拷貝到Socket緩沖區。
  • 將Socket緩沖區數據拷貝至網卡。
  • write()方法返回,上下文重新從內核態切換到用戶態,即圖示切換4。

整個過程發生了4次上下文切換和4次數據的拷貝,這在高并發場景下肯定會嚴重影響讀寫性能。

所以為了減少上下文切換次數和數據拷貝次數,就引入了零拷貝技術。

零拷貝

零拷貝技術是一個思想,指的是指計算機執行操作時,CPU不需要先將數據從某處內存復制到另一個特定區域。

實現零拷貝的有以下幾種方式:

  • mmap()
  • sendfile()
mmap()

mmap(memory map)是一種內存映射文件的方法,即將一個文件或者其它對象映射到進程的地址空間,實現文件磁盤地址和進程虛擬地址空間中一段虛擬地址的一一對映關系。

簡單地說就是內核緩沖區和應用緩沖區共享,從而減少了從讀緩沖區到用戶緩沖區的一次CPU拷貝。

比如基于mmap,上述的IO讀寫模型就可以變成這樣。

圖片圖片

基于mmap IO讀寫其實就變成mmap + write的操作,也就是用mmap替代傳統IO中的read操作。

當用戶發起mmap調用的時候會發生上下文切換1,進行內存映射,然后數據被拷貝到內核緩沖區,mmap返回,發生上下文切換2;隨后用戶調用write,發生上下文切換3,將內核緩沖區的數據拷貝到Socket緩沖區,write返回,發生上下文切換4。

整個過程相比于傳統IO主要是不用將內核緩沖區的數據拷貝到用戶緩沖區,而是直接將數據拷貝到Socket緩沖區。上下文切換的次數仍然是4次,但是拷貝次數只有3次,少了一次CPU拷貝。

在Java中,提供了相應的api可以實現mmap,當然底層也還是調用Linux系統的mmap()實現的。

FileChannel fileChannel = new RandomAccessFile("test.txt", "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());

如上代碼拿到MappedByteBuffer,之后就可以基于MappedByteBuffer去讀寫。

sendfile()

sendfile()跟mmap()一樣,也會減少一次CPU拷貝,但是它同時也會減少兩次上下文切換。

圖片圖片

如圖,用戶在發起sendfile()調用時會發生切換1,之后數據通過DMA拷貝到內核緩沖區,之后再將內核緩沖區的數據CPU拷貝到Socket緩沖區,最后拷貝到網卡,sendfile()返回,發生切換2。

同樣地,Java也提供了相應的api,底層還是操作系統的sendfile()。

FileChannel channel = FileChannel.open(Paths.get("./test.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
//調用transferTo方法向目標數據傳輸
channel.transferTo(position, len, target);

通過FileChannel的transferTo方法即可實現。

transferTo方法(sendfile)主要是用于文件傳輸,比如將文件傳輸到另一個文件,又或者是網絡。

在如上代碼中,并沒有文件的讀寫操作,而是直接將文件的數據傳輸到target目標緩沖區,也就是說,sendfile是無法知道文件的具體的數據的;但是mmap不一樣,他是可以修改內核緩沖區的數據的。假設如果需要對文件的內容進行修改之后再傳輸,只有mmap可以滿足。

通過上面的一些介紹,主要就是一個結論,那就是基于零拷貝技術,可以減少CPU的拷貝次數和上下文切換次數,從而可以實現文件高效的讀寫操作。

RocketMQ內部主要是使用基于mmap實現的零拷貝(其實就是調用上述提到的api),用來讀寫文件,這也是RocketMQ為什么快的一個很重要原因。

RocketMQ中使用mmap代碼RocketMQ中使用mmap代碼

CommitLog

前面提到消息需要持久化到磁盤文件中,而CommitLog其實就是存儲消息的文件的一個稱呼,所有的消息都存在CommitLog中,一個Broker實例只有一個CommitLog。

由于消息數據可能會很大,同時兼顧內存映射的效率,不可能將所有消息都寫到同一個文件中,所以CommitLog在物理磁盤文件上被分為多個磁盤文件,每個文件默認的固定大小是1G。

圖片圖片

當生產者將消息發送過來的時候,就會將消息按照順序寫到文件中,當文件空間不足時,就會重新建一個新的文件,消息寫到新的文件中。

圖片圖片

消息在寫入到文件時,不僅僅會包含消息本身的數據,也會包含其它的對消息進行描述的數據,比如這個消息來自哪臺機器、消息是哪個topic的、消息的長度等等,這些數據會和消息本身按照一定的順序同時寫到文件中,所以圖示的消息其實是包含消息的描述信息的。

刷盤機制

RocketMQ在將消息寫到CommitLog文件中時并不是直接就寫到文件中,而是先寫到PageCache,也就是前面說的內核緩存區,所以RocketMQ提供了兩種刷盤機制,來將內核緩存區的數據刷到磁盤。

異步刷盤

異步刷盤就是指Broker將消息寫到PageCache的時候,就直接返回給生產者說消息存儲成功了,然后通過另一個后臺線程來將消息刷到磁盤,這個后臺線程是在RokcetMQ啟動的時候就會開啟。異步刷盤方式也是RocketMQ默認的刷盤方式。

其實RocketMQ的異步刷盤也有兩種不同的方式,一種是固定時間,默認是每隔0.5s就會刷一次盤;另一種就是頻率會快點,就是每存一次消息就會通知去刷盤,但不會去等待刷盤的結果,同時如果0.5s內沒被通知去刷盤,也會主動去刷一次盤。默認的是第一種固定時間的方式。

同步刷盤

同步刷盤就是指Broker將消息寫到PageCache的時候,會等待異步線程將消息成功刷到磁盤之后再返回給生產者說消息存儲成功。

同步刷盤相對于異步刷盤來說消息的可靠性更高,因為異步刷盤可能出現消息并沒有成功刷到磁盤時,機器就宕機的情況,此時消息就丟了;但是同步刷盤需要等待消息刷到磁盤,那么相比異步刷盤吞吐量會降低。所以同步刷盤適合那種對數據可靠性要求高的場景。

如果你需要使用同步刷盤機制,只需要在配置文件指定一下刷盤機制即可。

高可用

在說高可用之前,先來完善一下前面的一些概念。

在前面介紹概念的時候也說過,一個RokcetMQ中可以有很多個Broker實例,相同的BrokerName稱為一個組,同一個Broker組下每個Broker實例保存的消息是一樣的,不同的Broker組保存的消息是不一樣的。

圖片圖片

如圖所示,兩個BrokerA實例組成了一個Broker組,兩個BrokerB實例也組成了一個Broker組。

前面說過,每個Broker實例都有一個CommitLog文件來存儲消息的。那么兩個BrokerA實例他們CommitLog文件存儲的消息是一樣的,兩個BrokerB實例他們CommitLog文件存儲的消息也是一樣的。

那么BrokerA和BrokerB存的消息不一樣是什么意思呢?

其實很容易理解,假設現在有個topicA存在BrokerA和BrokerB上,那么topicA在BrokerA和BrokerB默認都會有4個隊列。

前面在說發消息的時候需要選擇一個隊列進行消息的發送,假設第一次選擇了BrokerA上的隊列發送消息,那么此時這條消息就存在BrokerA上,假設第二次選擇了BrokerB上的隊列發送消息,那么那么此時這條消息就存在BrokerB上,所以說BrokerA和BrokerB存的消息是不一樣的。

那么為什么同一個Broker組內的Broker存儲的消息是一樣的呢?其實比較容易猜到,就是為了保證Broker的高可用,這樣就算Broker組中的某個Broker掛了,這個Broker組依然可以對外提供服務。

那么如何實現同Broker組的Broker存的消息數據相同的呢?這就不得不提到Broker的高可用模式。

RocketMQ提供了兩種Broker的高可用模式:

  • 主從同步模式
  • Dledger模式

主從同步模式

在主從同步模式下,在啟動的時候需要在配置文件中指定BrokerId,在同一個Broker組中,BrokerId為0的是主節點(master),其余為從節點(slave)。

當生產者將消息寫入到主節點是,主節點會將消息內容同步到從節點機器上,這樣一旦主節點宕機,從節點機器依然可以提供服務。

主從同步主要同步兩部分數據

  • topic等數據
  • 消息

topic等數據是從節點每隔10s鐘主動去主節點拉取,然后更新本身緩存的數據。

消息是主節點主動推送到從節點的。當主節點收到消息之后,會將消息通過兩者之間建立的網絡連接發送出去,從節點接收到消息之后,寫到CommitLog即可。

圖片圖片

從節點有兩種方式知道主節點所在服務器的地址,第一種就是在配置文件指定;第二種就是從節點在注冊到NameServer的時候會返回主節點的地址。

主從同步模式有一個比較嚴重的問題就是如果集群中的主節點掛了,這時需要人為進行干預,手動進行重啟或者切換操作,而非集群自己從從節點中選擇一個節點升級為主節點。

為了解決上述的問題,所以RocketMQ在4.5.0就引入了Dledger模式。

Dledger模式

在Dledger模式下的集群會基于Raft協議選出一個節點作為leader節點,當leader節點掛了后,會從follower中自動選出一個節點升級成為leader節點。所以Dledger模式解決了主從模式下無法自動選擇主節點的問題。

在Dledger集群中,leader節點負責寫入消息,當消息寫入leader節點之后,leader會將消息同步到follower節點,當集群中過半數(節點數/2 +1)節點都成功寫入了消息,這條消息才算真正寫成功。

至于選舉的細節,這里就不多說了,有興趣的可以自行谷歌,還是挺有意思的。

消息消費

終于,在生產者成功發送消息到Broker,Broker在成功存儲消息之后,消費者要消費消息了。

消費者在啟動的時候會從NameSrever拉取消費者訂閱的topic的路由信息,這樣就知道訂閱的topic有哪些queue,以及queue所在Broker的地址信息。

為什么消費者需要知道topic對應的哪些queue呢?

其實主要是因為消費者在消費消息的時候是以隊列為消費單元的,消費者需要告訴Broker拉取的是哪個隊列的消息,至于如何拉到消息的,后面再說。

圖片圖片

消費的兩種模式

前面說過,消費者是有個消費者組的概念,在啟動消費者的時候會指定該消費者屬于哪個消費者組。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");

一個消費者組中可以有多個消費者,不同消費者組之間消費消息是互不干擾的。

在同一個消費者組中,消息消費有兩種模式。

  • 集群模式
  • 廣播模式

集群模式

同一條消息只能被同一個消費組下的一個消費者消費,也就是說,同一條消息在同一個消費者組底下只會被消費一次,這就叫集群消費。

集群消費的實現就是將隊列按照一定的算法分配給消費者,默認是按照平均分配的。

圖片圖片

如圖所示,將每個隊列分配只分配給同一個消費者組中的一個消費者,這樣消息就只會被一個消費者消費,從而實現了集群消費的效果。

RocketMQ默認是集群消費的模式。

廣播模式

廣播模式就是同一條消息可以被同一個消費者組下的所有消費者消費。

其實實現也很簡單,就是將所有隊列分配給每個消費者,這樣每個消費者都能讀取topic底下所有的隊列的數據,就實現了廣播模式。

圖片圖片

如果你想使用廣播模式,只需要在代碼中指定即可。

consumer.setMessageModel(MessageModel.BROADCASTING);

ConsumeQueue

上一節我們提到消費者是從隊列中拉取消息的,但是這里不經就有一個疑問,那就是消息明明都存在CommitLog文件中的,那么是如何去隊列中拉的呢?難道是去遍歷所有的文件,找到對應隊列的消息進行消費么?

答案是否定的,因為這種每次都遍歷數據的效率會很低,所以為了解決這種問題,引入了ConsumeQueue的這個概念,而消費實際是從ConsumeQueue中拉取數據的。

用戶在創建topic的時候,Broker會為topic創建隊列,并且每個隊列其實會有一個編號queueId,每個隊列都會對應一個ConsumeQueue,比如說一個topic在某個Broker上有4個隊列,那么就有4個ConsumeQueue。

前面說過,消息在發送的時候,會根據一定的算法選擇一個隊列,之后再發送消息的時候會攜帶選擇隊列的queueId,這樣Broker就知道消息屬于哪個隊列的了。當消息被存到CommitLog之后,其實還會往這條消息所在的隊列的ConsumeQueue插一條數據。

ConsumeQueue也是由多個文件組成,每個文件默認是存30萬條數據。

插入ConsumeQueue中的每條數據由20個字節組成,包含3部分信息,消息在CommitLog的起始位置(8個字節),消息在CommitLog存儲的長度(8個字節),還有就是tag的hashCode(4個字節)。

圖片圖片

所以當消費者從Broker拉取消息的時候,會告訴Broker拉取哪個隊列(queueId)的消息、這個隊列的哪個位置的消息(queueOffset)。

queueOffset就是指上圖中ConsumeQueue一條數據的編號,單調遞增的。

Broker在接受到消息的時候,找個指定隊列的ConsumeQueue,由于每條數據固定是20個字節,所以可以輕易地計算出queueOffset對應的那條數據在哪個文件的哪個位置上,然后讀出20個字節,從這20個字節中在解析出消息在CommitLog的起始位置和存儲的長度,之后再到CommitLog中去查找,這樣就找到了消息,然后在進行一些處理操作返回給消費者。

到這,我們就清楚的知道消費者是如何從隊列中拉取消息的了,其實就是先從這個隊列對應的ConsumeQueue中找到消息所在CommmitLog中的位置,然后再從CommmitLog中讀取消息的。

RocketMQ如何實現消息的順序性

這里插入一個比較常見的一個面試,那么如何保證保證消息的順序性。

其實要想保證消息的順序只要保證以下三點即可

  • 生產者將需要保證順序的消息發送到同一個隊列
  • 消息隊列在存儲消息的時候按照順序存儲
  • 消費者按照順序消費消息

圖片圖片

第一點如何保證生產者將消息發送到同一個隊列?

上文提到過RocketMQ生產者在發送消息的時候需要選擇一個隊列,并且選擇算法是可以自定義的,這樣我們只需要在根據業務需要,自定義隊列選擇算法,將順序消息都指定到同一個隊列,在發送消息的時候指定該算法,這樣就實現了生產者發送消息的順序性。

第二點,RocketMQ在存消息的時候,是按照順序保存消息在ConsumeQueue中的位置的,由于消費消息的時候是先從ConsumeQueue查找消息的位置,這樣也就保證了消息存儲的順序性。

第三點消費者按照順序消費消息,這個RocketMQ已經實現了,只需要在消費消息的時候指定按照順序消息消費即可,如下面所示,注冊消息的監聽器的時候使用MessageListenerOrderly這個接口的實現。

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        //按照順序消費消息記錄
        return null;
    }
});

消息清理

由于消息是存磁盤的,但是磁盤空間是有限的,所以對于磁盤上的消息是需要清理的。

當出現以下幾種情況下時就會觸發消息清理:

  • 手動執行刪除
  • 默認每天凌晨4點會自動清理過期的文件
  • 當磁盤空間占用率默認達到75%之后,會自動清理過期文件
  • 當磁盤空間占用率默認達到85%之后,無論這個文件是否過期,都會被清理掉

上述過期的文件是指文件最后一次修改的時間超過72小時(默認情況下),當然如果你的老板非常有錢,服務器的磁盤空間非常大,可以將這個過期時間修改的更長一點。

有的小伙伴肯定會有疑問,如果消息沒有被消息,那么會被清理么?

答案是會被清理的,因為清理消息是直接刪除CommitLog文件,所以只要達到上面的條件就會直接刪除CommitLog文件,無論文件內的消息是否被消費過。

當消息被清理完之后,消息也就結束了它精彩的一生。

消息的一生總結

為了更好地理解本文,這里再來總結一下RokcetMQ消息一生的各個環節。

消息發送

  • 生產者產生消息
  • 生產者在發送消息之前會拉取topic的路由信息
  • 根據隊列選擇算法,從topic眾多的隊列中選擇一個隊列
  • 跟隊列所在的Broker機器建立網絡連接,將消息發送到Broker上

消息存儲

  • Broker接收到生產者的消息將消息存到CommitLog中
  • 在CosumeQueue中存儲這條消息在CommitLog中的位置

由于CommitLog和CosumeQueue都涉及到磁盤文件的讀寫操作,為了提高讀寫效率,RokcetMQ使用到了零拷貝技術,其實就是調用了一下Java提供的api。。

高可用

如果是集群模式,那么消息會被同步到從節點,從節點會將消息存到自己的CommitLog文件中。這樣就算主節點掛了,從節點仍然可以對外提供訪問。

消息消費

  • 消費者會拉取訂閱的Topic的路由信息,根據集群消費或者廣播消費的模式來選擇需要拉取消息的隊列
  • 與隊列所在的機器建立連接,向Broker發送拉取消息的請求
  • Broker在接收到請求知道,找到隊列對應的ConsumeQueue,然后計算出拉取消息的位置,再解析出消息在CommitLog中的位置
  • 根據解析出的位置,從CommitLog中讀出消息的內容返回給消費者

消息清理

由于消息是存在磁盤的,而磁盤的空間是有限的,所以RocketMQ會根據一些條件去清理CommitLog文件。

最后

最后,如果有對RocketMQ源碼感興趣的小伙伴可以從如下地址中拉取RocketMQ源碼,里面我已經對RocketMQ一些源碼進行了注釋。

https://github.com/sanyou3/rocketmq.git
責任編輯:武曉燕 來源: 三友的java日記
相關推薦

2023-01-10 08:20:55

RocketMQ消息源碼

2018-07-30 16:32:25

應屆生認知誤區

2024-10-16 15:11:58

消息隊列系統設計

2023-07-06 13:56:14

微軟Skype

2020-09-08 06:54:29

Java Gradle語言

2023-09-22 17:36:37

2021-01-28 22:31:33

分組密碼算法

2020-05-22 08:16:07

PONGPONXG-PON

2018-06-07 13:17:12

契約測試單元測試API測試

2021-08-01 09:55:57

Netty時間輪中間件

2023-09-27 16:39:38

2024-10-28 21:02:36

消息框應用程序

2021-12-06 09:43:01

鏈表節點函數

2021-07-16 11:48:26

模型 .NET微軟

2023-09-20 23:01:03

Twitter算法

2021-03-01 18:37:15

MySQL存儲數據

2019-02-13 14:15:59

Linux版本Fedora

2021-08-04 09:32:05

Typescript 技巧Partial

2021-01-29 08:32:21

數據結構數組

2021-02-06 08:34:49

函數memoize文檔
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品国产电影 | 国产日本精品视频 | 久久国产欧美日韩精品 | 激情国产视频 | 久久久精品 | 成人亚洲网 | 在线观看免费黄色片 | 毛片毛片毛片毛片毛片 | 日韩电影中文字幕在线观看 | 亚洲国产成人av好男人在线观看 | 欧美日韩三级 | 中文字幕av在线 | 日韩在线免费播放 | 久久久影院 | 亚洲日本视频 | 成人精品在线观看 | av免费看片 | 在线国产一区 | 成人亚洲精品久久久久软件 | 在线观看黄视频 | 播放一级黄色片 | 狠狠伊人 | 国产三级大片 | 中文字幕国产高清 | 色又黄又爽网站www久久 | 欧美一区二区三区大片 | 国产一级黄色网 | 久久综合爱 | 亚洲在线一区二区 | 久久久.com | 日韩中文在线 | 久久精品网 | 在线观看中文字幕视频 | 中文亚洲视频 | se婷婷| 久久成人18免费网站 | 999热精品 | 日本人爽p大片免费看 | 国产午夜精品久久久久免费视高清 | 国产精品久久久久久久久久免费看 | 午夜激情在线视频 |