成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBoot整合分布式消息平臺(tái)Pulsar

開(kāi)發(fā) 前端 分布式
從 SpringBoot 整合 Java 客戶端使用來(lái)看,Pulsar 的 api 是非常友好的,使用起來(lái)方便簡(jiǎn)潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。

大家好,我是君哥。

作為優(yōu)秀的消息流平臺(tái),Pulsar 的使用越來(lái)越多,這篇文章講解 Pulsar 的 Java 客戶端。

部署 Pulsar

Pulsar 的部署方式主要有 3 種,本地安裝二進(jìn)制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一個(gè)單節(jié)點(diǎn)的 Pulsar 集群。實(shí)驗(yàn)環(huán)境是 2 核 CPU 和 4G 內(nèi)存。

部署命令如下:

  1. docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone 

安裝過(guò)程可能會(huì)出現(xiàn)下面的錯(cuò)誤:

  1. unknown flag: --mount 
  2. See 'docker run --help'

這是因?yàn)?docker 版本低,不支持 mount 參數(shù),把 docker 版本升級(jí)到 17.06 以上就可以了。

部署過(guò)程中可能會(huì)因?yàn)榫W(wǎng)絡(luò)的原因失敗,多試幾次就可以成功了。如果看到下面的日志,就說(shuō)明啟動(dòng)成功了。

  1. 2022-01-08T22:27:58,726+0000 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone 

本地單節(jié)點(diǎn)集群?jiǎn)?dòng)后,會(huì)創(chuàng)建一個(gè) namespace,名字叫 public/default

Pulsar 客戶端

目前 Pulsar 支持多種語(yǔ)言的客戶端,包括:

Java 客戶端Go 客戶端Python 客戶端C++ 客戶端Node.js 客戶端WebSocket 客戶端C# 客戶端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客戶端,首先引入 Pulsar 客戶端依賴,代碼如下:

  1. <dependency> 
  2.     <groupId>org.apache.pulsar</groupId> 
  3.     <artifactId>pulsar-client</artifactId> 
  4.     <version>2.9.1</version> 
  5. </dependency> 

然后在 properties 文件中添加配置:

  1. # Pulsar 地址 
  2. pulsar.url=pulsar://192.168.59.155:6650 
  3. # topic 
  4. pulsar.topic=testTopic 
  5. # consumer group 
  6. pulsar.subscription=topicGroup 

創(chuàng)建 Client

創(chuàng)建客戶端非常簡(jiǎn)單,代碼如下:

  1. client = PulsarClient.builder() 
  2.                 .serviceUrl(url) 
  3.                 .build(); 

上面的 url 就是 properties 文件中定義的 pulsar.url 。

創(chuàng)建 Client 時(shí),即使集群沒(méi)有啟成功,程序也不會(huì)報(bào)錯(cuò),因?yàn)檫@時(shí)還沒(méi)有真正地去連接集群。

