成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

自研 Pulsar Starter:Winfun-Pulsar-Spring-Boot-Starter

云計算
Pulsar 作為新生代云原生消息隊列,越來越受到開發(fā)者的熱愛;而我們現(xiàn)在基本上的項目都是基于 SpringBoot 上開發(fā)的,但是我們可以發(fā)現(xiàn),至今都沒有比較大眾和成熟的關(guān)于 Pulsar 的 Starter,所以我們需要自己整一個,從而避免常規(guī)使用 Pulsar API 時產(chǎn)生大量的重復代碼。

 [[420613]]

里程碑

版本 功能點 作者 完成
1.0.0 支持PulsarTemplate發(fā)送消息&支持自定義注解實例化Consumer監(jiān)聽消息 howinfun
1.1.0 支持動態(tài)開啟/關(guān)閉Consumer消費線程池、支持自定義配置Consuemr消費線程池參數(shù) howinfun
1.2.0 支持Spring容器停止時,釋放Pulsar所有相關(guān)資源 howinfun TODO
1.3.0 支持多Pulsar數(shù)據(jù)源 howinfun TODO
 

一、背景

Pulsar 作為新生代云原生消息隊列,越來越受到開發(fā)者的熱愛;而我們現(xiàn)在基本上的項目都是基于 SpringBoot 上開發(fā)的,但是我們可以發(fā)現(xiàn),至今都沒有比較大眾和成熟的關(guān)于 Pulsar 的 Starter,所以我們需要自己整一個,從而避免常規(guī)使用 Pulsar API 時產(chǎn)生大量的重復代碼。

二、設(shè)計思路

由于是第一版的設(shè)計,所以我們是從簡單開始,不會一開始就設(shè)計得很復雜,盡量保留 Pulsar API 原生的功能。

2.1、PulsarClient

我們都知道,不管是 Producer 還是 Consumer,都是由 PulsarClient 創(chuàng)建的。

當然了,PulsarClient 可以根據(jù)業(yè)務(wù)需要自定義很多參數(shù),但是第一版的設(shè)計只會支持比較常用的參數(shù)。

我們這個組件支持下面功能點:

  • 支持 PulsarClient 參數(shù)配置外部化,參數(shù)可配置在 applicatin.properties 中。
  • 支持 applicatin.properties 提供配置提示信息。
  • 讀取外部配置文件,根據(jù)參數(shù)實例化 PulsarClient,并注入到 IOC 容器中。

2.2、Producer

Producer是發(fā)送消息的組件。

  • 這里我們提供一個模版類,可以根據(jù)需求創(chuàng)建對應(yīng)的 Producer 實例。
  • 支持將 Topic<->Producer 關(guān)系緩存起來,避免重復創(chuàng)建 Producer 實例。
  • 支持同步/異步發(fā)送消息。

2.3、Consumer

Consumer是消費消息的組件。

  • 這里我們提供一個抽象類,開發(fā)者只需要集成此實現(xiàn)類并實現(xiàn) doReceive 方法即可,即消費消息的邏輯方法。
  • 接著還提供一個自定義注解,自定義注解支持自定義 Consmuer 配置,例如Topic、Tenant、Namespace等。
  • 實現(xiàn)類加入上述自定義注解后,組件將會自動識別并且生成對應(yīng)的 Consumer 實例。
  • 支持同步/線程池異步消費。

三、使用例子

3.1、引入依賴

  1. <dependency> 
  2.     <groupId>io.github.howinfun</groupId> 
  3.     <artifactId>winfun-pulsar-spring-boot-starter</artifactId> 
  4.     <version>1.1.0</version> 
  5. </dependency> 

 

3.2、加入配置

  1. pulsar.service-url=pulsar://127.0.0.1:6650 
  2. pulsar.tenant=winfun 
  3. pulsar.namespace=study 
  4. pulsar.operation-timeout=30 
  5. pulsar.io-threads=10 
  6. pulsar.listener-threads=10 

