成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spring Cloud Stream使用詳解及部分重點(diǎn)源碼分析

開發(fā) 前端
Spring Cloud Stream是一個(gè)框架,用于構(gòu)建與MQ連接的高度可伸縮的事件驅(qū)動(dòng)微服務(wù)。其目的是為了簡(jiǎn)化消息在 Spring Cloud 應(yīng)用程序中的開發(fā)。

環(huán)境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12

簡(jiǎn)介

Spring Cloud Stream是一個(gè)框架,用于構(gòu)建與MQ連接的高度可伸縮的事件驅(qū)動(dòng)微服務(wù)。其目的是為了簡(jiǎn)化消息在 Spring Cloud 應(yīng)用程序中的開發(fā)。屏蔽了各種MQ之間的差異,使得在更換MQ的時(shí)候不需要修改代碼。

Spring Cloud Stream支持多種綁定器實(shí)現(xiàn),如下:

  • RabbitMQ。
  • Apache Kafka。
  • Kafka Streams。
  • Amazon Kinesis。
  • Google PubSub (partner maintained)。
  • Solace PubSub+ (partner maintained)。
  • Azure Event Hubs (partner maintained)。
  • AWS SQS (partner maintained)。
  • AWS SNS (partner maintained)。
  • Apache RocketMQ (partner maintained)。

詳細(xì)查看官方文檔,對(duì)應(yīng)每一個(gè)MQ都有一個(gè)Github地址。

Spring Cloud Stream的核心構(gòu)建塊是:

  • 目標(biāo)綁定器(Destination Binders):負(fù)責(zé)與MQ集成的組件。
  • 目標(biāo)綁定(Destination Bindings):MQ中間件與最終用戶提供的應(yīng)用程序代碼(生產(chǎn)者/消費(fèi)者)之間的橋梁。
  • 消息(Message):生產(chǎn)者和消費(fèi)者用來與目標(biāo)綁定器(以及通過MQ與其他應(yīng)用程序)通信的規(guī)范數(shù)據(jù)結(jié)構(gòu)。

Stream 核心組件關(guān)系圖

快速入門

依賴:

<properties>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

應(yīng)用配置:

spring:
rabbitmq:
host: localhost
virtual-host: bus
port: 5672
username: xxx
password: xxx
---
spring:
cloud:
stream:
bindings:
#自定義輸入輸出
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
myOutput:
destination: demo

創(chuàng)建消息通道綁定的接口:

public interface StreamBinding { 
String INPUT = "myInput";
String OUTPUT = "myOutput";
@Input(StreamBinding.INPUT)
SubscribableChannel input();
@Output(StreamBinding.OUTPUT)
MessageChannel output();
}

通過 @Input和 @Output注解定義輸入通道和輸出通道名稱,這里的名稱與上面配置文件中的是對(duì)應(yīng)的。

當(dāng)定義輸出通道的時(shí)候,需要返回 MessageChannel 接口對(duì)象,該接口定義了向消息通道發(fā)送消息的方法;定義輸入通道時(shí),需要返回 SubscribableChannel 接口對(duì)象,該接口集成自 MessageChannel 接口,它定義了維護(hù)消息通道訂閱者的方法。

這里的Input,Output兩個(gè)方法容器會(huì)分別創(chuàng)建一個(gè)Bean對(duì)象。

創(chuàng)建消費(fèi)者:

@Component
@EnableBinding(value = {StreamBinding.class})
public class StreamReceiver {
private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);
@StreamListener(StreamBinding.INPUT)
public void receive(String message) {
logger.info("接收到消息: {}", message);
}
}

@EnableBinding 注解用來指定一個(gè)或多個(gè)定義了 @Input 或 @Output 注解的接口,以此實(shí)現(xiàn)對(duì)消息通道(Channel)的綁定。上面我們通過 @EnableBinding(value = {StreamClient.class}) 綁定了 StreamClient 接口,該接口是我們自己實(shí)現(xiàn)的對(duì)輸入輸出消息通道綁定的定義。