創(chuàng)建 Producer

  1. producer = client.newProducer() 
  2.                 .topic(topic) 
  3.                 .compressionType(CompressionType.LZ4) 
  4.                 .sendTimeout(0, TimeUnit.SECONDS) 
  5.                 .enableBatching(true
  6.                 .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) 
  7.                 .batchingMaxMessages(1000) 
  8.                 .maxPendingMessages(1000) 
  9.                 .blockIfQueueFull(true
  10.                 .roundRobinRouterBatchingPartitionSwitchFrequency(10) 
  11.                 .batcherBuilder(BatcherBuilder.DEFAULT
  12.                 .create(); 

創(chuàng)建 Producer,會(huì)真正的連接集群,這時(shí)如果集群有問(wèn)題,就會(huì)報(bào)連接錯(cuò)誤。

下面解釋一下創(chuàng)建 Producer 的參數(shù):

topic:Producer 要寫(xiě)入的 topic。

compressionType:壓縮策略,目前支持 4 種策略 (NONE、LZ4、ZLIB、ZSTD),從 Pulsar2.3 開(kāi)始,只有 Consumer 的版本在 2.3 以上,這個(gè)策略才會(huì)生效。

sendTimeout:超時(shí)時(shí)間,如果 Producer 在超時(shí)時(shí)間為收到 ACK,會(huì)進(jìn)行重新發(fā)送。

enableBatching:是否開(kāi)啟消息批量處理,這里默認(rèn) true,這個(gè)參數(shù)只有在異步發(fā)送 (sendAsync) 時(shí)才能生效,選擇同步發(fā)送會(huì)失效。

batchingMaxPublishDelay:批量發(fā)送消息的時(shí)間段,這里定義的是 10ms,需要注意的是,設(shè)置了批量時(shí)間,就不會(huì)受消息數(shù)量的影響。批量發(fā)送會(huì)把要發(fā)送的批量消息放在一個(gè)網(wǎng)絡(luò)包里發(fā)送出去,減少網(wǎng)絡(luò) IO 次數(shù),大大提高網(wǎng)卡的發(fā)送效率。

batchingMaxMessages:批量發(fā)送消息的最大數(shù)量。

maxPendingMessages:等待從 broker 接收 ACK 的消息隊(duì)列最大長(zhǎng)度。如果這個(gè)隊(duì)列滿了,producer 所有的 sendAsync 和 send 都會(huì)失敗,除非設(shè)置了 blockIfQueueFull 值是 true。

blockIfQueueFull:Producer 發(fā)送消息時(shí)會(huì)把消息先放入本地 Queue 緩存,如果緩存滿了,就會(huì)阻塞消息發(fā)送。

roundRobinRouterBatchingPartition-SwitchFrequency:如果發(fā)送消息時(shí)沒(méi)有指定 key,那默認(rèn)采用 round robin 的方式發(fā)送消息,使用 round robin 的方式,切換 partition 的周期是 (frequency * batchingMaxPublishDelay)。

創(chuàng)建 Consumer

Pulsar 的消費(fèi)模型如下圖:

從圖中可以看到,Consumer 要綁定一個(gè) subscription 才能進(jìn)行消費(fèi)。

  1. consumer = client.newConsumer() 
  2.         .topic(topic) 
  3.         .subscriptionName(subscription) 
  4.         .subscriptionType(SubscriptionType.Shared) 
  5.         .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) 
  6.         .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS) 
  7.         .receiverQueueSize(1000) 
  8.         .subscribe(); 

下面解釋一下創(chuàng)建 Consumer 的參數(shù):

topic:Consumer 要訂閱的 topic。

subscriptionName:consumer 要關(guān)聯(lián)的 subscription 名字。

subscriptionType:訂閱類型,Pulsar 支持四種類型訂閱:

Exclusive:獨(dú)占模式,同一個(gè) Topic 只能有一個(gè)消費(fèi)者,如果多個(gè)消費(fèi)者,就會(huì)出錯(cuò)。Failover:災(zāi)備模式,同一個(gè) Topic 可以有多個(gè)消費(fèi)者,但是只能有一個(gè)消費(fèi)者消費(fèi),其他消費(fèi)者作為故障轉(zhuǎn)移備用,如果當(dāng)前消費(fèi)者出了故障,就從備用消費(fèi)者中選擇一個(gè)進(jìn)行消費(fèi)。如下圖:

Shared:共享模式,同一個(gè) Topic 可以由多個(gè)消費(fèi)者訂閱和消費(fèi)。消息通過(guò) round robin 輪詢機(jī)制分發(fā)給不同的消費(fèi)者,并且每個(gè)消息僅會(huì)被分發(fā)給一個(gè)消費(fèi)者。當(dāng)消費(fèi)者斷開(kāi),如果發(fā)送給它消息沒(méi)有被消費(fèi),這些消息會(huì)被重新分發(fā)給其它存活的消費(fèi)者。如下圖:

