我向《RocketMQ技術(shù)內(nèi)幕》一書的創(chuàng)始人請(qǐng)教了一個(gè)問(wèn)題
是這樣的,我在學(xué)習(xí)rocketmq的時(shí)候遇到了一個(gè)奇怪的問(wèn)題,就是同一個(gè)消費(fèi)者組內(nèi)的消費(fèi)者訂閱同一個(gè)主題topic,不同的tag的時(shí)候看到一個(gè)消息丟失的問(wèn)題
這個(gè)問(wèn)題我也是向《RocketMQ技術(shù)內(nèi)幕》一書的作者丁威大哥,然后他給我解釋了我對(duì)于這個(gè)問(wèn)題的困惑,我來(lái)給大家解釋一下
先給大家描述一下這個(gè)具體的內(nèi)容
兩個(gè)一樣的Consumer Group的Consumer訂閱同一個(gè)Topic,但是是不同的tag,Consumer1訂閱Topic的tag1,Consumer2訂閱Topic的tag2,然后分別啟動(dòng)。
這時(shí)候往Topic的tag1里發(fā)送10條數(shù)據(jù),Topic的tag2里發(fā)送10條。目測(cè)應(yīng)該是Consumer1和Consumer2分別收到對(duì)應(yīng)的10條消息。結(jié)果卻是只有Consumer2收到了消息,而且只收到了4-6條消息,不固定。
MQ底層數(shù)據(jù)結(jié)構(gòu)之精妙
RocketMQ專門按照Topic為每一個(gè)topic建立索引,方便消費(fèi)端按照topic進(jìn)行消費(fèi),其具體實(shí)現(xiàn)為消息隊(duì)列。
在RocketMQ中,ConsumeQueue的引入并不是為了提高消息寫入的性能,而是為消費(fèi)服務(wù)的。
消息消費(fèi)隊(duì)列中的每一個(gè)條目是一個(gè)定長(zhǎng)的,設(shè)計(jì)極具技巧性,其每個(gè)條目使用固定長(zhǎng)度(8字節(jié)commitlog物理偏移量、4字節(jié)消息長(zhǎng)度、8字節(jié)tag hashcode),這里不是存儲(chǔ)tag的原始字符串,而是存儲(chǔ)hashcode。
目的就是確保每個(gè)條目的長(zhǎng)度固定,可以使用訪問(wèn)類似數(shù)組下標(biāo)的方式來(lái)快速定位條目,極大的提高了ConsumeQueue文件的讀取性能,這樣根據(jù)消費(fèi)進(jìn)度去訪問(wèn)消息的方法為使用邏輯偏移量logicOffset * 20即可找到該條目的起始偏移量(consumequeue文件中的偏移量),然后讀取該偏移量后20個(gè)字節(jié)即得到了一個(gè)條目,無(wú)需遍歷consumequeue文件。
關(guān)于RocketMQ中的三個(gè)文件,來(lái)幫助RocketMQ完成如此高效率的偉業(yè),我也寫了一個(gè)文章來(lái)介紹這三個(gè)文件,大家可以看一下通過(guò)這三個(gè)文件徹底搞懂rocketmq的存儲(chǔ)原理
消息過(guò)濾實(shí)現(xiàn)機(jī)制
消費(fèi)端隊(duì)列存儲(chǔ)的是 tag 的 hashcode,眾所周知,不同的字符串得到的hashcode值可能一樣,故在服務(wù)端是無(wú)法精確對(duì)消息進(jìn)行過(guò)濾的,所以在RocketMQ中會(huì)進(jìn)行兩次消息過(guò)濾。
當(dāng)客戶端向服務(wù)端拉取消息時(shí),服務(wù)端在返回消息之前,會(huì)先根據(jù)hashcode進(jìn)行過(guò)濾,然后客戶端收到服務(wù)端的消息后,再根據(jù)消息的tag字符串進(jìn)行精確過(guò)濾。
上面的原理很好理解呀,那為什么會(huì)丟失消息呢?這其實(shí)和消息隊(duì)列負(fù)載機(jī)制有關(guān)。
在RocketMQ中使用集群模式消費(fèi)時(shí),同一個(gè)消費(fèi)組中的多個(gè)消費(fèi)者共同完成主題中的隊(duì)列的消費(fèi),即一個(gè)消費(fèi)者只會(huì)分配到其中某幾個(gè)隊(duì)列,并且同一時(shí)間,一個(gè)隊(duì)列只會(huì)分配給一個(gè)消費(fèi)者,這樣結(jié)合上面的的過(guò)濾機(jī)制,就會(huì)明顯有問(wèn)題,請(qǐng)看示例圖:
其問(wèn)題的核心關(guān)鍵是,同一個(gè)tag會(huì)分布在不同的隊(duì)列中,但消費(fèi)者C1分配到的隊(duì)列為q0,q1,q0,q1中有taga和tagb的消息,但tagb的消息會(huì)被消費(fèi)者C1過(guò)濾,但這部分消息卻不會(huì)被C2消費(fèi),造成了消息丟失。
所以在RocketMQ中,一個(gè)消費(fèi)組內(nèi)的所有消費(fèi)這,其訂閱關(guān)系必須保持一致。
我們?cè)賮?lái)回過(guò)頭看這個(gè)問(wèn)題
首先這是Broker決定的,而不是Consumer端決定的
Consumer端發(fā)心跳給Broker,Broker收到后存到consumerTable里(就是個(gè)Map),key是GroupName,value是ConsumerGroupInfo。
ConsumerGroupInfo里面是包含topic等信息的,但是問(wèn)題就出在上一步驟,key是groupName,你同GroupName的話Broker心跳最后收到的Consumer會(huì)覆蓋前者的。相當(dāng)于如下代碼:
map.put(groupName, ConsumerGroupInfo);
這樣同key,肯定產(chǎn)生了覆蓋。所以Consumer1不會(huì)收到任何消息,但是Consumer2為什么只收到了一半(不固定)消息呢?
那是因?yàn)椋耗闶羌耗J较M(fèi),它會(huì)負(fù)載均衡分配到各個(gè)節(jié)點(diǎn)去消費(fèi),所以一半消息(不固定個(gè)數(shù))跑到了Consumer1上,結(jié)果Consumer1訂閱的是tag1,所以不會(huì)任何輸出。
如果換成BROADCASTING,那絕逼后者會(huì)收到全部消息,而不是一半,因?yàn)閺V播是廣播全部Consumer。
/**
* Consumer信息
*/
public class ConsumerGroupInfo {
// 組名
private final String groupName;
// topic信息,比如topic、tag等
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
// 客戶端信息,比如clientId等
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
// PULL/PUSH
private volatile ConsumeType consumeType;
// 消費(fèi)模式:BROADCASTING/CLUSTERING
private volatile MessageModel messageModel;
// 消費(fèi)到哪了
private volatile ConsumeFromWhere consumeFromWhere;
}
/**
* 通過(guò)心跳將Consumer信息注冊(cè)到Broker端。
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// consumerTable:維護(hù)所有的Consumer
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
// 如果沒(méi)有Consumer,則put到map里
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
// put到map里
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新Consumer信息,客戶端信息
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新訂閱Topic信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
從這一步可以看出消費(fèi)者信息是以groupName為key,ConsumerGroupInfo為value存到map(consumerTable)里的,那很明顯了,后者肯定會(huì)覆蓋前者的,因?yàn)閗ey是一樣的。
而后者的tag是tag2,那肯定覆蓋了前者的tag1,這部分是存到ConsumerGroupInfo的subscriptionTable里面的。
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
SubscriptionData包含了topic等信息
public class SubscriptionData implements Comparable<SubscriptionData> {
// topic
private String topic;
private String subString;
// tags
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
}
其實(shí)到這里,這個(gè)問(wèn)題已經(jīng)算是解決了七八成了,等同于是后來(lái)的消費(fèi)者的注冊(cè)信息會(huì)把之前的消費(fèi)者的注冊(cè)信息覆蓋掉,這也就導(dǎo)致了上述出現(xiàn)的現(xiàn)象。
先啟動(dòng)訂閱了tag1的消費(fèi)者,然后啟動(dòng)了訂閱了tag2的消費(fèi)者,這時(shí)最新的心跳信息是來(lái)源于tag2的這個(gè)消費(fèi)者,這就導(dǎo)致了這個(gè)消費(fèi)者的訂閱信息會(huì)覆蓋掉之前的訂閱信息,這是因?yàn)樵赗ocketMQ中會(huì)認(rèn)為同一個(gè)消費(fèi)者組的消費(fèi)者的訂閱信息是需要保持一致的,如果不保持一致是不被允許的做法。
如果真有那種,你去新建一個(gè)topic不就好了,或者新建一個(gè)消費(fèi)者組不就好了,在使用的過(guò)程中一定要保持消費(fèi)者組的訂閱信息保持一致。
這也就導(dǎo)致了發(fā)送者發(fā)送的tag1的消息壓根不會(huì)被這個(gè)消費(fèi)者接收到,而兩個(gè)消費(fèi)者自然不會(huì)消費(fèi)這個(gè)的消息。
而為什么只收到tag2的部分消息
這是因?yàn)閞ocketMQ默認(rèn)采用的是集群消費(fèi)的模式,也就是生產(chǎn)者的消息會(huì)通過(guò)負(fù)載均衡將消息均勻的發(fā)送到多個(gè)consumerqueue隊(duì)列中,默認(rèn)是4個(gè),也就是我們啟動(dòng)的兩個(gè)消費(fèi)者會(huì)分別監(jiān)聽(tīng)兩個(gè)consumerqueue隊(duì)列
這也就意味著有大約一半的tag2的消息會(huì)被運(yùn)送到消費(fèi)者1的機(jī)器上消費(fèi),而消費(fèi)者1監(jiān)聽(tīng)的是tag1,不滿足消息的條件,所以監(jiān)聽(tīng)不到消息
topic和tag信息是如何覆蓋的
/**
* 其實(shí)很簡(jiǎn)單,就是以topic為key,SubscriptionData為value。而SubscriptionData里包含了tags信息,所以直接覆蓋掉
*/
public boolean updateSubscription(final Set<SubscriptionData> subList) {
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
} else if (sub.getSubVersion() > old.getSubVersion()) {
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
}
本文參考文章:
https://codingw.blog.csdn.net/article/details/116299837。
https://dalin.blog.csdn.net/article/details/107241375。