成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ Tag在實際業務中有什么作用?

開發
本文分析了 RocketMQ 的 Tag 功能,它在消息過濾和分類處理方面提供了極大的便利,適用于各種需要高效、低延遲消息傳遞的場景。

Tag 是 RocketMQ 提供的一種消息過濾機制,允許生產者在發送消息時指定一個或多個標簽,消費者則可以根據這些標簽來選擇性地消費消息。這篇文章,我們將詳細介紹 RocketMQ 中 Tag 的原理、源碼分析以及示例。

Tag 的原理

在 RocketMQ 中,Tag 主要用于消息過濾。每個消息可以攜帶一個 Tag,消費者可以根據 Tag 來訂閱特定的消息,從而實現消息的過濾和分類處理。

(1) 消息發送階段

生產者在發送消息時,可以指定一個 Tag。這個 Tag 會被附加到消息的元數據中,并存儲在 RocketMQ 的消息存儲系統中。

(2) 消息存儲階段

消息被存儲在 RocketMQ 的 Broker 中,消息的元數據(包括 Tag)也會被存儲。

(3) 消息消費階段

消費者在訂閱消息時,可以指定要消費的 Tag。Broker 會根據消費者訂閱的 Tag,將符合條件的消息投遞給消費者。

(4) 源碼分析

為了更好的理解 Tag的原理,我們通過 RocketMQ 中Tag 相關的幾個主要代碼片段進行演示。

生產者發送消息時的代碼:

// 創建消息實例,并指定Topic和Tag
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());

// 發送消息
SendResult sendResult = producer.send(msg);

在 Message 類中,Tag 是通過構造函數傳遞的,并存儲在 Message 對象的 tags 字段中。

消費者訂閱消息時的代碼:

// 創建消費者實例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");

// 訂閱Topic,并指定Tag
consumer.subscribe("TopicTest", "TagA");

// 注冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

// 啟動消費者
consumer.start();

在 DefaultMQPushConsumer 類中,通過 subscribe 方法指定要訂閱的 Topic 和 Tag,RocketMQ 內部會根據訂閱的 Tag 進行消息過濾。

示例

下面是一個完整的示例,演示如何使用 RocketMQ 的 Tag 功能。

(1) 生產者代碼

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 創建生產者實例
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.setNamesrvAddr("localhost:9876");

        // 啟動生產者
        producer.start();

        // 發送消息
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        // 關閉生產者
        producer.shutdown();
    }
}

(2) 消費者代碼

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 創建消費者實例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");

        // 訂閱Topic,并指定Tag
        consumer.subscribe("TopicTest", "TagA");

        // 注冊消息監聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 啟動消費者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

盡管 RocketMQ 的 Tag 功能在消息過濾和分類處理方面提供了極大的便利,但也有其優缺點。下面詳細分析一下:

優點

  • 簡單易用:Tag 的使用非常簡單,生產者只需在發送消息時指定 Tag,消費者在訂閱消息時指定相應的 Tag 即可。
  • 高效過濾:通過 Tag 進行消息過濾,減少了消費者處理不相關消息的開銷,從而提高了系統的性能。
  • 靈活性高:支持一個 Topic 下多個 Tag,使得消息的分類和過濾更加靈活。
  • 低延遲:Tag 過濾是在 Broker 端進行的,不會顯著增加消息傳遞的延遲。
  • 減少網絡帶寬:消費者只會接收到自己感興趣的消息,減少了不必要的網絡傳輸,從而節省了帶寬。