Key_Shared:消息和消費(fèi)者都會(huì)綁定一個(gè)key,消息只會(huì)發(fā)送給綁定同一個(gè)key的消費(fèi)者。如果有新消費(fèi)者建立連接或者有消費(fèi)者斷開(kāi)連接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好處是既可以讓消費(fèi)者并發(fā)地消費(fèi)消息,又能保證同一Key下的消息順序。如下圖:

subscriptionInitialPosition:創(chuàng)建新的 subscription 時(shí)從哪里開(kāi)始消費(fèi),有兩個(gè)選項(xiàng):

Latest:從最新的消息開(kāi)始消費(fèi)Earliest:從最早的消息開(kāi)始消費(fèi)

negativeAckRedeliveryDelay:消費(fèi)失敗后間隔多久 broker 重新發(fā)送。

receiverQueueSize:在調(diào)用 receive 方法之前,最多能累積多少條消息??梢栽O(shè)置為 0,這樣每次只從 broker 拉取一條消息。在 Shared 模式下,receiverQueueSize 設(shè)置為 0,可以防止批量消息多發(fā)給一個(gè) Consumer 而導(dǎo)致其他 Consumer 空閑。

Consumer 接收消息有四種方式:同步單條、同步批量、異步單條和異步批量,代碼如下:

  1. Message message = consumer.receive() 
  2. CompletableFuture<Message> message = consumer.receiveAsync(); 
  3. Messages message = consumer.batchReceive(); 
  4. CompletableFuture<Messages> message = consumer.batchReceiveAsync(); 

對(duì)于批量接收,也可以設(shè)置批量接收的策略,代碼如下:

  1. consumer = client.newConsumer() 
  2.     .topic(topic) 
  3.     .subscriptionName(subscription) 
  4.         .batchReceivePolicy(BatchReceivePolicy.builder() 
  5.         .maxNumMessages(100) 
  6.         .maxNumBytes(1024 * 1024) 
  7.         .timeout(200, TimeUnit.MILLISECONDS) 
  8.         .build()) 
  9.     .subscribe(); 

代碼中的參數(shù)說(shuō)明如下:

maxNumMessages:批量接收的最大消息數(shù)量。maxNumBytes:批量接收消息的大小,這里是 1MB。

測(cè)試

