成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Rocketmq-Spring:實戰與源碼解析一網打盡

開發 項目管理
這篇文章會介紹 Spring Boot 項目使用 rocketmq-spring SDK 實現消息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯。

RocketMQ 是大家耳熟能詳的消息隊列,開源項目 rocketmq-spring 可以幫助開發者在 Spring Boot 項目中快速整合 RocketMQ。

這篇文章會介紹 Spring Boot 項目使用 rocketmq-spring SDK 實現消息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯。

圖片

一、SDK 簡介

圖片

項目地址:??https://github.com/apache/rocketmq-spring??

rocketmq-spring 的本質是一個 Spring Boot starter 。

Spring Boot 基于“約定大于配置”(Convention over configuration)這一理念來快速地開發、測試、運行和部署 Spring 應用,并能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以命令行的方式運行,不需再部署到獨立容器中。

Spring Boot starter 構造的啟動器使用起來非常方便,開發者只需要在 pom.xml 引入 starter 的依賴定義,在配置文件中編寫約定的配置即可。

下面我們看下 rocketmq-spring-boot-starter 的配置:

1、引入依賴

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>

2、約定配置

圖片

接下來,我們分別按照生產者和消費者的順序,詳細的講解消息收發的操作過程。

二、生產者

首先我們添加依賴后,進行如下三個步驟:

1、配置文件中配置如下

rocketmq:
name-server: 127.0.0.1:9876
producer:
group: platform-sms-server-group
# access-key: myaccesskey
# secret-key: mysecretkey
topic: sms-common-topic

生產者配置非常簡單,主要配置名字服務地址和生產者組。

2、需要發送消息的類中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${rocketmq.topic}")
private String smsTopic;

3、發送消息,消息體可以是自定義對象,也可以是 Message 對象

rocketMQTemplate 類包含多鐘發送消息的方法:

  • 同步發送 syncSend
  • 異步發送 asyncSend
  • 順序發送 syncSendOrderly
  • oneway發送 sendOneWay

下面的代碼展示如何同步發送消息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
rocketMQTemplate.syncSend(
destination,
MessageBuilder.withPayload(messageContent).
setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
build()
);
if (sendResult != null) {
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// send message success ,do something
}
}

syncSend 方法的第一個參數是發送的目標,格式是:topic + ":" + tags ,

第二個參數是:spring-message 規范的 message 對象 ,而 MessageBuilder 是一個工具類,方法鏈式調用創建消息對象。

三、消費者

1、配置文件中配置如下

rocketmq:
name-server: 127.0.0.1:9876
consumer1:
group: platform-sms-worker-common-group
topic: sms-common-topic

2、實現消息監聽器

@Component
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer1.group}", //消費組
topic = "${rocketmq.consumer1.topic}" //主題
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
System.out.println("普通短信:" + message);
}
}

消費者實現類也可以實現 RocketMQListener<MessageExt>, 在 onMessage 方法里通過 RocketMQ 原生消息對象 MessageExt 獲取更詳細的消息數據 。

public void onMessage(MessageExt message) {
try {
String body = new String(message.getBody(), "UTF-8");
logger.info("普通短信:" + message);
} catch (Exception e) {
logger.error("common onMessage error:", e);
}
}

四、源碼概覽

圖片

最新源碼中,我們可以看到源碼中包含四個模塊:

1、rocketmq-spring-boot-parent

該模塊是父模塊,定義項目所有依賴的 jar 包。

2、rocketmq-spring-boot

核心模塊,實現了 starter 的核心邏輯。

3、rocketmq-spring-boot-starter

SDK 模塊,簡單封裝,外部項目引用。

4、rocketmq-spring-boot-samples

示例代碼模塊。這個模塊非常重要,當用戶使用 SDK 時,可以參考示例快速開發。

五、starter 實現

我們重點分析下 rocketmq-spring-boot 模塊的核心源碼:

圖片


spring-boot-starter 實現需要包含如下三個部分:

1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;

2、定義spring.factories 文件

在 resources 包下創建 META-INF 目錄后,新建 spring.factories 文件,并在文件中定義自動加載類,文件內容是:

org.springframework.boot.autoconfigure.EnableAutoCnotallow=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 會根據文件中配置的自動化配置類來自動初始化相關的 Bean、Component 或 Service。

3、實現自動加載類

在 RocketMQAutoConfiguration 類的具體實現中,我們重點分析下生產者和消費者是如何分別啟動的。

▍生產者發送模板類:RocketMQTemplate

RocketMQAutoConfiguration 類定義了兩個默認的 Bean :

圖片

圖片

首先SpringBoot項目中配置文件中的配置值會根據屬性條件綁定到 RocketMQProperties 對象 中,然后使用 RocketMQ 的原生 API 分別創建生產者 Bean 和拉取消費者 Bean , 分別將兩個 bean 設置到 RocketMQTemplate 對象中。

