成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Stream幫你無感知切換消息中間件

開發 前端
這篇文章根據 Spring Cloud Stream 的官方文檔,對Stream做了一個整體的介紹,包括設計目標,應用場景,業務模型以及對外開放的注解,希望大家能夠學以致用。

哈嘍,大家好,我是了不起。

在實際的企業開發中,消息中間件是至關重要的組件之一。如常見的RabbitMQ和Kafka,這些中間件的差異性導致我們實際項目開發給我們造成了一定的困擾,這時候 Spring Cloud Stream 給我們提供了一種解耦合的方式。

簡介

Spring Cloud Stream 由一個中間件中立的核組成。

應用通過 Spring Cloud Stream 插入的Input(相當于消費者Consumer,它是從隊列中接收消息的)和Output(相當于生產者Producer,它是從隊列中發送消息的。)通道與外界交流。

通道通過指定中間件的Binder實現與外部代理連接。

業務開發者不再關注具體消息中間件,只需關注Binder對應用程序提供的抽象概念來使用消息中間件實現業務即可。

詳細介紹

核心概念

Spring Cloud Stream 為各大消息中間件產品提供了個性化的自動化配置實現,引用了發布-訂閱、消費組、分區的三個核心概念。

Spring Cloud Stream 提供了很多抽象和基礎組件來簡化消息驅動型微服務應用。包含以下內容:

  • Spring Cloud Stream的應用模型
  • 綁定抽象
  • 持久化發布/訂閱支持
  • 消費者組支持
  • 分片支持(Partitioning Support)
  • 可插拔API

應用模型

Spring Cloud Stream由一個中立的中間件內核組成。Spring Cloud Stream會注入輸入和輸出的channels,應用程序通過這些channels與外界通信,而channels則是通過一個明確的中間件Binder與外部brokers連接。

圖片圖片

各大消息中間件的綁定抽象

Spring Cloud Stream 提供對Kafka、Rabbit MQ、Redis、Gemfire的Binder實現。Spring Cloud Stream還包括了一個TestSupportBinder、TestSupportBinder預留一個未更改的channel以便于直接地、可靠地和channels通信。

分區支持

分區在有狀態處理中是一個很重要的概念,其重要性體現在性能和一致性上,要確保所有相關數據被一并處理,例如,在時間窗平均計算的例子中,給定傳感器測量結果應該都由同一應用實例進行計算。

Spring Cloud Stream支持在一個應用程序的多個實例之間數據分區,在分區的情況下,物理通信介質(例如,topic代理)被視為多分區結構。一個或多個生產者應用程序實例將數據發送給多個消費應用實例,并保證共同的特性的數據由相同的消費者實例處理。

Spring Cloud Stream 提供了一個通用的抽象,用于統一方式進行分區處理,因此分區可以用于自帶分區的代理(如Kafka)或者不帶分區的代理(如RabbieMQ)

編程模型

Spring Cloud Stream 提供了一些預定義的注解,用于綁定輸入和輸出channels,以及如何監聽channels。

通過@EnableBinding觸發綁定

將@EnableBinding注解添加到應用的配置類,就可以把一個spring應用轉換成Spring Cloud Stream應用,@EnableBinding注解本身就包含@Configuration注解,會觸發Spring Cloud Stream 基本配置。

@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
    ...
    Class<?>[] value() default {};
}

@Input 與 @Output

一個Spring Cloud Stream應用可以有任意數目的input和output通道,后者通過@Input和@Output注解在接口中定義。

@StreamListener

定義在方法中,被修飾的方法注冊為消息中間件上數據流的事件監聽器,注解中屬性值對應了監聽的消息通道名。

Source、Sink和Processor

Spring Cloud Stream提供了三個開箱即用的預定義接口。

  • Source用于有單個輸出(outbound)通道的應用。
public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();

}
  • Sink用于有單個輸入(inbound)通道的應用。
public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();

}
  • Processor用于單個應用同時包含輸入和輸出通道的情況。
public interface Processor extends Source, Sink {
}

極簡實例

下面是一個非常簡單的 SpringBootApplication應用,通過依賴Spring Cloud Stream,從Input通道監聽消息然后返回應答到Output通道,只要添加配置文件就可以應用。