@StreamListener,主要定義在方法上,作用是將被修飾的方法注冊(cè)為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器,注解中的屬性值對(duì)應(yīng)了監(jiān)聽的消息通道名。上面我們將 receive 方法注冊(cè)為 myInput 消息通道的監(jiān)聽處理器,當(dāng)我們往這個(gè)消息通道發(fā)送信息的時(shí)候,receiver 方法會(huì)執(zhí)行。

消息發(fā)送接口:

@Resource
private StreamBinding streamBinding;
@GetMapping("/send")
public void send() {
streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}

啟動(dòng)服務(wù):

查看RabbitMQ

自動(dòng)為我們創(chuàng)建了一個(gè)隊(duì)列,隊(duì)列的名稱是以我們?cè)谂渲梦募信渲玫拈_頭,后面是隨機(jī)生成的。這個(gè)隊(duì)列會(huì)自動(dòng)刪除AD,服務(wù)關(guān)閉后就自動(dòng)刪除隊(duì)列;Excl:排他的,存在該隊(duì)列就不會(huì)在創(chuàng)建了。

修改端口后,再啟動(dòng)一個(gè)服務(wù):

創(chuàng)建了2個(gè)隊(duì)列,使用其中一個(gè)發(fā)送消息:

兩個(gè)服務(wù)都收到了消息。

消費(fèi)者組

上面啟動(dòng)了2個(gè)服務(wù)都能收到消息,在集群的環(huán)境下這樣肯定會(huì)帶來問題,如果是業(yè)務(wù)方面的就會(huì)出現(xiàn)重復(fù)數(shù)據(jù),這時(shí)候我們可以通過設(shè)置分組的解決此問題。修改配置:

spring:
cloud:
stream:
bindings:
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
#指定一個(gè)組;指定分組以后,不管你啟動(dòng)多少個(gè)實(shí)例,所有的實(shí)例都監(jiān)聽這一個(gè)隊(duì)列
#多個(gè)實(shí)例會(huì)輪詢的接收消息
group: g_test
myOutput:
destination: demo

再次啟動(dòng)服務(wù)后,兩個(gè)服務(wù)會(huì)輪詢的接收到消息。

啟動(dòng)服務(wù)后,兩個(gè)服務(wù)都同時(shí)監(jiān)聽同一個(gè)隊(duì)列。隊(duì)列也不是隨機(jī)生成的了,并且隊(duì)列是持久化的,服務(wù)斷開后隊(duì)列也不會(huì)自動(dòng)刪除。

消息分區(qū)

通過消費(fèi)組的設(shè)置,雖然能保證同一消息只被一個(gè)消費(fèi)者進(jìn)行接收和處理,但是對(duì)于特殊業(yè)務(wù)情況,除了要保證單一實(shí)例消費(fèi)之外,還希望那些具備相同特征的消息都能被同一個(gè)實(shí)例消費(fèi),這個(gè)就可以使用 Spring Cloud Stream 提供的消息分區(qū)功能。修改配置。

spring:
cloud:
stream:
bindings:
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
#指定一個(gè)組;指定分組以后,不管你啟動(dòng)多少個(gè)實(shí)例,所有的實(shí)例都監(jiān)聽這一個(gè)隊(duì)列
#多個(gè)實(shí)例會(huì)輪詢的接收消息
group: g_test
consumer:
#通過該參數(shù)開啟消費(fèi)者分區(qū)功能
partitioned: true
myOutput:
destination: demo
producer:
#這里的配置也可以是SpEL表達(dá)式,比如:headers['partition']通過消息header獲取屬性
#這里會(huì)通過表達(dá)式及消息對(duì)象進(jìn)行計(jì)算得到一個(gè)Key,然后獲取key的hashCode
# 得到hashCode以后會(huì)與partitionCount進(jìn)行取模運(yùn)算得到具體的分區(qū)
partitionKeyExpression: '1' #我這里給的值就是對(duì)應(yīng)的instanceIndex的值,你希望誰接收就設(shè)置誰配置的值即可
partitionCount: 2
#實(shí)例總數(shù)
instanceCount: 2
#該參數(shù)設(shè)置了當(dāng)前實(shí)例的索引號(hào),從 0 開始
instanceIndex: 0

