Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析
環(huán)境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12
簡(jiǎn)介
Spring Cloud Stream是一個(gè)框架,用于構(gòu)建與MQ連接的高度可伸縮的事件驅(qū)動(dòng)微服務(wù)。其目的是為了簡(jiǎn)化消息在 Spring Cloud 應(yīng)用程序中的開發(fā)。屏蔽了各種MQ之間的差異,使得在更換MQ的時(shí)候不需要修改代碼。
Spring Cloud Stream支持多種綁定器實(shí)現(xiàn),如下:
- RabbitMQ。
- Apache Kafka。
- Kafka Streams。
- Amazon Kinesis。
- Google PubSub (partner maintained)。
- Solace PubSub+ (partner maintained)。
- Azure Event Hubs (partner maintained)。
- AWS SQS (partner maintained)。
- AWS SNS (partner maintained)。
- Apache RocketMQ (partner maintained)。
詳細(xì)查看官方文檔,對(duì)應(yīng)每一個(gè)MQ都有一個(gè)Github地址。
Spring Cloud Stream的核心構(gòu)建塊是:
- 目標(biāo)綁定器(Destination Binders):負(fù)責(zé)與MQ集成的組件。
- 目標(biāo)綁定(Destination Bindings):MQ中間件與最終用戶提供的應(yīng)用程序代碼(生產(chǎn)者/消費(fèi)者)之間的橋梁。
- 消息(Message):生產(chǎn)者和消費(fèi)者用來與目標(biāo)綁定器(以及通過MQ與其他應(yīng)用程序)通信的規(guī)范數(shù)據(jù)結(jié)構(gòu)。
Stream 核心組件關(guān)系圖
快速入門
依賴:
<properties>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
應(yīng)用配置:
spring:
rabbitmq:
host: localhost
virtual-host: bus
port: 5672
username: xxx
password: xxx
---
spring:
cloud:
stream:
bindings:
#自定義輸入輸出
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
myOutput:
destination: demo
創(chuàng)建消息通道綁定的接口:
public interface StreamBinding {
String INPUT = "myInput";
String OUTPUT = "myOutput";
(StreamBinding.INPUT)
SubscribableChannel input();
(StreamBinding.OUTPUT)
MessageChannel output();
}
通過 @Input和 @Output注解定義輸入通道和輸出通道名稱,這里的名稱與上面配置文件中的是對(duì)應(yīng)的。
當(dāng)定義輸出通道的時(shí)候,需要返回 MessageChannel 接口對(duì)象,該接口定義了向消息通道發(fā)送消息的方法;定義輸入通道時(shí),需要返回 SubscribableChannel 接口對(duì)象,該接口集成自 MessageChannel 接口,它定義了維護(hù)消息通道訂閱者的方法。
這里的Input,Output兩個(gè)方法容器會(huì)分別創(chuàng)建一個(gè)Bean對(duì)象。
創(chuàng)建消費(fèi)者:
(value = {StreamBinding.class})
public class StreamReceiver {
private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);
(StreamBinding.INPUT)
public void receive(String message) {
logger.info("接收到消息: {}", message);
}
}
@EnableBinding 注解用來指定一個(gè)或多個(gè)定義了 @Input 或 @Output 注解的接口,以此實(shí)現(xiàn)對(duì)消息通道(Channel)的綁定。上面我們通過 @EnableBinding(value = {StreamClient.class}) 綁定了 StreamClient 接口,該接口是我們自己實(shí)現(xiàn)的對(duì)輸入輸出消息通道綁定的定義。
@StreamListener,主要定義在方法上,作用是將被修飾的方法注冊(cè)為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器,注解中的屬性值對(duì)應(yīng)了監(jiān)聽的消息通道名。上面我們將 receive 方法注冊(cè)為 myInput 消息通道的監(jiān)聽處理器,當(dāng)我們往這個(gè)消息通道發(fā)送信息的時(shí)候,receiver 方法會(huì)執(zhí)行。
消息發(fā)送接口:
private StreamBinding streamBinding;
("/send")
public void send() {
streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}
啟動(dòng)服務(wù):
查看RabbitMQ
自動(dòng)為我們創(chuàng)建了一個(gè)隊(duì)列,隊(duì)列的名稱是以我們?cè)谂渲梦募信渲玫拈_頭,后面是隨機(jī)生成的。這個(gè)隊(duì)列會(huì)自動(dòng)刪除AD,服務(wù)關(guān)閉后就自動(dòng)刪除隊(duì)列;Excl:排他的,存在該隊(duì)列就不會(huì)在創(chuàng)建了。
修改端口后,再啟動(dòng)一個(gè)服務(wù):
創(chuàng)建了2個(gè)隊(duì)列,使用其中一個(gè)發(fā)送消息:
兩個(gè)服務(wù)都收到了消息。
消費(fèi)者組
上面啟動(dòng)了2個(gè)服務(wù)都能收到消息,在集群的環(huán)境下這樣肯定會(huì)帶來問題,如果是業(yè)務(wù)方面的就會(huì)出現(xiàn)重復(fù)數(shù)據(jù),這時(shí)候我們可以通過設(shè)置分組的解決此問題。修改配置:
spring:
cloud:
stream:
bindings:
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
#指定一個(gè)組;指定分組以后,不管你啟動(dòng)多少個(gè)實(shí)例,所有的實(shí)例都監(jiān)聽這一個(gè)隊(duì)列
#多個(gè)實(shí)例會(huì)輪詢的接收消息
group: g_test
myOutput:
destination: demo
再次啟動(dòng)服務(wù)后,兩個(gè)服務(wù)會(huì)輪詢的接收到消息。
啟動(dòng)服務(wù)后,兩個(gè)服務(wù)都同時(shí)監(jiān)聽同一個(gè)隊(duì)列。隊(duì)列也不是隨機(jī)生成的了,并且隊(duì)列是持久化的,服務(wù)斷開后隊(duì)列也不會(huì)自動(dòng)刪除。
消息分區(qū)
通過消費(fèi)組的設(shè)置,雖然能保證同一消息只被一個(gè)消費(fèi)者進(jìn)行接收和處理,但是對(duì)于特殊業(yè)務(wù)情況,除了要保證單一實(shí)例消費(fèi)之外,還希望那些具備相同特征的消息都能被同一個(gè)實(shí)例消費(fèi),這個(gè)就可以使用 Spring Cloud Stream 提供的消息分區(qū)功能。修改配置。
spring:
cloud:
stream:
bindings:
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
#指定一個(gè)組;指定分組以后,不管你啟動(dòng)多少個(gè)實(shí)例,所有的實(shí)例都監(jiān)聽這一個(gè)隊(duì)列
#多個(gè)實(shí)例會(huì)輪詢的接收消息
group: g_test
consumer:
#通過該參數(shù)開啟消費(fèi)者分區(qū)功能
partitioned: true
myOutput:
destination: demo
producer:
#這里的配置也可以是SpEL表達(dá)式,比如:headers['partition']通過消息header獲取屬性
#這里會(huì)通過表達(dá)式及消息對(duì)象進(jìn)行計(jì)算得到一個(gè)Key,然后獲取key的hashCode
# 得到hashCode以后會(huì)與partitionCount進(jìn)行取模運(yùn)算得到具體的分區(qū)
partitionKeyExpression: '1' #我這里給的值就是對(duì)應(yīng)的instanceIndex的值,你希望誰接收就設(shè)置誰配置的值即可
partitionCount: 2
#實(shí)例總數(shù)
instanceCount: 2
#該參數(shù)設(shè)置了當(dāng)前實(shí)例的索引號(hào),從 0 開始
instanceIndex: 0
計(jì)算分區(qū)源碼:
最后得到分區(qū)信息后會(huì)在消息頭中放入一個(gè)scst_partition為key,partition為值的頭信息。
啟動(dòng)多個(gè)實(shí)例后,測(cè)試發(fā)現(xiàn)所有的消息都只是同一個(gè)實(shí)例收到消息。
交換機(jī)分別與每一個(gè)服務(wù)進(jìn)行綁定使用不同的Routing Key這樣在發(fā)送消息的時(shí)候就可以根據(jù)計(jì)算處理的分區(qū)進(jìn)行定向發(fā)送消息了。
通過源碼查看:
這里通過我們的配置交換機(jī)為demo。接著是獲取路由key了。
這里會(huì)從消息header中獲取key = scst_partition的頭信息。
這樣針對(duì)使用RabbitMQ的中間件發(fā)送消息所需要的交換機(jī)及路由key就確定下來了。