兩個重點需要強調:

  • 發送消息時,將 spring-message 規范下的消息對象封裝成 RocketMQ 消息對象

圖片

  • 默認拉取消費者 litePullConsumer 。拉取消費者一般用于大數據批量處理場景 。

圖片

  • 原生使用方式

RocketMQTemplate 類封裝了拉取消費者的receive方法,以方便開發者使用。

圖片

▍自定義消費者類

下圖是并發消費者的例子:

圖片

消費者示例代碼

那么 rocketmq-spring 是如何自動啟動消費者呢 ?

圖片

spring 容器首先注冊了消息監聽器后置處理器,然后調用 ListenerContainerConfiguration 類的 registerContainer 方法 。

對比并發消費者的例子,我們可以看到:DefaultRocketMQListenerContainer 是對 DefaultMQPushConsumer 消費邏輯的封裝。

圖片

封裝消費消息的邏輯,同時滿足 RocketMQListener 泛化接口支持不同參數,比如 String 、MessageExt 、自定義對象 。

首先DefaultRocketMQListenerContainer初始化之后, 獲取 onMessage 方法的參數類型 。

圖片

然后消費者調用 consumeMessage 處理消息時,封裝了一個 handleMessage 方法 ,將原生 RocketMQ 消息對象 MessageExt 轉換成 onMessage 方法定義的參數對象,然后調用 rocketMQListener 的 onMessage  方法。

圖片

mnjh9

上圖右側標紅的代碼也就是該方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));

六、寫到最后

開源項目 rocketmq-spring 有很多值得學習的地方 ,我們可以從如下四個層面逐層進階:

1、學會如何使用 :參考 rocketmq-spring-boot-samples 模塊的示例代碼,學會如何發送和接收消息,快速編碼;

2、模塊設計:學習項目的模塊分層 (父模塊、SDK 模塊、核心實現模塊、示例代碼模塊);

3、starter 設計思路 :定義自動配置文件 spring.factories 、設計配置屬性類 、在 RocketMQ client 的基礎上實現優雅的封裝、深入理解 RocketMQ 源碼等;

4、舉一反三:當我們理解了 rocketmq-spring 的源碼,我們可以嘗試模仿該項目寫一個簡單的 spring boot starter。

責任編輯:武曉燕 來源: 勇哥java實戰分享
相關推薦

2024-04-26 00:25:52

Rust語法生命周期

2024-04-07 08:41:34

2021-08-05 06:54:05

流程控制default

2024-02-27 10:11:36

前端CSS@規則

2021-10-11 07:55:42

瀏覽器語法Webpack

2013-08-02 10:52:10

Android UI控件

2024-06-12 00:00:05

2024-08-26 10:01:50

2010-08-25 01:59:00

2011-12-02 09:22:23

網絡管理NetQos

2013-10-16 14:18:02

工具圖像處理

2023-04-06 09:08:41

BPM流程引擎

2021-10-26 16:15:26

Spring 事務隔離性

2022-09-15 10:47:19

數據庫事務工作單元

2024-02-23 08:14:01

項目開發Spring

2019-07-24 15:30:00

SQL注入數據庫

2021-05-20 11:17:49

加密貨幣區塊鏈印度

2021-10-29 09:32:33

springboot 靜態變量項目

2023-09-06 18:37:45

CSS選擇器符號

2020-02-21 08:45:45

PythonWeb開發框架
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久久久久亚洲精品 | 亚洲国产成人av好男人在线观看 | 欧美久久久久久 | 亚洲视频一区在线观看 | 欧美一区二区在线 | 一区二区三区免费在线观看 | 中文字幕在线一区 | 青娱乐一区二区 | 91偷拍精品一区二区三区 | 日韩乱码一二三 | 国产午夜精品久久久久免费视高清 | 在线观看成人免费视频 | 久久精品一区二区三区四区 | 在线视频国产一区 | 精品蜜桃一区二区三区 | 久草免费福利 | 欧美1区2区 | 91精品国产综合久久久密闭 | 国产欧美精品区一区二区三区 | 精品国产综合 | 中文在线a在线 | 日韩中文字幕 | www.精品国产 | 国产成人精品久久 | 亚洲精品欧美一区二区三区 | 日韩欧美高清 | 久久精品一区二区 | 久久中文视频 | 亚洲国产欧美在线 | 亚洲成人av| 美女视频网站久久 | 亚洲欧洲色视频 | 久久蜜桃资源一区二区老牛 | 国产黄色一级电影 | 国产视频福利在线观看 | 精品二区 | 久草欧美视频 | 久久日韩粉嫩一区二区三区 | 天天久久| 欧美情趣视频 | 日本精品网站 |