計(jì)算分區(qū)源碼:

最后得到分區(qū)信息后會(huì)在消息頭中放入一個(gè)scst_partition為key,partition為值的頭信息。

啟動(dòng)多個(gè)實(shí)例后,測(cè)試發(fā)現(xiàn)所有的消息都只是同一個(gè)實(shí)例收到消息。

交換機(jī)分別與每一個(gè)服務(wù)進(jìn)行綁定使用不同的Routing Key這樣在發(fā)送消息的時(shí)候就可以根據(jù)計(jì)算處理的分區(qū)進(jìn)行定向發(fā)送消息了。

通過源碼查看:

這里通過我們的配置交換機(jī)為demo。接著是獲取路由key了。

這里會(huì)從消息header中獲取key = scst_partition的頭信息。

這樣針對(duì)使用RabbitMQ的中間件發(fā)送消息所需要的交換機(jī)及路由key就確定下來了。

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2023-12-07 18:02:38

RabbitMQ異步通信

2021-04-28 06:26:11

Spring Secu功能實(shí)現(xiàn)源碼分析

2017-05-04 22:30:17

Zuul過濾器微服務(wù)

2019-02-25 15:44:16

開源RabbitMQSpring Clou

2023-08-26 19:04:40

配置write轉(zhuǎn)換器

2023-10-12 22:25:04

微服務(wù)Spring

2023-05-04 08:09:33

serviceId路徑謂詞中心注冊(cè)

2023-06-15 14:09:00

解析器Servlet容器

2024-01-29 08:28:01

Spring事務(wù)失效

2024-01-05 08:38:20

SpringBeanScope

2011-06-28 16:18:24

Qt QObject

2021-08-09 11:15:28

MybatisJavaSpring

2021-01-14 07:54:19

Spring Clou應(yīng)用路由

2021-01-07 07:40:31

驅(qū)動(dòng)微服務(wù)消息

2024-11-13 19:03:14

2015-04-24 09:33:11

Cloud Found組件分析PaaS

2021-08-31 06:45:19

IDC大數(shù)據(jù)數(shù)據(jù)中心

2012-02-23 12:53:40

JavaPlay Framew

2017-04-12 14:43:01

Spring ClouZuul過濾器

2017-07-03 08:29:42

Spring Clou服務(wù)詳解
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 四虎最新视频 | 国内av在线 | 羞羞涩涩在线观看 | 射欧美| 在线精品一区 | 一区二区三区日韩 | 免费在线一区二区 | 欧美一区二区三区,视频 | 亚洲国产一区二区三区四区 | 91精品国产一区二区三区 | 国产一区精品 | 日韩精品在线免费 | 久久综合国产 | 99精品免费久久久久久久久日本 | 精区3d动漫一品二品精区 | 欧美日韩久久 | 成人国产精品久久久 | 国产大学生情侣呻吟视频 | 精品综合久久久 | 国产精品成人一区二区三区夜夜夜 | 另类 综合 日韩 欧美 亚洲 | 国产9 9在线 | 中文 | 91av在线免费播放 | 日韩一二三区视频 | h视频免费看| 美女拍拍拍网站 | 亚洲a视频| 男女啪啪网址 | 国产在线一区二区 | 日韩成人在线观看 | 国产丝袜一区二区三区免费视频 | 日日骚视频| 91在线精品秘密一区二区 | 亚洲国产精品日韩av不卡在线 | 亚洲国产精久久久久久久 | wwww.xxxx免费 | av手机在线 | 2021狠狠天天天 | 久久久久久久久久久久久91 | 日本一区二区三区在线观看 | 天天人人精品 |