深入理解 RocketMQ 廣播消費
這篇文章我們聊聊廣播消費,因為廣播消費在某些場景下真的有奇效。筆者會從基礎概念、實現機制、實戰案例三個方面一一展開,希望能幫助到大家。
1 基礎概念
RocketMQ 支持兩種消息模式:集群消費( Clustering )和廣播消費( Broadcasting )。
集群消費:
同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者實例上。
圖片
廣播消費:
當使用廣播消費模式時,每條消息推送給集群內所有的消費者,保證消息至少被每個消費者消費一次。
圖片
2 源碼解析
首先下圖展示了廣播消費的代碼示例。
public class PushConsumer {
public static final String CONSUMER_GROUP = "myconsumerGroup";
public static final String DEFAULT_NAMESRVADDR = "localhost:9876";
public static final String TOPIC = "mytest";
public static final String SUB_EXPRESSION = "TagA || TagC || TagD";
public static void main(String[] args) throws InterruptedException, MQClientException {
// 定義 DefaultPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 定義名字服務地址
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 定義消費讀取位點
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 定義消費模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 訂閱主題信息
consumer.subscribe(TOPIC, SUB_EXPRESSION);
// 訂閱消息監聽器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody()));
}
}catch (Exception e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
和集群消費不同的點在于下面的代碼:
consumer.setMessageModel(MessageModel.BROADCASTING);
接下來,我們從源碼角度來看看廣播消費和集群消費有哪些差異點 ?
首先進入 DefaultMQPushConsumerImpl 類的 start 方法 , 分析啟動流程中他們兩者的差異點:
圖片
▍ 差異點1:拷貝訂閱關系
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
// 注意下面的代碼 , 集群模式下自動訂閱重試主題
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
在集群模式下,會自動訂閱重試隊列,而廣播模式下,并沒有這段代碼。也就是說廣播模式下,不支持消息重試。
▍ 差異點2:本地進度存儲
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
我們可以看到消費進度存儲的對象是:LocalFileOffsetStore , 進度文件存儲在如下的主目錄 /{用戶主目錄}/.rocketmq_offsets。
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
進度文件是 /mqClientId/{consumerGroupName}/offsets.json 。
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator + "offsets.json";
筆者創建了一個主題 mytest , 包含4個隊列,進度文件內容如下:
圖片
消費者啟動后,我們可以將整個流程簡化如下圖,并繼續整理差異點:
圖片
▍ 差異點3:負載均衡消費該主題的所有 MessageQueue
進入負載均衡抽象類 RebalanceImpl 的rebalanceByTopic方法 。
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
// 省略代碼
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// 省略代碼
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
// 省略日志打印代碼
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//省略代碼
}
break;
}
default:
break;
}
}
從上面代碼我們可以看到消息模式為廣播消費模式時,消費者會消費該主題下所有的隊列,這一點也可以從本地的進度文件 offsets.json 得到印證。
▍ 差異點4:不支持順序消息
我們知道消費消息順序服務會向 Borker 申請鎖 。消費者根據分配的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務每隔 20 秒會重新嘗試。
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
} catch (Throwable e) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
但是從上面的代碼,我們發現只有在集群消費的時候才會定時申請鎖,這樣就會導致廣播消費時,無法為負載均衡的隊列申請鎖,導致拉取消息服務一直無法獲取消息數據。
筆者修改消費例子,在消息模式為廣播模式的場景下,將消費模式從并發消費修改為順序消費。
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
try {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody()));
}
}catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
});
圖片
通過 IDEA DEBUG 圖,筆者觀察到因為負載均衡后的隊列無法獲取到鎖,所以拉取消息的線程無法發起拉取消息請求到 Broker , 也就不會走到消費消息的流程。
因此,廣播消費模式并不支持順序消息。
▍ 差異點5:并發消費消費失敗時,沒有重試
進入并發消息消費類ConsumeMessageConcurrentlyService 的處理消費結果方法 processConsumeResult。
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
消費消息失敗后,集群消費時,消費者實例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發回到 Broker 端。
但在廣播模式下,僅僅是打印了消息信息。因此,廣播模式下,并沒有消息重試。
3 實戰案例
廣播消費主要用于兩種場景:消息推送和緩存同步。
3.1 消息推送
筆者第一次接觸廣播消費的業務場景是神州專車司機端的消息推送。
用戶下單之后,訂單系統生成專車訂單,派單系統會根據相關算法將訂單派給某司機,司機端就會收到派單推送。
圖片
推送服務是一個 TCP 服務(自定義協議),同時也是一個消費者服務,消息模式是廣播消費。
司機打開司機端 APP 后,APP 會通過負載均衡和推送服務創建長連接,推送服務會保存 TCP 連接引用 (比如司機編號和 TCP channel 的引用)。
派單服務是生產者,將派單數據發送到 MetaQ , 每個推送服務都會消費到該消息,推送服務判斷本地內存中是否存在該司機的 TCP channel , 若存在,則通過 TCP 連接將數據推送給司機端。
肯定有同學會問:假如網絡原因,推送失敗怎么處理 ?有兩個要點:
- 司機端 APP 定時主動拉取派單信息;
- 當推送服務沒有收到司機端的 ACK 時 ,也會一定時限內再次推送,達到閾值后,不再推送。
3.2 緩存同步
高并發場景下,很多應用使用本地緩存,提升系統性能 。
本地緩存可以是 HashMap 、ConcurrentHashMap ,也可以是緩存框架 Guava Cache 或者 Caffeine cache 。
圖片
如上圖,應用A啟動后,作為一個 RocketMQ 消費者,消息模式設置為廣播消費。為了提升接口性能,每個應用節點都會將字典表加載到本地緩存里。
當字典表數據變更時,可以通過業務系統發送一條消息到 RocketMQ ,每個應用節點都會消費消息,刷新本地緩存。
4 總結
集群消費和廣播消費模式下,各功能的支持情況如下:
功能 | 集群消費 | 廣播消費 |
順序消息 | 支持 | 不支持 |
重置消費位點 | 支持 | 不支持 |
消息重試 | 支持 | 不支持 |
消費進度 | 服務端維護 | 客戶端維護 |
廣播消費主要用于兩種場景:消息推送和緩存同步。
參考資料 :