面試題:一個Consumer訂閱兩個Topic,其中一個Topic消息過多堆積了,會影響另一個Topic消費嗎?
?無意中在網上看到這么一個問題,一個consumer訂閱兩個topic,其中一個topic消息過多堆積了,會影響另一個topic消費嗎?
對于RocketMQ這種,看源碼如何方便,于是乎我就開始找相應的源碼,然后一頓思考。
先給大家上結論,看堵塞的原因,如果原因是生產者瞬時產生大量的消息,比如秒殺,導致的消息堆積,基本不會影響;如果是消費者出現故障,消費速度變得奇慢無比,那就會影響,不過并不會阻塞,只是會影響速率?。
接下來帶著大家一起看源碼。
/**
* Rebalance Service
* consumer負載均衡線程服務
*/
public class RebalanceService extends ServiceThread {}
大家先把目光聚焦到這個負載均衡的線程服務上來,這個大家也看到了,每個20秒執行一次,這個主要負載均衡的邏輯在doRebalance方法中。
我們進去這個方法看。
進來之后可以看到,對consumerTable的對象進行循環,這個存儲的是所有的消費者,然后循環調用doRebalance,繼續進去看。
繼續往里沖。
線程的創建來到這里,我們可以看到核心處理是這個rebalanceByTopic,傳入的參數就是我們這個消費者監聽的topic。
這里的mqSet是該topic的所有consumerqueue,也就是默認創建的那4個隊列,當然,這個數量可以改變 。
然后我們可以看到allocateMessageQueueStrategy,這個是一個分配策略對象,調用其中的allocate來進行分配該topic的消息隊列。
這個分配策略也有幾種實現方式,大家看一看,根據名字其實大家也可以猜個八九不離十了,感興趣的可以點進去看看詳細的處理機制。
分配好之后,將隊列賦值給allocateResultSet這個對象,這里為啥要用set集合存儲呢?
我的個人猜測是,防止出現queue數量的重新改變的情況下,可能導致這里出現重復,這里增加一層set防止這種極端情況的出現。
接下來分配好隊列之后,主要的處理就是updateProcessQueueTableInRebalance,這個就是負責更新消息隊列,其實呢,也可以認為成把這個消費者需要負責的這些隊列賦值給它,也就是這是你的責任了,你這個消費者需要處理這些隊列。
我們進來updateProcessQueueTableInRebalance這個方法之后,上面的那些我就折疊起來不給大家看了,這里的處理主要也是針對于某些機器突然宕機或者增加一些機器的情況。
這個方法的主要處理是在最后這個拉取請求這里,也就是dispatchPullRequest這個,傳入的參數是一個pullRequest的list。
線程的創建進來之后,循環處理pullRequest,哎,還沒找到最底層,繼續點進去。
線程的創建哎。終于找到你了,就是你這個家伙,最后就執行了一個put方法,放進去的就是一個LinkedBlockingQueue隊列。
這個是一個拉取消息的請求隊列,請求的對象就是pullRequest。
實際處理的時候,也就是拉取消息的時候,多個線程會從LinkedBlockingQueue中去take消息,然后按照放入的順序去進行消費。
家解釋一下這個流程,這里就是rocketmq首先對消息會進行一個負載均衡Rebalance的過程,這個就是將topic中的consumerqueue隊列按照consumer進行分配,分配策略就是上面看到的那幾種。
將pullRequest放入到這個LinkedBlockingQueue中,這里放的是topic、brokerName、queueID這些,這個時候已經排好了后面消費的順序了。
比如有10個request中,大概5個是topicTest1,另外5個是topicTest2。
所以呢,這個時候假如topicTest1消息堆積了,還是會照常去消費topicTest2的,此時我們需要看這個堆積的原因,如果堆積是因為秒殺一類的場景導致瞬時間產生大量消息,這樣消費者還是會正常消費topicTest1,所以不會影響topicTest2,
但是,如果topicTest1消費速度很慢,導致所有線程都處理很慢,都被占了,那樣就會稍微影響topicTest2的速度了,不過那也只是暫時的,不會阻塞topicTest2的,