成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

非常簡單的SpringCloudStream集成Kafka教程!

開發(fā) 架構(gòu)
本章初步介紹了Spring Cloud Stream 集成Kafka的簡單示例,實(shí)現(xiàn)了簡單的發(fā)布-訂閱功能。但是Spring Cloud Stream肯定還有更多的功能,我們后續(xù)還將繼續(xù)深入學(xué)習(xí)更多Stream的功能。

哈嘍,大家好,我是指北君。

開發(fā)中,服務(wù)與服務(wù)之間通信通常會用到消息中間件,如果我們使用了某一個MQ,那么消息中間件與我們的系統(tǒng)算是高耦合。將來有一天,要替換成另外的MQ,我們的改動就會比較大。為了解決這個問題,我們可以使用Spring Cloud Stream 來整合我們的消息中間件,降低耦合度,使服務(wù)可以更多關(guān)注自己的業(yè)務(wù)邏輯等。

今天為大家?guī)硪粋€人人可實(shí)操的SpringCloudStream集成Kafka的快速入門示例。

1.前言

SpringCloudStream是一個構(gòu)建高擴(kuò)展性的事件消息驅(qū)動的微服務(wù)框架。簡單點(diǎn)說就是幫助你操作MQ,可以與底層MQ框架解耦。將來想要替換MQ框架的時候會比較容易。

Kafka是一個分布式發(fā)布 - 訂閱消息系統(tǒng),源于LinkedIn的一個項(xiàng)目,2011年成為開源Apache項(xiàng)目。

ZooKeeper 是 Apache 軟件基金會的一個軟件項(xiàng)目,它為大型分布式計(jì)算提供開源的分布式配置服務(wù)、同步服務(wù)和命名注冊,Kafka的實(shí)現(xiàn)同時也依賴于zookeeper。

2.Windows搭建簡單的Kafka

2.1 啟動zookeeper

使用Kafka首先需要啟動zookeeper,windows中搭建zookeeper也很簡單。以下幾步即可完成:

  • 下載zookeeper (本文使用3.7.0版本,下載鏈接在文章末尾。)
  • 配置基本環(huán)境變量:

將conf文件夾下面的 zoo_sample.cfg 重命名zoo.cfg。并修改其工作目錄dataDir。

bin文件夾下面有zkEnv.cmd有zookeeper相關(guān)的配置,其中就包括JAVA_HOME,所以系統(tǒng)環(huán)境變量需要配置JAVA_HOME,或者直接用Java的路徑來替換。

  • 啟動,在bin目錄下運(yùn)行zkServer.cmd腳本啟動zookeeper。

默認(rèn)啟動端口2181為。

正常啟動如下:

2.2 搭建Kafka

本地使用kafka同樣也是如下的幾個步驟:

  • 下載Kafka(本文使用2.11版本,下載鏈接見文章末尾)。
  • 環(huán)境變量配置:

查看config文件下面的 server.properties配置文件中的zookeeper的配置

zookeeper.connect=localhost:2181

在bin/windows文件夾下面kafka-run-class.bat文件中有JAVA_HOME的配置,同樣也可以直接改成系統(tǒng)的Java路徑

  • 在kafka根目錄下使用如下命令啟動kafka,并在zookeeper中注冊。
# .\bin\windows\kafka-server-start.bat .\config\server.properties
  • 創(chuàng)建topic,在bin\windows目錄下使用如下命令。創(chuàng)建名稱為“test”的topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic test
  • 使用windows命令窗口的producer和consumer,在bin\windows目錄下使用如下命令
#test topic的消息生產(chǎn)者
kafka-console-producer.bat --broker-list localhost:9092 --topic test
#test topic的消息消費(fèi)者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
#test topic的消息消費(fèi)者(從頭消費(fèi))
kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic

kafka啟動windows界面如下:

3 SpringCloudStream集成Kafka

3.1 引入依賴

由于我們直接使用Spring Cloud Stream 集成Kafka,官方也已經(jīng)有現(xiàn)成的starter。

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>

3.2 關(guān)于kafka的配置

spring:
application:
name: shop-server
cloud:
stream:
bindings:
#配置自己定義的通道與哪個中間件交互
input: #MessageChannel里Input和Output的值
destination: test #目標(biāo)主題 相當(dāng)于kafka的topic
output:
destination: test1 #本例子創(chuàng)建了另外一個topic (test1)用于區(qū)分不同的功能區(qū)分。
default-binder: kafka #默認(rèn)的binder是kafka
kafka:
binder:
zk-nodes: localhost:2181
bootstrap-servers: localhost:9092 #kafka服務(wù)地址,集群部署的時候需要配置多個,
consumer:
group-id: consumer1
producer:
key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
server:
port: 8100

3.3 消費(fèi)者示例

首先需要定義SubscribableChannel 接口方法使用Input注解。

public interface Sink {
String INPUT = "input";

@Input("input")
SubscribableChannel input();
}

然后簡單的使用 StreamListener 監(jiān)聽某一通道的消息。

