成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

消息隊列 RocketMQ 入門指南

開發
本文我們將從RocketMQ的基本概念出發,逐步講解其核心功能,并通過簡單的實踐示例,幫助你快速上手。

在當今的分布式系統中,消息隊列(Message Queue)作為解耦、異步通信和流量削峰的重要組件,扮演著不可或缺的角色。而RocketMQ,作為阿里巴巴開源的一款高性能、高可靠的分布式消息中間件,憑借其強大的功能和穩定的性能,成為了眾多開發者和企業的首選。

無論你是剛剛接觸消息隊列的新手,還是希望深入了解RocketMQ的開發者,這篇文章都將為你提供一個清晰的入門指南。我們將從RocketMQ的基本概念出發,逐步講解其核心功能,并通過簡單的實踐示例,幫助你快速上手。

一、詳解RocketMQ基礎概念

1. 為什么要用RocketMQ

相比于市場上的各種消息隊列,它有如下優勢:

  • 性能好。
  • 穩定可靠。
  • 中文社區活躍。

當然缺點也是有那么一些些的,兼容性確實不太行。

2. RocketMQ優缺點是什么

優點:

  • 單機吞吐量為10w級。
  • 可用性很高,支持分布式架構。
  • 擴展性好。
  • 支持10億級別的消息堆積,而且不會因為堆積導致性能下降。
  • 源碼是用Java寫的,對于Java程序員來說非常方便改造。
  • 參數優化配置,消息基本可以做到0丟失。
  • 使用于對可靠性要求高的金融行業。

缺點:

  • 目前只支持Java、C++客戶端,而且C++還不算完善。
  • 沒有在MQ核心實現JMS相關接口,有些遷移改造就比較麻煩了。

3. 消息隊列使用場景

解耦: 例如用戶完成下單除了必要的庫存扣減和訂單狀態更新外,我們還需要處理一些積分系統、推送系統的無關緊要的業務處理,如果全部順序執行,等待時間就會變得很漫長,所以我們需要借助MQ將邊角業務從業務模塊中解耦開來。

  • 異步: 這點不必多說,上述的解耦方案就會使得積分系統、促銷系統、推送系統任務異步執行。
  • 削峰: 可以理解為一個漏斗,例如我們的某個服務只能抗住10wQPS,可是當前請求卻達到20w的QPS,那么我們就可以將請求全部先扔到MQ中,讓服務慢慢消化處理。

二、RocketMQ基礎安裝與實踐

1. 安裝并啟動RocketMQ

在編寫業務代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個版本:https://rocketmq.apache.org/download/

完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環境變量,完成后鍵入source /etc/profile使之生效,對應的配置內容如下所示:

export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin

需要注意的是筆者本次采用WSL的Ubuntu子系統時啟動時腳本會拋出runserver.sh: 70: [[: Exec format error錯誤,嘗試格式化和指令配置后都沒有很好的解決,于是循著報錯找到runserver.sh這行對應的腳本內容,該括弧本質上就是基于JDK內容配置對應的GC算法:

以筆者為里系統是jdk8,所以直接去掉判斷用走JDK8的配置即可:

choose_gc_options()
{

      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
 
}

完成后鍵入./mqnamesrv &將MQ啟動,如果彈窗輸出下面這條結果,則說明mq的NameServer啟動成功。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

然后我們再鍵入./mqbroker -n 127.0.0.1:9876啟動broker,需要注意的是默認情況下broker占用堆內存差不多是4g,所以讀者本地部署時建議修改一下runbroker.sh的堆內存,如下圖所示:

若彈窗輸出下面所示的文字,則說明broker啟動成功,自此mq就在windows環境部署成功了。我們就可以開始編碼工作了。

The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

2. 訂單系統改造

本次的示例是關于訂單系統改造,用戶下單完成后,服務器需要進行庫存扣減、訂單狀態更新、以及優惠券、積分等邊邊角角的業務,如果順序執行這些邏輯+網絡開銷,接口耗時對于用戶體驗是非常不友好的。

所以我們在將非核心業務邏輯從接口串行調用中抽出,下單業務只需關注完成我們庫存扣減、訂單狀態更新就行了,剩下的業務用MQ發個消息給積分系統、促銷系統告知他們自己處理一下就行了:

首先我們引入MQ依賴腳手架:

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

同時這里我們也給出配置信息:

# mq地址端口
rocketmq.name-server=127.0.0.1:9876
# 生產者配置
rocketmq.producer.isOnOff=on
# 發送同一類消息設置為同一個group,保證唯一
rocketmq.producer.group=rocketmq-group
rocketmq.producer.groupName=rocketmq-group
# namesrv地址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# 設置消息最大長度 4M
rocketmq.producer.maxMessageSize=4096
# 消息發送超時時間
rocketmq.producer.sendMsgTimeout=3000
# 消息發送失敗重試次數
rocketmq.producer.retryTimesWhenSendFailed=2

隨后我們設置監聽處理關于訂單創建的topic消息:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進行積分系統、促銷系統、推送系統業務處理.....", JSONUtil.toJsonStr(order));
    }
}