缺點

  • 單一維度過濾:Tag 只能提供單一維度的消息過濾,無法進行更復雜的多維度過濾。如果需要多維度過濾,需要結合其他機制(如消息屬性)來實現。
  • 有限的靈活性:Tag 的數量和種類在設計階段需要規劃好,靈活性有限。如果后期需要添加新的 Tag,可能需要重新設計和部署。
  • 不支持復雜邏輯:Tag 過濾支持的邏輯較為簡單,只能進行基于字符串匹配的過濾,無法支持復雜的過濾邏輯。
  • 管理復雜性:隨著系統規模的增大,Tag 的管理和維護可能變得復雜,尤其是在多個應用共享同一個 Topic 的情況下。
  • 潛在的性能瓶頸:雖然 Tag 過濾在大多數場景下性能良好,但在極端情況下(如大量不同 Tag 的消息和高并發消費),可能會帶來性能瓶頸。

適用場景

  • 日志和監控:不同類型的日志和監控數據可以通過 Tag 進行分類和過濾。
  • 電商系統:不同類型的訂單、商品信息等可以通過 Tag 進行分類和過濾,消費者只處理自己感興趣的消息。
  • 金融系統:不同類型的交易、通知等可以通過 Tag 進行分類和過濾,提高系統的處理效率。
  • 社交平臺:不同類型的消息(如評論、點贊、私信等)可以通過 Tag 進行分類和過濾,提供更精準的消息推送。

總結

本文分析了 RocketMQ 的 Tag 功能,它在消息過濾和分類處理方面提供了極大的便利,適用于各種需要高效、低延遲消息傳遞的場景。然而,它也有一些局限性,如單一維度過濾、管理復雜性等。

在實際應用中,需要根據具體需求和系統設計,合理使用 Tag 功能,結合其他機制來實現更復雜的消息過濾和處理。

責任編輯:趙寧寧 來源: 猿java
相關推薦

2023-06-12 07:02:53

物聯網數據決策

2019-04-28 17:39:06

大數據區塊鏈數據隱私安全

2018-11-06 10:51:07

Redis開發存儲系統

2022-03-02 14:08:35

區塊鏈供應鏈技術

2024-11-28 08:15:44

LLM大型語言模型人工智能

2010-02-25 17:22:39

WCF服務行為

2009-12-03 18:21:15

軟路由技術

2010-01-14 10:35:34

VB.NET指針

2010-01-08 18:02:33

VB.NET事件

2010-01-15 13:30:53

VB.NET Tool

2023-06-25 11:38:31

2010-01-07 16:16:03

VB.NET變量作用域

2009-11-19 15:14:43

路由器系統

2010-01-19 15:21:55

VB.NET區域性

2009-11-23 17:56:45

業務路由器

2022-03-21 08:55:53

RocketMQ客戶端過濾機制

2010-01-20 18:34:46

VB.NET Syst

2016-02-17 09:15:37

蘋果

2021-04-11 18:09:57

機器學習業務價值人工智能
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久99精品国产麻豆婷婷 | 在线观看亚洲专区 | 91人人视频在线观看 | 久久乐国产精品 | 久久午夜精品福利一区二区 | 日韩成人专区 | 欧美一区二区三区 | 日本人爽p大片免费看 | 亚洲视频网 | 日韩一二区| 欧美aⅴ片 | 国产午夜在线观看 | 国产区一区 | 国产精品久久久亚洲 | 亚洲国产精品一区二区第一页 | 久久中文免费视频 | 精品一区二区电影 | 亚洲一区二区三区 | 中文字幕1区2区3区 亚洲国产成人精品女人久久久 | 在线播放国产一区二区三区 | 国产精品爱久久久久久久 | 国产成人精品免费 | 操人网站 | 一级a爱片性色毛片免费 | 日韩成人免费 | 日韩播放| 91高清免费 | 天天综合久久 | 美女一级毛片 | 国产精品久久精品 | 亚洲视频免费在线播放 | 午夜欧美 | 欧美日韩在线一区二区 | 国产综合精品一区二区三区 | 日韩欧美一区二区三区四区 | 福利社午夜影院 | 久久久久久国产免费视网址 | 久久毛片 | 色综合天天天天做夜夜夜夜做 | 91成人精品 | 亚洲精品一区二区另类图片 |