RocketMQ負(fù)載均衡機(jī)制解析!
消費(fèi)者在消費(fèi)消息的時(shí)候,需要知道從Broker的哪一個(gè)消息隊(duì)列中去獲取消息。
所以,在消費(fèi)者端必須要做負(fù)載均衡,即Broker端中多個(gè)消費(fèi)隊(duì)列分配給同一個(gè)消費(fèi)者組中的哪些消費(fèi)者消費(fèi)。
在RocketMQ中,在消費(fèi)者端有一個(gè):Rebalance負(fù)載均衡組件。
- 他負(fù)責(zé)相對(duì)均勻的給消費(fèi)者分配需要拉取的隊(duì)列信息。
消費(fèi)者負(fù)載均衡
指為消費(fèi)組下的每個(gè)消費(fèi)者分配訂閱主題下的消費(fèi)隊(duì)列,分配了消費(fèi)隊(duì)列消費(fèi)者就可以知道去消費(fèi)哪個(gè)消費(fèi)隊(duì)列上面的消息。
- 這里針對(duì)集群模式,因?yàn)閺V播模式,所有的消息隊(duì)列可以被消費(fèi)組下的每個(gè)消費(fèi)者消費(fèi)不涉及負(fù)載均衡。
而集群模式一個(gè)消息隊(duì)列同一時(shí)間只能分配給組內(nèi)的一個(gè)消費(fèi)者進(jìn)行消費(fèi)。
RocketMQ5.0以前是按照隊(duì)列粒度進(jìn)行負(fù)載均衡的,5.0以后提供了按消息粒度進(jìn)行負(fù)載均衡。
隊(duì)列粒度負(fù)載均衡
隊(duì)列粒度負(fù)載均衡策略中,同一消費(fèi)者組內(nèi)的多個(gè)消費(fèi)者將按照隊(duì)列粒度消費(fèi)消息,每個(gè)隊(duì)列只能被其中一個(gè)消費(fèi)者消費(fèi)。
隊(duì)列粒度負(fù)載均衡是在每個(gè)消費(fèi)者端進(jìn)行的,并不是由某個(gè)節(jié)點(diǎn)統(tǒng)一進(jìn)行負(fù)載均衡之后將分配結(jié)果通知到每個(gè)消費(fèi)者。
消費(fèi)者增加或者減少會(huì)影響消息隊(duì)列的分配,所以Broker需要感知消費(fèi)者的上下線(xiàn)情況。
消費(fèi)者在啟動(dòng)時(shí)會(huì)向所有的Broker發(fā)送心跳包進(jìn)行注冊(cè),通知Broker消費(fèi)者上線(xiàn),下線(xiàn)的時(shí)候也會(huì)向Broker發(fā)送取消注冊(cè)的請(qǐng)求。
Broker會(huì)維護(hù)消費(fèi)者信息的注冊(cè)信息,在消費(fèi)者發(fā)生變更時(shí)會(huì)通知消費(fèi)者進(jìn)行負(fù)載均衡。
Rebalance觸發(fā)時(shí)機(jī)
消費(fèi)者啟動(dòng)時(shí)觸發(fā):
消費(fèi)者在啟動(dòng)時(shí)會(huì)進(jìn)行一次負(fù)載均衡,為自己分配消息隊(duì)列。
Broker發(fā)現(xiàn)消費(fèi)組變更時(shí)觸發(fā):
處于以下兩種情況之一時(shí)會(huì)被判斷為消費(fèi)組發(fā)生了變化,需要進(jìn)行負(fù)載均衡:
- 某個(gè)消費(fèi)組內(nèi)有新的消費(fèi)者向Broker進(jìn)行了注冊(cè)。
- 比如某個(gè)消費(fèi)組原來(lái)有兩個(gè)消費(fèi)者,現(xiàn)在新增了一個(gè)消費(fèi)者,新增的消費(fèi)者啟動(dòng)時(shí)會(huì)向Broker發(fā)送注冊(cè)請(qǐng)求。
- 消費(fèi)組訂閱的主題信息發(fā)生了變化。
- 比如消費(fèi)組新增訂閱了某個(gè)主題或者取消某個(gè)主題的訂閱,會(huì)被判斷為主題訂閱信息發(fā)生了變化。
被判定為變化之后,會(huì)觸發(fā)變更事件,向該消費(fèi)者下的所有消費(fèi)者發(fā)送發(fā)送變更請(qǐng)求,通知組下每個(gè)消費(fèi)者進(jìn)行負(fù)載均衡。
Broker收到消費(fèi)者下線(xiàn)時(shí)觸發(fā):
如果有消費(fèi)者向Broker發(fā)送UNREGISTER_CLIENT取消注冊(cè)請(qǐng)求,并且開(kāi)啟了允許通知變更,會(huì)觸發(fā)變更事件。
變更事件同上,Broker會(huì)通知該消費(fèi)者組下的所有消費(fèi)者進(jìn)行一次負(fù)載均衡。
消費(fèi)者定時(shí)觸發(fā):
消費(fèi)者本身也會(huì)定時(shí)執(zhí)行負(fù)載均衡,默認(rèn)是20s執(zhí)行一次。
圖片
消息粒度負(fù)載均衡
在RocketMQ5.0之后,增加了消息粒度負(fù)載均衡策略,默認(rèn)且僅使用消息粒度負(fù)載均衡策略。
消息粒度負(fù)載均衡策略中,同一消費(fèi)組內(nèi)的多個(gè)消費(fèi)者將按照消息粒度平均分?jǐn)傊黝}中的所有消息。
- 即同一個(gè)隊(duì)列中的消息,可被平均分配給組內(nèi)多個(gè)消費(fèi)者共同消費(fèi)。
消息粒度負(fù)載均衡策略保證同一個(gè)隊(duì)列的消息可以被組內(nèi)多個(gè)消費(fèi)者共同處理。
但是該策略使用的消息分配算法結(jié)果是隨機(jī)的,不能指定消息被哪一個(gè)特定的消費(fèi)者處理。
- 當(dāng)消費(fèi)者獲取到某條消息后,服務(wù)端會(huì)對(duì)該消息加鎖,保證該消息對(duì)其他消費(fèi)者不可見(jiàn),直到消息消費(fèi)成功或者超時(shí)。
所以多個(gè)消費(fèi)者同時(shí)消費(fèi)同一個(gè)消息隊(duì)列中的消息,服務(wù)端也可以保證消息不會(huì)被多個(gè)消費(fèi)者重復(fù)消費(fèi)。
消息粒度負(fù)載均衡策略適用于絕大多數(shù)在線(xiàn)處理的業(yè)務(wù)場(chǎng)景,對(duì)于流式處理、聚合計(jì)算等場(chǎng)景,更適合隊(duì)列粒度的負(fù)載均衡策略。
執(zhí)行流程
負(fù)載均衡服務(wù)執(zhí)行邏輯在doRebalance函數(shù),里面會(huì)對(duì)每個(gè)消費(fèi)者組執(zhí)行負(fù)載均衡操作。
consumerTable這個(gè)map對(duì)象里存儲(chǔ)了消費(fèi)者組對(duì)應(yīng)的的消費(fèi)者實(shí)例。
private ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
public void doRebalance() {
//每個(gè)消費(fèi)者組都有負(fù)載均衡
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
由于每個(gè)消費(fèi)者組可能會(huì)消費(fèi)很多topic,每個(gè)topic都有自己的不同隊(duì)列,最終是按topic的維度進(jìn)行負(fù)載均衡。
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//按topic維度執(zhí)行負(fù)載均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
最終負(fù)載均衡邏輯處理的實(shí)現(xiàn)在:
- org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic。
其中分為廣播消息和集群消息模型兩種情況處理。
圖片
負(fù)載均衡核心功能的主流程,主要做了4件事情:
圖片
負(fù)載均衡策略原理
負(fù)載均衡策略頂層接口:
/**
* Strategy Algorithm for message allocating between consumers
*/
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
* 給消費(fèi)者id分配消費(fèi)隊(duì)列
*/
List<MessageQueue> allocate(
final String consumerGroup, //消費(fèi)者組
final String currentCID, //當(dāng)前消費(fèi)者id
final List<MessageQueue> mqAll, //所有的隊(duì)列
final List<String> cidAll //所有的消費(fèi)者
);
}
他默認(rèn)共有7種負(fù)載均衡策略實(shí)現(xiàn)。
圖片
最常用的兩種平均分配算法:
AllocateMessageQueueAveragely
是用總數(shù)除以消費(fèi)者個(gè)數(shù),余數(shù)按消費(fèi)者順序分配給消費(fèi)者。
AlocateMessageQueueAveragelyByCircle
輪流一個(gè)一個(gè)分配。
參考:https://rocketmq.apache.org/zh/docs/featureBehavior/08consumerloadbalance/