@SpringBootApplication
@EnableBinding(Processor.class)
public class ServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

下面解釋下這個示例中相關注解的應用:

  • @EnableBinding聲明了這個應用程序綁定了2個通道:INPUT和OUTPUT。這2個通道是在接口Processor中定義的(Spring Cloud Stream默認設置)。所有通道都是配置在一個具體的消息中間件或綁定器中。
  • @StreamListener(Processor.INPUT)表明這里在input中提取消息,并且處理。
  • @SendTo(Processor.OUTPUT)表明在output中返回消息。

其他特性

消息發送失敗的處理

消息發送失敗后悔發送到默認的一個“topic.errors"的channel中(topic是配置的destination)。要配置消息發送失敗的處理,需要將錯誤消息的channel打開。

消費者配置如下

spring:
  application:
    name: spring-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 內容格式。這里使用 JSON
          producer:
            errorChannelEnabled: true

在啟動類中配置錯誤消息的Channel信息

@Bean("stream-test-topic.errors")
MessageChannel testoutPutErrorChannel(){
    return new PublishSubscribeChannel();
}

新建異常處理service

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class ErrorProducerService {

    @ServiceActivator(inputChannel = "stream-test-topic.errors")
    public void receiveProducerError(Message message){
        System.out.println("receive error msg :"+message);
    }
}

當發生異常時,由于測試類中已經將異常捕獲,處理發送異常主要是在這里進行。

總結

這篇文章根據 Spring Cloud Stream 的官方文檔,對Stream做了一個整體的介紹,包括設計目標,應用場景,業務模型以及對外開放的注解,希望大家能夠學以致用。

責任編輯:武曉燕 來源: Java技術指北
相關推薦

2019-12-13 10:32:56

開源消息中間件

2023-10-24 07:50:18

消息中間件MQ

2023-06-29 10:10:06

Rocket MQ消息中間件

2022-11-02 10:08:46

分布式高并發消息中間件

2015-08-11 11:16:36

淘寶中間件

2021-12-14 10:39:12

中間件ActiveMQRabbitMQ

2022-08-09 08:31:29

RocketMQ消息中間件

2023-05-08 08:09:26

路由元信息謂詞

2022-10-21 10:48:17

消息中間件互聯網應用協議

2022-02-13 23:04:28

RedisRabbitMQKafka

2019-07-19 07:56:13

消息隊列消息代理消息中間件

2011-05-24 15:10:48

2021-02-11 08:21:02

中間件開發CRUD

2019-11-18 09:58:11

中間件投遞模式

2022-12-27 17:56:40

ack機制RocketMQ

2023-10-16 12:25:48

2024-07-11 11:17:00

消息隊列Java

2022-09-21 16:09:28

消息中間件

2022-10-28 13:33:05

Push模式互聯網高并發

2018-02-01 10:19:22

中間件服務器系統
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费观看成人性生生活片 | 欧美日产国产成人免费图片 | 国产女人与拘做视频免费 | 九色 在线| 欧美日本一区 | 精品久久久久一区二区国产 | 国产黄色免费网站 | 亚洲精品第一国产综合野 | a级片网站 | 天天操人人干 | 国产高清精品在线 | 九九久久精品视频 | 免费一级淫片aaa片毛片a级 | 日韩一区二区三区在线视频 | 日韩国产专区 | 激情在线视频网站 | 国产资源网| 无码日韩精品一区二区免费 | 免费在线观看一区二区 | 国产在线一区观看 | 国产女人第一次做爰毛片 | 国产视频1 | 午夜精品久久久久久久久久久久久 | 精品一区二区三区电影 | 999久久久久久久久6666 | 欧美国产中文字幕 | 亚洲国产一区二区三区在线观看 | 久久机热| 999国产视频 | 欧美色专区| 99福利 | 精品久久99 | 久久99精品久久久久 | 久久免费精品视频 | 欧美在线国产精品 | 久久99久久99精品免视看婷婷 | 天天爱av | 午夜精品久久久久久久99黑人 | 欧美精品区 | 黄色片网此 | 先锋影音资源网站 |