成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一文帶你理解 RocketMQ 廣播模式實(shí)現(xiàn)機(jī)制

開發(fā) 架構(gòu)
本文主要講解了 RocketMQ 廣播消息的實(shí)現(xiàn)機(jī)制,理解廣播消息。

大家好,我是君哥。今天聊聊 RocketMQ 的廣播消息實(shí)現(xiàn)機(jī)制。

RocketMQ 有兩種消費(fèi)模式,集群模式和廣播模式。

集群模式是指 RocketMQ 中的一條消息只能被同一個消費(fèi)者組中的一個消費(fèi)者消費(fèi)。如下圖,Producer 向 TopicTest 這個 Topic 并發(fā)寫入 3 條新消息,分別被分配到了 MessageQueue1~MessageQueue3 這 3 個隊(duì)列,然后 Group 中的三個 Consumer 分別消費(fèi)了一條消息:

圖片

廣播模式是  RocketMQ 中的消息會被消費(fèi)組中的每個消費(fèi)者都消費(fèi)一次,如下圖:

圖片

使用 RocketMQ 的廣播模式時,需要在消費(fèi)端進(jìn)行定義,下面是一段官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}

從代碼中可以看到,在定義 Consumer 時,通過 messageModel 這個屬性指定消費(fèi)模式,這里指定為 BROADCASTING,也就啟動了廣播模式的消費(fèi)者。

1、消費(fèi)者啟動

以 RocketMQ 推模式為例,看一下消費(fèi)者調(diào)用關(guān)系類圖:

圖片

DefaultMQPushConsumer 作為啟動入口類,它的 start 方法調(diào)用了 DefaultMQPushConsumerImpl 類的 start 方法,下面重點(diǎn)看一下這個方法。

(1)拷貝訂閱關(guān)系

start 方法中調(diào)用了 copySubscription 方法,代碼如下:

private void copySubscription() throws MQClientException {
try {
//拷貝訂閱關(guān)系
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

這里的代碼有一點(diǎn)需要注意:集群模式會創(chuàng)建一個重試 Topic 的訂閱關(guān)系,而廣播模式是不會創(chuàng)建這個訂閱關(guān)系的。也就是說廣播模式不考慮重試。

(2)初始化偏移量

下面是初始化 offset 的代碼:

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}

從上面的代碼可以看到,廣播模式使用了 LocalFileOffsetStore,也就是說偏移量保存在客戶端本地,除了在內(nèi)存中會保存,在本地文件中也會保存。

2、消息拉取

ConsumeMessageService 是真正拉取消息的地方,消費(fèi)者初始化時會初始化 ConsumeMessageService,并且這里會區(qū)分并發(fā)消息還是順序消息。

(1)順序消息

在集群模式下,需要獲取到 processQueue 的鎖才會拉取消息,而在廣播模式下,不用獲取鎖,直接就可以拉取消息。判斷邏輯如下:

//ConsumeMessageOrderlyService.ConsumeRequest
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
}
}

這里有個疑問,對于順序消息,獲取鎖是必須的,這樣才能保證一個 processQueue 只能由一個線程進(jìn)行處理,從而保證消費(fèi)的順序性。那對于廣播模式,為什么不用獲取 processQueue 的鎖呢?難道廣播模式不支持順序消息?

(2)并發(fā)消息

對于并發(fā)消息,廣播模式不同的是,對消費(fèi)結(jié)果的處理。集群模式消費(fèi)失敗后需要把消息發(fā)送回 Broker 等待再次被拉取,而廣播模式則不需要重試。代碼如下:

//ConsumeMessageConcurrentlyService.rocessConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

這再次說明,廣播模式是不支持消息重試的。

3、重平衡