3.3、發(fā)送消息

  1. /** 
  2.  * 發(fā)送消息 
  3.  * @author: winfun 
  4.  **/ 
  5. @RestController 
  6. @RequestMapping("msg"
  7. public class MessageController { 
  8.  
  9.     @Autowired 
  10.     private PulsarTemplate pulsarTemplate; 
  11.     @Autowired 
  12.     private PulsarProperties pulsarProperties; 
  13.  
  14.     /*** 
  15.      * 往指定topic發(fā)送消息 
  16.      * @author winfun 
  17.      * @param topic topic 
  18.      * @param msg msg 
  19.      * @return {@link String } 
  20.      **/ 
  21.     @GetMapping("/{topic}/{msg}"
  22.     public String send(@PathVariable("topic") String topic,@PathVariable("msg") String msg) throws Exception { 
  23.         this.pulsarTemplate.createBuilder().persistent(Boolean.TRUE
  24.                 .tenant(this.pulsarProperties.getTenant()) 
  25.                 .namespace(this.pulsarProperties.getNamespace()) 
  26.                 .topic(topic) 
  27.                 .send(msg); 
  28.         return "success"
  29.     } 

3.4、消費消息

  1. /** 
  2.  * @author: winfun 
  3.  * @date: 2021/8/20 8:13 下午 
  4.  **/ 
  5. @Slf4j 
  6. @PulsarListener(topics = {"test-topic2"}, 
  7.                 threadPool = @ThreadPool( 
  8.                                         coreThreads = 2, 
  9.                                         maxCoreThreads = 3,  
  10.                                         threadPoolName = "test-thread-pool")) 
  11. public class ConsumerListener extends BaseMessageListener { 
  12.  
  13.     /** 
  14.      * 消費消息 
  15.      * @param consumer 消費者 
  16.      * @param msg 消息 
  17.      */ 
  18.     @Override 
  19.     protected void doReceived(Consumer<String> consumer, Message<String> msg) { 
  20.         log.info("成功消費消息:{}",msg.getValue()); 
  21.         try { 
  22.             consumer.acknowledge(msg); 
  23.         } catch (PulsarClientException e) { 
  24.             e.printStackTrace(); 
  25.         } 
  26.     } 
  27.  
  28.     /*** 
  29.      * 是否開啟異步消費 
  30.      * @return {@link Boolean } 
  31.      **/ 
  32.     @Override 
  33.     public Boolean enableAsync() { 
  34.         return Boolean.TRUE
  35.     } 

四、源碼

源碼就不放在這里分析了,大家可到Github上看看,如果有什么代碼上面的建議或意見,歡迎大家提MR。

責任編輯:武曉燕 來源: 不送花的程序猿
相關(guān)推薦

2021-07-26 11:09:43

NacosSpring Boot配置

2019-01-31 13:43:48

Spring BootStarter開發(fā)

2023-03-27 08:28:57

spring代碼,starter

2020-09-27 11:35:16

Spring BootStarterJava

2019-11-12 10:50:13

Spring BootstarterJava

2023-10-10 09:07:23

2021-02-03 09:04:11

解密Spring配置

2023-02-26 10:14:51

Spring第三方庫

2023-02-26 00:00:01

Spring數(shù)據(jù)庫組件

2024-01-29 08:09:21

ApacheLTS版本

2009-07-01 08:34:31

微軟Windows 7Starter

2024-09-06 17:55:27

Springboot開發(fā)

2024-01-16 08:08:12

SSD磁盤占用率

2021-08-24 07:57:26

KafkaRocketMQPulsar

2009-03-31 09:01:43

Windows 7微軟操作系統(tǒng)

2009-04-23 08:50:30

Windows 7微軟操作系統(tǒng)

2022-10-08 00:00:00

Spring數(shù)據(jù)庫項目

2009-08-13 09:30:06

2021-02-01 07:20:51

KafkaPulsar搜索

2021-09-14 08:44:11

負載均衡Bundle
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 中文一区| 国产精品一区二区在线 | 中文字幕欧美日韩一区 | 成人欧美一区二区三区在线播放 | 日本午夜精品 | 国产一区二区不卡 | 狠狠躁夜夜躁人人爽天天高潮 | 久久久www| 中文天堂在线一区 | 国产乱码精品一品二品 | 亚洲欧美在线一区 | 国产精品毛片久久久久久 | 国产高清精品一区 | 99精品在线 | 日本视频一区二区 | 国产一区二区三区久久久久久久久 | 欧美综合国产精品久久丁香 | 国产在线精品一区二区 | 神马福利 | 国产我和子的乱视频网站 | 亚洲国产精品日韩av不卡在线 | 成人一区av偷拍 | 中文字幕亚洲视频 | 欧美激情综合 | 欧美成人专区 | 久久精品99 | 亚洲理论在线观看电影 | 91精品欧美久久久久久久 | 国产特级毛片aaaaaa喷潮 | 亚洲精品一区二区三区中文字幕 | 综合色影院 | 九一精品 | 久久免费资源 | 一区二区国产精品 | 一区天堂 | a免费视频 | 日本精品久久久一区二区三区 | 欧美日韩高清 | 成人免费小视频 | 日韩一区二区三区在线 | 亚洲国产中文字幕 |