完成后我們基于CommandLineRunner 測試一下消息發送:

@Component
@Slf4j
public class MQTest implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @Override
    public void run(String... args) throws Exception {

        Order order = new Order();
        order.setOrderNo("20221217001002003");
        order.setUserId(1);
        order.setPrice(500.00);

        rocketMQTemplate.asyncSend("ORDER_ADD", MessageBuilder.withPayload(order).build(), getDefaultSendCallBack());

    }

    /**
     * 消息處理默認回調
     * @return
     */
    private SendCallback getDefaultSendCallBack() {
        return new SendCallback() {

            @Override
            public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
                log.info("MQ消息發送成功。result:{}", JSONUtil.toJsonStr(sendResult));
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("MQ消息發送失敗,失敗原因:{}" + throwable.getMessage());
            }
        };
    }

}

日志如下,可以看到消息消費成功了:

2025-02-11 10:03:14.577  INFO 14420 --- [MessageThread_1] com.sharkChili.config.OrderMsgListener   : 收到訂單,訂單信息:[{"userId":1,"orderNo":"20221217001002003","price":500}],進行積分系統、促銷系統、推送系統業務處理.....
2025-02-11 10:03:14.577  INFO 14420 --- [ublicExecutor_2] com.sharkChili.runner.MQTest             : MQ消息發送成功。result:{"sendStatus":"SEND_OK","msgId":"AC1E1001385418B4AAC235A7E0190000","messageQueue":{"topic":"ORDER_ADD","brokerName":"DESKTOP-DC9PSUS","queueId":2},"queueOffset":1,"offsetMsgId":"AC15733800002A9F0000000000000558","regionId":"DefaultRegion","traceOn":true}

3. 如何實現消息過濾

設置tag消息的方式常見的是有兩種,一種是基于tag標簽過濾,如下代碼所示,我們希望發送訂單業務即ORDER_ADD這個主題下tag標簽為tagA的用戶收到消息,那么我們就可以通過ORDER_ADD:tagA針對topic進行更進一步劃分:

//創建訂單消息
        Order order = new Order();
        order.setUserId(1);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setPrice(500);
        //生成消息
        Message<Order> message = MessageBuilder.withPayload(order)
                .build();
        //同步發送
        rocketMQTemplate.syncSend("ORDER_ADD:tagA", message);

對應的監聽者通過selectorExpression 指定標簽即可:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        selectorExpression = "tagA"http://訂閱tagA的消息
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進行積分系統、促銷系統、推送系統業務處理.....", JSONUtil.toJsonStr(order));
    }
}

還有一種就是基于SQL過濾,因為表達式靈活,相對更強大一些,例如我們的消費者只處理userId為10以內的數據,那么消費者的監聽就可以按照如下姿勢進行配置:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        selectorType = SelectorType.SQL92,//指令類型為sql表達式
        selectorExpression = "userId<10"http://過濾出id小于10的用戶的訂單
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進行積分系統、促銷系統、推送系統業務處理.....", JSONUtil.toJsonStr(order));
    }
}

發送消息時,通過headers 指定本消息條件并通過convertAndSend發送即可:

//創建訂單消息
        Order order = new Order();
        order.setUserId(1);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setPrice(500);
        //通過header攜帶條件告知當前userId為1
        Map<String, Object> headers = new HashMap<>();
        headers.put("userId", 1);
        //生成消息
        Message<Order> message = MessageBuilder.withPayload(order)
                .build();
        //發送
        rocketMQTemplate.convertAndSend("ORDER_ADD", message, headers);

需要注意的是默認情況下,MQ是不支持SQL表達過濾,我們需要到conf目錄下的broker.conf文件,添加enablePropertyFilter=true,然后鍵入如下指令降broker啟動:

./mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c ../conf/broker.conf

4. 如何提交延時消息

延遲消息即需要消費者過一段時間后才能消費的消息,例如我們現在有個消息要求消費者10s后才能消費,那么我們就可以使用延遲消息,如下代碼所示:

// 創建延遲消息
        Message<String> rocketMessage = MessageBuilder.withPayload("this is delay msg").build();
        // 發送延遲消息,timeout設置為10000即10s,delayLevel表示延遲等級,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,所以 3為10s
        rocketMQTemplate.syncSend("delay_topic", rocketMessage, 10000,3);
        log.info("延遲消息發送完成");

消費者代碼:

@Component
@RocketMQMessageListener(consumerGroup = "delay_msg_group", topic = "delay_topic")
@Slf4j
public class DelayMsgListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        log.info("收到延遲消息,消息內容:{}", JSONUtil.toJsonStr(msg));
    }
}

輸出結果,可以看到確實是10s后消費者采納看到消息并消費:

2025-02-11 10:56:58.300  INFO 18568 --- [           main] com.sharkChili.runner.MQTest             : 延遲消息發送完成
2025-02-11 10:57:08.307  INFO 18568 --- [MessageThread_1] com.sharkChili.config.DelayMsgListener   : 收到延遲消息,消息內容:this is delay msg
責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2022-09-21 21:50:18

Dapr消息隊列

2017-07-11 15:26:57

LocalMQ RocketMQ高性能

2024-10-08 08:52:59

2017-10-11 15:08:28

消息隊列常見

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2025-06-04 01:35:00

RocketMQ異步消息

2022-12-22 10:03:18

消息集成

2023-09-18 08:27:20

RabbitMQRocketMQKafka

2023-07-18 09:03:01

RocketMQ場景消息

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-17 08:34:03

RocketMQ消息初體驗

2023-08-17 10:20:18

RabbitMQ系統

2023-11-20 09:33:43

開發指南

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2024-09-13 08:49:45

2021-04-07 08:43:09

SpringBootRocketMQ開發技術

2020-11-13 16:40:05

RocketMQ延遲消息架構
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文av字幕| 狠狠爱综合 | 日韩一区二区av | 精品国产伦一区二区三区观看说明 | 麻豆国产一区二区三区四区 | 黄网站在线播放 | 日韩av在线一区二区三区 | 国产午夜精品久久久久免费视高清 | 国产精品永久久久久久久www | 久久伊人精品 | 国产精品国产a | 亚洲成在线观看 | 成人国内精品久久久久一区 | 亚洲免费久久久 | 国产精品免费在线 | 精品国产一区二区三区在线观看 | 日本高清视频在线播放 | 亚洲成人一区二区三区 | 亚洲精品黄色 | 高清视频一区二区三区 | 91一区二区 | 午夜视频在线播放 | 福利二区 | 日韩一级免费观看 | 亚洲在线一区二区 | 免费能直接在线观看黄的视频 | 亚洲国产网址 | 亚洲精品一区二区三区 | 国产一区二区三区四区hd | 91久久国产综合久久 | 国产成人免费视频网站高清观看视频 | 久久久久久精 | 国产在线小视频 | 成人精品鲁一区一区二区 | 成人做爰www免费看视频网站 | 国产高清精品一区二区三区 | 免费看av大片 | 男女精品久久 | 国产精品久久久久久吹潮 | 成人3d动漫一区二区三区91 | 一区二区三区四区在线视频 |