@Service
@EnableBinding(Sink.class)
public class MessageSinkHandler {

@StreamListener(Sink.INPUT)
public void handler(Message<String> msg){
System.out.println(" received message : "+msg);

}
}

cloud stream配置中綁定了對應(yīng)的Kafka topic,如下:

cloud:
stream:
bindings:
#配置自己定義的通道與哪個中間件交互
input: #SubscribableChannel里Input值
destination: test #目標(biāo)主題

我們使用Kafka console producer 生產(chǎn)消息。

kafka-console-producer.bat --broker-list localhost:9092 --topic test

同時啟動我們的示例SpringBoot項(xiàng)目,使用producer推送幾條消息。

我們同時啟動一個Kafka console consumer

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

消費(fèi)結(jié)果如下:

Spring Boot 項(xiàng)目消費(fèi)消息如下:

3.4 生產(chǎn)者示例

首先需要定義生產(chǎn)者M(jìn)essageChannel,這里會用到Output注解

public interface KafkaSource {
String OUTPUT = "output";

@Output(KafkaSource.OUTPUT)
MessageChannel output();
}

使用MessageChannel 發(fā)送消息。

@Component
public class MessageService {

@Autowired
private KafkaSource source;

public Object sendMessage(Object msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
return msg;
}

定義一個Rest API 來觸發(fā)消息發(fā)送

@RestController
public class MessageController {

@Autowired
private MessageService messageService;

@GetMapping(value = "/sendMessage/{msg}")
public String sendMessage(@PathVariable("msg") String msg){
messageService.sendMessage("messageService send out : " + msg + LocalDateTime.now());
return "sent message";
}
}

配置中關(guān)于producer的配置如下:

cloud:
stream:
bindings:
input:
destination: test
output:
destination: test1 #目標(biāo)topic

啟動SpringBoot App, 并觸發(fā)如下API call

??http://localhost:8100/sendMessage/JavaNorthProducer??

我們同時啟動一個Kafka console consumer,這里我們使用另一個test1 topic

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1

console consumer消費(fèi)消息如下:

總結(jié)

本章初步介紹了Spring Cloud Stream 集成Kafka的簡單示例,實(shí)現(xiàn)了簡單的發(fā)布-訂閱功能。但是Spring Cloud Stream肯定還有更多的功能,我們后續(xù)還將繼續(xù)深入學(xué)習(xí)更多Stream的功能。

以上示例倉庫:https://github.com/javatechnorth/java-study-note/tree/master/kafka

下載鏈接:

??https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz??

??https://kafka.apache.org/downloads??

責(zé)任編輯:武曉燕 來源: Java技術(shù)指北
相關(guān)推薦

2018-08-02 15:13:35

2021-12-01 12:30:43

NiceUmiJS前端

2020-09-29 15:08:47

Go UI框架開發(fā)

2010-02-05 13:56:56

Ubuntu Linu

2010-03-11 16:22:08

Python教程

2009-06-24 10:58:21

jQuery插件教程

2014-07-17 11:36:27

Android Stu使用教程

2019-12-03 11:00:08

spring bootspring-kafkJava

2014-04-24 13:35:11

OpenGL ES2.iOSAndroid

2011-05-11 15:10:21

jQueryCSS導(dǎo)航欄

2019-05-27 17:01:02

PHPPDO編程語言

2011-07-07 09:01:52

HTML 5

2024-08-05 08:45:35

SpringKafkaSCRAM

2010-07-06 11:09:52

Server 2008

2009-09-29 10:40:12

政府應(yīng)急指揮平臺

2024-10-31 11:49:41

Kafka管理死信隊(duì)列

2011-08-30 15:32:08

QtQuickQML

2020-02-21 17:33:17

SparkKafka數(shù)據(jù)

2009-07-06 14:43:30

JSP元素

2023-01-11 15:11:36

SpringEhcache
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 欧美日日| 奇米影视首页 | 国产欧美一区二区精品久导航 | 美女日批免费视频 | 免费视频久久久久 | 欧美韩一区二区 | 在线观看www高清视频 | 网址黄 | 中文字幕一区二区不卡 | 欧美一区二区三区在线播放 | 中文字幕在线观看一区 | 密桃av | 四季久久免费一区二区三区四区 | 国产96在线 | 成人免费淫片aa视频免费 | 天天拍天天草 | 亚洲国产成人av好男人在线观看 | 欧美在线日韩 | 国产欧美在线一区二区 | 亚洲精品视频在线观看视频 | 成人国产精品免费观看视频 | 国产在线网址 | 天天躁日日躁性色aⅴ电影 免费在线观看成年人视频 国产欧美精品 | 久久精品国产一区 | 欧美一区二区视频 | 午夜综合 | 蜜桃视频在线观看www社区 | 欧美v日韩 | 99国产精品久久久久久久 | www四虎影视 | 精品久久香蕉国产线看观看亚洲 | 国产美女高潮 | 亚洲欧美在线观看 | 欧美日韩亚 | 伊人免费在线观看 | 成人综合在线视频 | 在线免费观看a级片 | 日韩成人精品在线观看 | 国产一级片在线播放 | 亚洲第一免费播放区 | 91在线观看免费 |