為什么kafka性能下降這么快,我用RocketMQ的時候不會這樣子
?Rocketmq和kafka這兩個消息隊列大家應該都比較熟悉吧,哪怕不是很熟悉,應該也聽說過的吧,你別告訴我,作為一個資深的程序員,你沒聽過這兩門技術。
我之前使用這兩個消息隊列的時候就遇到一個很奇怪的問題,就是在kafka里面弄了比較多的topic,性能下降的速度賊快,不知道大家遇到過沒,而同樣的場景切換到消息隊列rocketmq中,下降速度卻沒有那么快。
不熟悉這倆消息隊列結構的朋友,一聽這個肯定還是不太清楚的,今天我來給大家分析分析這其中的原因,給大家解惑。
rocketmq的結構
NameServer:主要是對元數據的管理,包括Topic和路由信息的管理,底層由netty實現,是一個提供路由管理、路由注冊和發現的無狀態節點,類似于ZooKeeper
Broker:消息中轉站,負責收發消息、持久化消息
Producer:消息的生產者,一般由業務系統來產生消息供消費者消費
Consumer:消息的消費者,一般由業務系統來異步消費消息
在RocketMQ中的每一條消息,都有一個Topic,用來區分不同的消息。一個主題一般會有多個消息的訂閱者,當生產者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生產者寫入的新消息。
在Topic中有分為了多個Queue,這其實是我們發送/讀取消息通道的最小單位,我們發送消息都需要指定某個寫入某個Queue,拉取消息的時候也需要指定拉取某個Queue,所以我們的順序消息可以基于我們的Queue維度保持隊列有序,如果想做到全局有序那么需要將Queue大小設置為1,這樣所有的數據都會在Queue中有序。
我們同一組Consumer會根據一些策略來選Queue,常見的比如平均分配或者一致性Hash分配。
要注意的是當Consumer出現下線或者上線的時候,這里需要做重平衡,也就是Rebalance,RocketMQ的重平衡機制如下:
定時拉取broker,topic的最新信息,每隔20s做重平衡,隨機選取當前Topic的一個主Broker,這里要注意的是不是每次重平衡所有主Broker都會被選中,因為會存在一個Broker再多個Broker的情況。
獲取當前Broker,當前ConsumerGroup的所有機器ID。然后進行策略分配。
由于重平衡是定時做的,所以這里有可能會出現某個Queue同時被兩個Consumer消費,所以會出現消息重復投遞。
Queue讀寫數量不一致
在RocketMQ中Queue被分為讀和寫兩種,在最開始接觸RocketMQ的時候一直以為讀寫隊列數量配置不一致不會出現什么問題的,比如當消費者機器很多的時候我們配置很多讀的隊列,但是實際過程中發現會出現消息無法消費和根本沒有消息消費的情況。
當寫的隊列數量大于讀的隊列的數量,當大于讀隊列這部分ID的寫隊列的數據會無法消費,因為不會將其分配給消費者。
當讀的隊列數量大于寫的隊列數量,那么多的隊列數量就不會有消息被投遞進來。
rocketmq中的存儲機制
RocketMQ憑借其強大的存儲能力和強大的消息索引能力,以及各種類型消息和消息的特性脫穎而出,于是乎,我們這些有夢想的程序員學習RocketMQ的存儲原理也變得尤為重要
而要說起這個存儲原理,則不得不說的就是RocketMQ的消息存儲文件commitLog文件,消費方則是憑借著巧妙的設計Consumerqueue文件來進行高性能并且不混亂的消費,還有RocketMQ的強大的支持消息索引的特性,靠的就是indexfile索引文件
我們這篇文章就從這commitLog、Consumerqueue、indexfile這三個神秘的文件說起,搞懂這三個文件,RocketMQ的核心就被你掏空了
先上個圖,寫入commitLog文件時commitLog和Consumerqueue、indexfile文件三者的關系
commitLog
RocketMQ中的消息存儲文件放在${ROCKET_HOME}/store 目錄下,當生產者發送消息時,broker會將消息存儲到Commit文件夾下,文件夾下面會有一個commitLog文件,但是并不是意味著這個文件叫這個,文件命名是根據消息的偏移量來決定的。
文件有自己的生成規則,每個commitLog文件的大小是1G,一般情況下第一個 CommitLog 的起始偏移量為 0,第二個 CommitLog 的起始偏移量為 1073741824 (1G = 1073741824byte)。
commitLog文件的最大的一個特點就是消息的順序寫入,隨機讀寫,所有的topic的消息都存儲到commitLog文件中,順序寫入可以充分的利用磁盤順序減少了IO爭用數據存儲的性能,kafka也是通過硬盤順序存盤的。
大家都常說硬盤的速度比內存慢,其實這句話也是有歧義的,當硬盤順序寫入和讀取的時候,速度不比內存慢,甚至比內存速度快,這種存儲方式就好比數組,我們如果知道數組的下標,則可以直接通過下標計算出位置,找到內存地址,眾所周知,數組的讀取是很快的,但是數組的缺點在于插入數據比較慢,因為如果在中間插入數據需要將后面的數據往后移動。
而對于數組來說,如果我們只會順序的往后添加,數組的速度也是很快的,因為數組沒有后續的數據的移動,這一操作很耗時。
回到RocketMQ中的commitLog文件,也是同樣的道理,順序的寫入文件也就不需要太多的去考慮寫入的位置,直接找到文件往后放就可以了,而取數據的時候,也是和數組一樣,我們可以通過文件的大小去精準的定位到哪一個文件,然后再精準的定位到文件的位置。
consumerqueue文件
RocketMQ是分為多個topic,消息所屬主題,屬于消息類型,每一個topic有多個queue,每個queue放著不同的消息,在同一個消費者組下的消費者,可以同時消費同一個topic下的不同queue隊列的消息。不同消費者下的消費者,可以同時消費同一個topic下的相同的隊列的消息。而同一個消費者組下的消費者,不可以同時消費不同topic下的消息。
而每個topic下的queue隊列都會對應一個Consumerqueue文件,這個存儲的就是對應的commitLog文件中的索引位置,而不用存儲真實的消息。
consumequeue存放在store文件里面,里面的consumequeue文件里面按照topic排放,然后每個topic默認4個隊列,里面存放的consumequeue文件。
ConsumeQueue中并不需要存儲消息的內容,而存儲的是消息在CommitLog中的offset。也就是說ConsumeQueue其實是CommitLog的一個索引文件。
consumequeue是定長結構,每個記錄固定大小20個字節,單個consumequeue文件默認包含30w個條目,所以單個文件大小大概6M左右。
很顯然,Consumer消費消息的時候,要讀2次:先讀ConsumeQueue得到offset,再通過offset找到CommitLog對應的消息內容。
IndexFile
RocketMQ還支持通過MessageID或者MessageKey來查詢消息,使用ID查詢時,因為ID就是用broker+offset生成的(這里msgId指的是服務端的),所以很容易就找到對應的commitLog文件來讀取消息。
對于用MessageKey來查詢消息,MessageStore通過構建一個index來提高讀取速度。
indexfile文件存儲在store目錄下的index文件里面,里面存放的是消息的hashcode和index內容,文件由一個文件頭組成:長40字節。500w個hashslot,每個4字節。2000w個index條目,每個20字節。
所以這里我們可以估算每個indexfile的大小為:40+500w4+2000w20個字節,大約400M左右。
每放入一個新消息的index進來,首先會取MessageKey的HashCode,然后用Hashcode對slot的總數進行取模,決定該消息key的位置,slot的總數默認是500W個。
只要取hash就必然面臨著hash沖突的問題,indexfile也是采用鏈表結構來解決hash沖突,這一點和HashMap一樣的,不過這個不存在紅黑樹轉換這一說,個人猜測這個的沖突數量也達不到很高的級別,所以進行這方面的設計也沒啥必要,甚至變成了強行增加indexfile的文件結構難度。
還有,在indexfile中的slot中放的是最新的index的指針,因為一般查詢的時候大概率是優先查詢最近的消息。
每個slot中放的指針值是索引在indexfile中的偏移量,也就是后面index的位置,而index中存放的就是該消息在commitlog文件中的offset,每個index的大小是20字節,所以根據當前索引是這個文件中的第幾個偏移量,也就很容易定位到索引的位置,根據前面的固定大小可以很快把真實坐標算出來,以此類推,形成一個鏈表的結構。
kafka的結構
Broker:消息中間件處理節點(服務器),一個節點就是一個broker,一個Kafka集群由一個或多個broker組成。
Topic:Kafka對消息進行歸類,發送到集群的每一條消息都要指定一個topic。
Partition:物理上的概念,每個topic包含一個或多個partition,一個partition對應一個文件夾,這個文件夾下存儲partition的數據和索引文件,每個partition內部是有序的。
Producer:生產者,負責發布消息到broker。
Consumer:消費者,從broker讀取消息。
ConsumerGroup:每個consumer屬于一個特定的consumer group,可為每個consumer指定group name,若不指定,則屬于默認的group,一條消息可以發送到不同的consumer group,但一個consumer group中只能有一個consumer能消費這條消息。
kafka存儲機制
我們的生產者會決定發送到哪個 Partition,如果沒有 Key 值則進行輪詢發送。
如果有 Key 值,對 Key 值進行 Hash,然后對分區數量取余,保證了同一個 Key 值的會被路由到同一個分區。(所有系統的partition都是同一個路數)。
總所周知,topic在物理層面以partition為分組,一個topic可以分成若干個partition,那么topic以及partition又是怎么存儲的呢?
其實partition還可以細分為logSegment,一個partition物理上由多個logSegment組成,那么這些segment又是什么呢?
LogSegment 文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示為 Segment 索引文件和數據文件。
這兩個文件的命令規則為:partition全局的第一個segment從0開始,后續每個segment文件名為上一個segment文件最后一條消息的offset值,數值大小為64位,20位數字字符長度,沒有數字用0填充,如下:
第一個segment
00000000000000000000.index
00000000000000000000.log
第二個segment,文件命名以第一個segment的最后一條消息的offset組成
00000000000000170410.index
00000000000000170410.log
第三個segment,文件命名以上一個segment的最后一條消息的offset組成
00000000000000239430.index
00000000000000239430.log
“.index”索引文件存儲大量的元數據,“.log”數據文件存儲大量的消息,索引文件中的元數據指向對應數據文件中message的物理偏移地址。
kafka和rocketmq的比較
RocketMQ和Kafka的存儲核心設計有很大的不同,所以其在寫入性能方面也有很大的差別,這是16年阿里中間件團隊對RocketMQ和Kafka不同Topic下做的性能測試:
從圖上可以看出:
Kafka在Topic數量由64增長到256時,吞吐量下降了98.37%。
RocketMQ在Topic數量由64增長到256時,吞吐量只下降了16%。
這是為什么呢?
kafka一個topic下面的所有消息都是以partition的方式分布式的存儲在多個節點上。同時在kafka的機器上,每個Partition其實都會對應一個日志目錄,在目錄下面會對應多個日志分段。
所以如果Topic很多的時候Kafka雖然寫文件是順序寫,但實際上文件過多,會造成磁盤IO競爭非常激烈。
那RocketMQ為什么在多Topic的情況下,依然還能很好的保持較多的吞吐量呢?我們首先來看一下RocketMQ中比較關鍵的文件:
rocketmq中的消息主體數據并沒有像Kafka一樣寫入多個文件,而是寫入一個文件,這樣我們的寫入IO競爭就非常小,可以在很多Topic的時候依然保持很高的吞吐量。
有人可能說這里的ConsumeQueue寫是在不停的寫入呢,并且ConsumeQueue是以Queue維度來創建文件,那么文件數量依然很多,在這里ConsumeQueue的寫入的數據量很小,每條消息只有20個字節,30W條數據也才6M左右,所以其實對我們的影響相對Kafka的Topic之間影響是要小很多的。
再順便提一嘴,一個topic分了一萬個partition和一萬個topic每個topic都是單partition對于kafka的負載是一樣的。