在消費(fèi)者啟動過程中,會調(diào)用 RebalanceService 的 start 方法,進(jìn)行重平衡。從重平衡的代碼中可以看到,廣播模式消費(fèi)者會消費(fèi)所有 MessageQueue,而集群模式下會根據(jù)負(fù)載均衡策略選擇其中幾個 MessageQueue。代碼如下:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
//省略部分邏輯
} else {
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
//省略部分邏輯
if (mqSet != null && cidAll != null) {
//省略部分邏輯
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//省略部分邏輯
}
break;
}
default:
break;
}
}

上面 updateProcessQueueTableInRebalance 這個方法調(diào)用前,要獲取到需要消費(fèi)的 MessageQueue 集合。廣播模式下,直接取了訂閱的 Topic 下的所有集合元素,而集群模式下,則需要通過負(fù)責(zé)均衡獲取當(dāng)前消費(fèi)者自己要消費(fèi)的 MessageQueue 集合。

4、總結(jié)

本文主要講解了 RocketMQ 廣播消息的實(shí)現(xiàn)機(jī)制,理解廣播消息,要把握下面幾點(diǎn):

1.偏移量保存在消費(fèi)者本地內(nèi)存和文件中。

2.廣播消息不支持重試。

3.從源碼上看,廣播模式并不能支持順序消息。

4.廣播模式消費(fèi)者訂閱了 Topic 下的所有 MessageQueue,不會重平衡。

責(zé)任編輯:姜華 來源: 君哥聊技術(shù)
相關(guān)推薦

2019-10-11 08:41:35

JVM虛擬機(jī)語言

2022-06-27 11:04:24

RocketMQ順序消息

2021-09-08 17:42:45

JVM內(nèi)存模型

2023-07-17 10:45:03

向量數(shù)據(jù)庫NumPy

2020-03-18 13:40:03

Spring事數(shù)據(jù)庫代碼

2021-09-02 12:07:48

Swift 監(jiān)聽系統(tǒng)Promise

2024-10-16 10:11:52

2020-11-17 09:32:57

設(shè)計(jì)模式責(zé)任鏈

2023-07-31 08:18:50

Docker參數(shù)容器

2021-05-29 10:11:00

Kafa數(shù)據(jù)業(yè)務(wù)

2023-11-06 08:16:19

APM系統(tǒng)運(yùn)維

2022-11-11 19:09:13

架構(gòu)

2022-03-18 13:58:00

RocketMQ消息隊(duì)列

2022-06-13 11:05:35

RocketMQ消費(fèi)者線程

2023-12-26 08:08:02

Spring事務(wù)MySQL

2022-05-11 07:38:45

SpringWebFlux

2022-12-20 07:39:46

2023-11-20 08:18:49

Netty服務(wù)器

2023-12-21 17:11:21

Containerd管理工具命令行

2020-05-14 13:39:19

Java 垃圾回收機(jī)制
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 国产精品视频一 | 国产一二区免费视频 | 日本aⅴ中文字幕 | 在线国产中文字幕 | 久久久久国产精品一区三寸 | 亚洲国产精品成人无久久精品 | 天天摸天天干 | 韩日av在线 | 国产精品福利在线观看 | 四虎影院在线观看免费视频 | 国产精品中文字幕在线观看 | 精品电影 | 国产亚洲欧美日韩精品一区二区三区 | 国产精品电影网 | a级片在线观看 | 免费黄色片视频 | 91精品国模一区二区三区 | 国产精品99久久久久久久久久久久 | 福利网址 | 超碰在线97国产 | 亚洲人在线播放 | 欧美激情一区 | 懂色中文一区二区三区在线视频 | 亚洲成人免费 | 我要看免费一级毛片 | 天堂网中文字幕在线观看 | 青青青伊人 | 欧美影院久久 | 久久精品国产a三级三级三级 | 国产三级电影网站 | 91在线免费视频 | 亚洲精品观看 | 久久精品一级 | 国产这里只有精品 | 久久国产一区 | 最近最新中文字幕 | 亚洲精彩视频在线观看 | 中文字幕日韩在线 | 免费高清av | 国产精品视频999 | 精品九九久久 |