Spring boot集成Kafka之spring-kafka深入探秘
前言
kafka是一個消息隊列產品,基于Topic partitions的設計,能達到非常高的消息發送處理性能。Spring創建了一個項目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項目里快速集成kafka。除了簡單的收發消息外,Spring-kafka還提供了很多高級功能,下面我們就來一一探秘這些用法。
項目地址:https://github.com/spring-projects/spring-kafka
簡單集成
引入依賴
- <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency>
添加配置
- spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
測試發送和接收
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Autowired
- private KafkaTemplate<Object, Object> template;
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- this.template.send("topic_input", input);
- }
- @KafkaListener(id = "webGroup", topics = "topic_input")
- public void listen(String input) {
- logger.info("input value: {}" , input);
- }
- }
啟動應用后,在瀏覽器中輸入:http://localhost:8080/send/kl。就可以在控制臺看到有日志輸出了:input value: "kl"。基礎的使用就這么簡單。發送消息時注入一個KafkaTemplate,接收消息時添加一個@KafkaListener注解即可。
Spring-kafka-test嵌入式Kafka Server
不過上面的代碼能夠啟動成功,前提是你已經有了Kafka Server的服務環境,我們知道Kafka是由Scala + Zookeeper構建的,可以從官網下載部署包在本地部署。但是,我想告訴你,為了簡化開發環節驗證Kafka相關功能,Spring-Kafka-Test已經封裝了Kafka-test提供了注解式的一鍵開啟Kafka Server的功能,使用起來也是超級簡單。本文后面的所有測試用例的Kafka都是使用這種嵌入式服務提供的。
引入依賴
- <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>2.2.6.RELEASE</version><scope>test</scope></dependency>
啟動服務
下面使用Junit測試用例,直接啟動一個Kafka Server服務,包含四個Broker節點。
- @RunWith(SpringRunner.class)@SpringBootTest(classes = ApplicationTests.class)@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})public class ApplicationTests {@Testpublic void contextLoads()throws IOException { System.in.read(); }}
如上:只需要一個注解@EmbeddedKafka即可,就可以啟動一個功能完整的Kafka服務,是不是很酷。默認只寫注解不加參數的情況下,是創建一個隨機端口的Broker,在啟動的日志中會輸出具體的端口以及默認的一些配置項。不過這些我們在Kafka安裝包配置文件中的配置項,在注解參數中都可以配置,下面詳解下@EmbeddedKafka注解中的可設置參數 :
- value:broker節點數量
- count:同value作用一樣,也是配置的broker的節點數量
- controlledShutdown:控制關閉開關,主要用來在Broker意外關閉時減少此Broker上Partition的不可用時間
Kafka是多Broker架構的高可用服務,一個Topic對應多個partition,一個Partition可以有多個副本Replication,這些Replication副本保存在多個Broker,用于高可用。但是,雖然存在多個分區副本集,當前工作副本集卻只有一個,默認就是首次分配的副本集【首選副本】為Leader,負責寫入和讀取數據。當我們升級Broker或者更新Broker配置時需要重啟服務,這個時候需要將partition轉移到可用的Broker。下面涉及到三種情況
- 直接關閉Broker:當Broker關閉時,Broker集群會重新進行選主操作,選出一個新的Broker來作為Partition Leader,選舉時此Broker上的Partition會短時不可用
- 開啟controlledShutdown:當Broker關閉時,Broker本身會先嘗試將Leader角色轉移到其他可用的Broker上
- 使用命令行工具:使用bin/kafka-preferred-replica-election.sh,手動觸發PartitionLeader角色轉移
- ports:端口列表,是一個數組。對應了count參數,有幾個Broker,就要對應幾個端口號
- brokerProperties:Broker參數設置,是一個數組結構,支持如下方式進行Broker參數設置:
- @EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})
- kerPropertiesLocation:Broker參數文件設置
功能同上面的brokerProperties,只是Kafka Broker的可設置參數達182個之多,都像上面這樣配置肯定不是最優方案,所以提供了加載本地配置文件的功能,如:
- @EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
默認情況下,如果在使用KafkaTemplate發送消息時,Topic不存在,會創建一個新的Topic,默認的分區數和副本數為如下Broker參數來設定
創建新的Topic
- num.partitions = 1 #默認Topic分區數
- num.replica.fetchers = 1 #默認副本數
程序啟動時創建Topic
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/31
- */
- @Configuration
- public class KafkaConfig {
- @Bean
- public KafkaAdmin admin(KafkaProperties properties){
- KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
- admin.setFatalIfBrokerNotAvailable(true);
- return admin;
- }
- @Bean
- public NewTopic topic2() {
- return new NewTopic("topic-kl", 1, (short) 1);
- }
- }
如果Kafka Broker支持(1.0.0或更高版本),則如果發現現有Topic的Partition 數少于設置的Partition 數,則會新增新的Partition分區。關于KafkaAdmin有幾個常用的用法如下:
setFatalIfBrokerNotAvailable(true):默認這個值是False的,在Broker不可用時,不影響Spring 上下文的初始化。如果你覺得Broker不可用影響正常業務需要顯示的將這個值設置為True
setAutoCreate(false) : 默認值為True,也就是Kafka實例化后會自動創建已經實例化的NewTopic對象
initialize():當setAutoCreate為false時,需要我們程序顯示的調用admin的initialize()方法來初始化NewTopic對象
代碼邏輯中創建
有時候我們在程序啟動時并不知道某個Topic需要多少Partition數合適,但是又不能一股腦的直接使用Broker的默認設置,這個時候就需要使用Kafka-Client自帶的AdminClient來進行處理。上面的Spring封裝的KafkaAdmin也是使用的AdminClient來處理的。如:
- @Autowired
- private KafkaProperties properties;
- @Test
- public void testCreateToipc(){
- AdminClient client = AdminClient.create(properties.buildAdminProperties());
- if(client !=null){
- try {
- Collection<NewTopic> newnewTopics = new ArrayList<>(1);
- newTopics.add(new NewTopic("topic-kl",1,(short) 1));
- client.createTopics(newTopics);
- }catch (Throwable e){
- e.printStackTrace();
- }finally {
- client.close();
- }
- }
- }
ps:其他的方式創建Topic
上面的這些創建Topic方式前提是你的spring boot版本到2.x以上了,因為spring-kafka2.x版本只支持spring boot2.x的版本。在1.x的版本中還沒有這些api。下面補充一種在程序中通過Kafka_2.10創建Topic的方式
引入依賴
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.2</version>
- </dependency>
api方式創建
- @Test
- public void testCreateTopic()throws Exception{
- ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)
- String topicName = "topic-kl";
- int partitions = 1;
- int replication = 1;
- AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());
- }
注意下ZkClient最后一個構造入參,是一個序列化反序列化的接口實現,博主測試如果不填的話,創建的Topic在ZK上的數據是有問題的,默認的Kafka實現也很簡單,就是做了字符串UTF-8編碼處理。ZKStringSerializer$是Kafka中已經實現好的一個接口實例,是一個Scala的伴生對象,在Java中直接調用點MODULE$就可以得到一個實例
命令方式創建
- @Test
- public void testCreateTopic(){
- String [] options= new String[]{
- "--create",
- "--zookeeper","127.0.0.1:2181",
- "--replication-factor", "3",
- "--partitions", "3",
- "--topic", "topic-kl"
- };
- TopicCommand.main(options);
- }
消息發送之KafkaTemplate探秘
獲取發送結果
異步獲取
- template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
- @Override
- public void onFailure(Throwable throwable) {
- ......
- }
- @Override
- public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
- ....
- }
- });
同步獲取
- ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");
- try {
- SendResult<Object,Object> result = future.get();
- }catch (Throwable e){
- e.printStackTrace();
- }
kafka事務消息
默認情況下,Spring-kafka自動生成的KafkaTemplate實例,是不具有事務消息發送能力的。需要使用如下配置激活事務特性。事務激活后,所有的消息發送只能在發生事務的方法內執行了,不然就會拋一個沒有事務交易的異常
- spring.kafka.producer.transaction-id-prefix=kafka_tx.
當發送消息有事務要求時,比如,當所有消息發送成功才算成功,如下面的例子:假設第一條消費發送后,在發第二條消息前出現了異常,那么第一條已經發送的消息也會回滾。而且正常情況下,假設在消息一發送后休眠一段時間,在發送第二條消息,消費端也只有在事務方法執行完成后才會接收到消息
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- template.executeInTransaction(t ->{
- t.send("topic_input","kl");
- if("error".equals(input)){
- throw new RuntimeException("failed");
- }
- t.send("topic_input","ckl");
- return true;
- });
- }
當事務特性激活時,同樣,在方法上面加@Transactional注解也會生效
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) {
- template.send("topic_input", "kl");
- if ("error".equals(input)) {
- throw new RuntimeException("failed");
- }
- template.send("topic_input", "ckl");
- }
Spring-Kafka的事務消息是基于Kafka提供的事務消息功能的。而Kafka Broker默認的配置針對的三個或以上Broker高可用服務而設置的。這邊在測試的時候為了簡單方便,使用了嵌入式服務新建了一個單Broker的Kafka服務,出現了一些問題:如
1、事務日志副本集大于Broker數量,會拋如下異常:
- Number of alive brokers '1' does not meet the required replication factor '3'
- for the transactions state topic (configured via 'transaction.state.log.replication.factor').
- This error can be ignored if the cluster is starting up and not all brokers are up yet.
默認Broker的配置transaction.state.log.replication.factor=3,單節點只能調整為1
2、副本數小于副本同步隊列數目,會拋如下異常
- Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]
默認Broker的配置transaction.state.log.min.isr=2,單節點只能調整為1
ReplyingKafkaTemplate獲得消息回復
ReplyingKafkaTemplate是KafkaTemplate的一個子類,除了繼承父類的方法,新增了一個方法sendAndReceive,實現了消息發送\回復語義
- RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
也就是我發送一條消息,能夠拿到消費者給我返回的結果。就像傳統的RPC交互那樣。當消息的發送者需要知道消息消費者的具體的消費情況,非常適合這個api。如,一條消息中發送一批數據,需要知道消費者成功處理了哪些數據。下面代碼演示了怎么集成以及使用ReplyingKafkaTemplate
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Bean
- public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
- ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");
- repliesContainer.getContainerProperties().setGroupId("repliesGroup");
- repliesContainer.setAutoStartup(false);
- return repliesContainer;
- }
- @Bean
- public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
- return new ReplyingKafkaTemplate(pf, repliesContainer);
- }
- @Bean
- public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {
- return new KafkaTemplate(pf);
- }
- @Autowired
- private ReplyingKafkaTemplate template;
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) throws Exception {
- ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
- RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
- ConsumerRecord<String, String> consumerRecord = replyFuture.get();
- System.err.println("Return value: " + consumerRecord.value());
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- @SendTo
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
- }
Spring-kafka消息消費用法探秘
@KafkaListener的使用
前面在簡單集成中已經演示過了@KafkaListener接收消息的能力,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:
- 顯示的指定消費哪些Topic和分區的消息,
- 設置每個Topic以及分區初始化的偏移量,
- 設置消費線程并發度
- 設置消息異常處理器
- @KafkaListener(id = "webGroup", topicPartitions = {
- @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
- @TopicPartition(topic = "topic2", partitions = "0",
- partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
- },concurrency = "6",errorHandler = "myErrorHandler")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
其他的注解參數都很好理解,errorHandler需要說明下,設置這個參數需要實現一個接口KafkaListenerErrorHandler。而且注解里的配置,是你自定義實現實例在spring上下文中的Name。比如,上面配置為errorHandler = "myErrorHandler"。則在spring上線中應該存在這樣一個實例:
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/31
- */
- @Service("myErrorHandler")
- public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
- Logger logger =LoggerFactory.getLogger(getClass());
- @Override
- public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
- logger.info(message.getPayload().toString());
- return null;
- }
- @Override
- public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
- logger.info(message.getPayload().toString());
- return null;
- }
- }
手動Ack模式
手動ACK模式,由業務邏輯控制提交偏移量。比如程序在消費時,有這種語義,特別異常情況下不確認ack,也就是不提交偏移量,那么你只能使用手動Ack模式來做了。開啟手動首先需要關閉自動提交,然后設置下consumer的消費模式
- spring.kafka.consumer.enable-auto-commit=false
- spring.kafka.listener.ack-mode=manual
上面的設置好后,在消費時,只需要在@KafkaListener監聽方法的入參加入Acknowledgment 即可,執行到ack.acknowledge()代表提交了偏移量
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- public String listen(String input, Acknowledgment ack) {
- logger.info("input value: {}", input);
- if ("kl".equals(input)) {
- ack.acknowledge();
- }
- return "successful";
- }
@KafkaListener注解監聽器生命周期
@KafkaListener注解的監聽器的生命周期是可以控制的,默認情況下,@KafkaListener的參數autoStartup = "true"。也就是自動啟動消費,但是也可以同過KafkaListenerEndpointRegistry來干預他的生命周期。KafkaListenerEndpointRegistry有三個動作方法分別如:start(),pause(),resume()/啟動,停止,繼續。如下代碼詳細演示了這種功能。
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Autowired
- private KafkaTemplate template;
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) throws Exception {
- ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
- template.send(record);
- }
- @Autowired
- private KafkaListenerEndpointRegistry registry;
- @GetMapping("/stop/{listenerID}")
- public void stop(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).pause();
- }
- @GetMapping("/resume/{listenerID}")
- public void resume(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).resume();
- }
- @GetMapping("/start/{listenerID}")
- public void start(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).start();
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
- }
在上面的代碼中,listenerID就是@KafkaListener中的id值“webGroup”。項目啟動好后,分別執行如下url,就可以看到效果了。
先發送一條消息:http://localhost:8081/send/ckl。因為autoStartup = "false",所以并不會看到有消息進入監聽器。
接著啟動監聽器:http://localhost:8081/start/webGroup。可以看到有一條消息進來了。
暫停和繼續消費的效果使用類似方法就可以測試出來了。
SendTo消息轉發
前面的消息發送響應應用里面已經見過@SendTo,其實除了做發送響應語義外,@SendTo注解還可以帶一個參數,指定轉發的Topic隊列。常見的場景如,一個消息需要做多重加工,不同的加工耗費的cup等資源不一致,那么就可以通過跨不同Topic和部署在不同主機上的consumer來解決了。如:
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- @SendTo("topic-ckl")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return input + "hello!";
- }
- @KafkaListener(id = "webGroup1", topics = "topic-ckl")
- public void listen2(String input) {
- logger.info("input value: {}", input);
- }
消息重試和死信隊列的應用
除了上面談到的通過手動Ack模式來控制消息偏移量外,其實Spring-kafka內部還封裝了可重試消費消息的語義,也就是可以設置為當消費數據出現異常時,重試這個消息。而且可以設置重試達到多少次后,讓消息進入預定好的Topic。也就是死信隊列里。下面代碼演示了這種效果:
- @Autowired
- private KafkaTemplate template;
- @Bean
- public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
- ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
- ConsumerFactory<Object, Object> kafkaConsumerFactory,
- KafkaTemplate<Object, Object> template) {
- ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
- configurer.configure(factory, kafkaConsumerFactory);
- //最大重試三次
- factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
- return factory;
- }
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- template.send("topic-kl", input);
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- public String listen(String input) {
- logger.info("input value: {}", input);
- throw new RuntimeException("dlt");
- }
- @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
- public void dltListen(String input) {
- logger.info("Received from DLT: " + input);
- }
上面應用,在topic-kl監聽到消息會,會觸發運行時異常,然后監聽器會嘗試三次調用,當到達最大的重試次數后。消息就會被丟掉重試死信隊列里面去。死信隊列的Topic的規則是,業務Topic名字+“.DLT”。如上面業務Topic的name為“topic-kl”,那么對應的死信隊列的Topic就是“topic-kl.DLT”
文末結語
最近業務上使用了kafka用到了Spring-kafka,所以系統性的探索了下Spring-kafka的各種用法,發現了很多好玩很酷的特性,比如,一個注解開啟嵌入式的Kafka服務、像RPC調用一樣的發送\響應語義調用、事務消息等功能。希望此博文能夠幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點坑。