首先編寫(xiě) Producer 發(fā)送消息的代碼,如下:

  1. public void sendMsg(String key, String data) { 
  2.     CompletableFuture<MessageId> future = producer.newMessage() 
  3.         .key(key
  4.         .value(data.getBytes()).sendAsync(); 
  5.     future.handle((v, ex) -> { 
  6.         if (ex == null) { 
  7.             logger.info("發(fā)送消息成功, key:{}, msg: {}"key, data); 
  8.         } else { 
  9.             logger.error("發(fā)送消息失敗, key:{}, msg: {}"key, data); 
  10.         } 
  11.         return null
  12.     }); 
  13.     future.join(); 
  14.     logger.info("發(fā)送消息完成, key:{}, msg: {}"key, data); 

然后編寫(xiě)一個(gè) Consumer 消費(fèi)消息的代碼,如下:

  1. public void start() throws Exception{ 
  2.     while (true) { 
  3.         Message message = consumer.receive(); 
  4.         String key = message.getKey(); 
  5.         String data = new String(message.getData()); 
  6.         String topic = message.getTopicName(); 
  7.         if (StringUtils.isNotEmpty(data)) { 
  8.             try{ 
  9.                 logger.info("收到消息, topic:{}, key:{}, data:{}", topic, key, data); 
  10.             }catch(Exception e){ 
  11.                 logger.error("接收消息異常,topic:{}, key:{}, data:{}", topic, key, data, e); 
  12.             } 
  13.         } 
  14.         consumer.acknowledge(message); 
  15.     } 

最后編寫(xiě)一個(gè) Controller 類,調(diào)用 Producer 發(fā)送消息,代碼如下:

  1. @RequestMapping("/send"
  2. @ResponseBody 
  3. public String send(@RequestParam String key, @RequestParam String data) { 
  4.     logger.info("收到消息發(fā)送請(qǐng)求, key:{}, value:{}"key, data); 
  5.     pulsarProducer.sendMsg(key, data); 
  6.     return "success"

調(diào)用 Producer 發(fā)送一條消息,key=key1,data=data1,具體操作為在瀏覽器中輸入下面的 url 后回車:

  1. http://192.168.157.1:8083/pulsar/send?key=key1&data=data1 

可以看到控制臺(tái)輸出下面日志:

  1. 2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - 發(fā)送消息成功, key:key1, msg: data1 
  2. 2022-01-08 22:42:33,200 [http-nio-8083-exec-1] [INFO] boot.pulsar.PulsarProducer - 發(fā)送消息完成, key:key1, msg: data1 
  3. 2022-01-08 22:42:33,232 [Thread-22] [INFO] boot.pulsar.PulsarConsumer - 收到消息, topic:persistent://public/default/testTopic, key:key1, data:data1 
  4. 2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0 
  5. 2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 69.000 ms - 95pct: 69.000 ms - 99pct: 69.000 ms - 99.9pct: 69.000 ms - max: 69.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0 

從日志中看到,這里使用的 namespace 就是創(chuàng)建集群時(shí)生成的public/default。

總結(jié)

從 SpringBoot 整合 Java 客戶端使用來(lái)看,Pulsar 的 api 是非常友好的,使用起來(lái)方便簡(jiǎn)潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。

 

責(zé)任編輯:姜華 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2023-01-13 07:39:07

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2023-07-18 07:23:46

分布式消息工具

2023-01-04 09:23:58

2017-07-27 14:32:05

大數(shù)據(jù)分布式消息Kafka

2023-02-28 07:01:11

分布式緩存平臺(tái)

2020-05-18 14:00:01

Dubbo分布式架構(gòu)

2022-12-13 09:19:26

分布式消息隊(duì)列

2019-09-05 09:02:45

消息系統(tǒng)緩存高可用

2024-09-12 14:50:08

2022-06-28 08:37:07

分布式服務(wù)器WebSocket

2024-11-14 11:56:45

2021-07-14 07:17:37

Springboot分布式UIDGenerato

2025-01-06 08:53:37

2017-08-30 16:47:49

Kafka設(shè)計(jì)原理

2023-07-26 07:28:55

WebSocket服務(wù)器方案

2025-04-29 04:00:00

分布式事務(wù)事務(wù)消息

2025-03-10 00:15:00

Axon開(kāi)源框架

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 日韩国产精品一区二区三区 | 免费性视频 | 国产精品揄拍一区二区 | 久久大 | 国产成人jvid在线播放 | 精品熟人一区二区三区四区 | 成人在线a| 日本不卡一区二区三区在线观看 | 欧美一区免费 | 免费在线一区二区 | 一区二区三区视频 | 久久精品亚洲精品 | 国产在线网址 | 狠狠草视频 | www.国产日本 | 亚洲精品在线视频 | www.99久久.com| 好姑娘影视在线观看高清 | 久久久网 | 久久久女女女女999久久 | 男女羞羞视频在线看 | 欧美一区二区三区在线观看 | 午夜影院在线观看免费 | 91精品国产综合久久小仙女图片 | 国产一二区视频 | 日韩中文一区 | 99久久免费精品国产男女高不卡 | 2021狠狠干| 四虎成人av | 色屁屁在线观看 | 免费视频一区二区三区在线观看 | 日韩成人免费 | 久草网站 | 一区二区三区视频在线免费观看 | 成人午夜视频在线观看 | 日韩三级电影一区二区 | 国产一区二区在线免费 | 亚洲综合无码一区二区 | 国产一区中文字幕 | 网站黄色在线免费观看 | 美女131mm久久爽爽免费 |