成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Kafka消費位點管理沒你想的那么簡單

云計算 Kafka
熟悉RocketMQ的小伙伴都知道RocketMQ已經默認幫我實現好了消息消費失敗重試,消費位點自動提交,死信隊列等功能,那么kafka是否也是如此呢?

背景

如果你習慣了使用RocketMQ這種自動擋管理消費位點,消息失敗重試的方式。你再來使用kafka,會發現kafka這種手動擋的消費位點管理就沒那么容易了

熟悉RocketMQ的小伙伴都知道RocketMQ已經默認幫我實現好了消息消費失敗重試,消費位點自動提交,死信隊列等功能,那么kafka是否也是如此呢?

kafka消費位點管理

kafka消費位點有兩種管理方式

  1. 手動提交消費位點
  2. 自動提交消費位點

自動提交消費位點

想要設置自動提交消費位點我們只需要設置兩個屬性

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 自動提交消費位點
  2. ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG 自動提交消費位點的時間間隔

一個簡單的消費代碼如下

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // 自動提交消費位點的時間間隔
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            try {
                handlerMessage(record);
            } catch (Exception e) {
                log.error("處理消息異常: {}", record, e);
                // 循環繼續
            }

        }
    }

自動提交消費位點有幾個缺點

  1. 會出現重復消費:比如Consumer每5秒自動提交一次位移,如果在第4秒時,消費了消息,但是還沒有提交位移,此時Consumer掛掉了,那么下次Consumer啟動時,會從上次提交的位移開始消費,這樣就會導致消息重復消費。 當然比如出現Rebalance也是會出現重復消費的情況
  2. 無法精準控制消費位點

手動提交消費位點

手動提交消費位點又分兩種

  1. 同步提交(commitSync)
  2. 異步提交(commitAsync)

同步提交(commitSync)

同步提交的方式很簡單,就是每次消費完通過調用API consumer.commitSync。

相關的代碼如下:

Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
            // 注意這里消費業務邏輯上消費失敗后的消息處理
            handlerMessage(records);
            try {
                // 消費成功后手動提交位點
                consumer.commitSync();
            } catch (CommitFailedException e) {
                // 消費位點提交失敗異常處理
                handleError(e); 
            }
        }

同步提交的方式有一個缺點,調用commitSync()時,Consumer會處于阻塞狀態,直到broker返回提交成功,嚴重影響消費性能。

異步提交(commitAsync)

異步提交的方式很簡單,就是每次消費完通過調用API consumer.commitAsync。

Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
            handlerMessage(records); // 處理消息
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null)
                    handleError(exception);
            });
        }

commitAsync主要是提供了異步回調,通過回調來通知消費位點是否提交成功。

異步提交消費位點也有一些缺點,比如消費位點不能重復提交。因為提交位點失敗后,重新提交位點可能更晚的消費位點已經提交了,這里提交已經是沒有意義的了。

spring-kafka消息消費

可以看到不管是同步提交消費位點還是異步提交消費位點,都有一些問題,想要寫出生產可用的消費代碼,需要注意的細節非常多。

比如消費失敗后的消息如何處理,是停止消費跳出循環,還是說記錄消費失敗的消息,人工處理等。

這里我們可以簡單看看spring-kafka是如何消費消息的。

我們簡單看看主流程代碼:

圖片圖片

這里我們忽略源碼的一些其他細節。只分析主要的消費流程。

  • invokeOnMessage(cRecord); 處理消息

可以看到invokeOnMessage是被整個try-catch包裹的,這樣就保證了消費失敗后不會影響整個消費流程。

具體我們先看看消息正常處理的邏輯。

private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {

   if (cRecord.value() instanceof DeserializationException ex) {
    throw ex;
   }
   if (cRecord.key() instanceof DeserializationException ex) {
    throw ex;
   }
   if (cRecord.value() == null && this.checkNullValueForExceptions) {
    checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
   }
   if (cRecord.key() == null && this.checkNullKeyForExceptions) {
    checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
   }
   doInvokeOnMessage(cRecord);
   if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
    ackCurrent(cRecord);
   }
   if (this.isCountAck || this.isTimeOnlyAck) {
    doProcessCommits();
   }
  }

這里主要是一些異常校驗,然后就是判斷是否可以提交消費位點。如果可以則調用doProcessCommits()進行正常的消費位點提交。

  • doProcessCommits() 消費位點處理

如果消費位點提交失敗也會進行一些異常處理。

private void doProcessCommits() {
   if (!this.autoCommit && !this.isRecordAck) {
    try {
     processCommits();
    }
    catch (CommitFailedException cfe) {
     if (this.remainingRecords != null && !this.isBatchListener) {
      ConsumerRecords<K, V> pending = this.remainingRecords;
      this.remainingRecords = null;
      List<ConsumerRecord<?, ?>> records = new ArrayList<>();
      for (ConsumerRecord<K, V> kvConsumerRecord : pending) {
       records.add(kvConsumerRecord);
      }
      this.commonErrorHandler.handleRemaining(cfe, records, this.consumer,
        KafkaMessageListenerContainer.this.thisOrParentContainer);
     }
    }
   }
  }

如果消費位點提交失敗則會調用commonErrorHandler進行異常處理。

commonErrorHandler有多個實現類,有一個默認實現DefaultErrorHandler

  • 消息消費失敗異常處理

如果消息消費失敗,也提供了一個異常處理擴展invokeErrorHandler(cRecord, iterator, e);

里面實際使用的也是DefaultErrorHandler

核心的處理邏輯主要還是在SeekUtils中封裝

  • DefaultErrorHandler
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
   Consumer<?, ?> consumer, MessageListenerContainer container) {

  SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
    getFailureTracker(), this.logger, getLogLevel());
 }
  • SeekUtils
public static void seekOrRecover(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
    Consumer<?, ?> consumer, MessageListenerContainer container, boolean commitRecovered,
    RecoveryStrategy recovery, LogAccessor logger, Level level) {}

可以看到有一個RecoveryStrategy參數,這個是消息消費失敗如何恢復,比如我們需要手動增加一個類似死信隊列的topic,這里消息消費失敗就會自動發送到我們的死信隊列

死信隊列的topic名字生成規則主要是topicName + -dlt

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
  DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "-dlt", cr.partition());

總結

可以看到如果我們單純的使用kafka-client原生的sdk來進行消息消費,是非常容易出現問題的。

我們需要很多細節,比如

  1. 消息消費失敗了如何處理,是否需要重試,如果重試還是失敗怎么辦?丟掉還是手動處理丟到自己創建的死信隊列中。
  2. 消費位點提交失敗了如何處理。
  3. 消費位點是使用同步提交還是異步提交?或者混合提交?

所以如果spring boot項目還是建議使用spring相關已經封裝好的kafka sdk。

非必要盡量不要使用原生的kafka-client sdk。

責任編輯:武曉燕 來源: 小奏技術
相關推薦

2015-04-30 10:12:13

開源云平臺OpenStack

2017-08-09 14:49:03

WebHTTPS瀏覽器

2014-08-25 10:17:54

數據中心管理

2021-03-29 13:00:50

代碼替換開發

2020-03-26 10:41:02

API網關大公司

2014-03-14 09:35:56

內存優化軟件內存優化

2015-06-24 10:32:13

訊鳥云計算會展

2016-01-07 10:17:48

2021-08-02 15:24:19

Windows 11Windows微軟

2014-03-21 15:30:06

產品經理PM能力

2023-12-28 12:07:21

2013-01-15 10:09:43

Windows Ser

2016-07-25 12:58:07

SDN路由故障排查

2014-07-09 09:06:33

SDN自動化

2010-08-04 09:20:31

JavaScript

2009-06-22 14:02:00

2019-05-17 09:33:50

圖像識別三維重建文本識別

2020-01-03 08:44:05

TCP網絡協議三次握手

2013-02-19 09:21:01

Win 8

2019-07-25 14:52:51

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品国产三级国产aⅴ浪潮 | 99久久亚洲 | 91在线电影| 欧美在线观看网站 | 黄色骚片| 久久九九影视 | 成人在线视频观看 | 男女下面一进一出网站 | 亚洲一av | 一区二区播放 | www.日日干 | 精品国产91| 国产成人短视频在线观看 | 久久高清精品 | 精品一区二区三区在线观看国产 | 国产一二三区免费视频 | 欧美国产视频 | 久久久久久综合 | 在线观看中文字幕 | 一区二区三区在线 | 欧 | 国产精品精品视频一区二区三区 | 久久专区 | 在线视频一区二区 | 久久国产一区二区 | www.四虎.com| 久久新视频| 欧洲一区二区三区 | 中文字幕av网站 | 日韩欧美在线免费观看视频 | 亚洲一区国产精品 | 久久噜噜噜精品国产亚洲综合 | 欧美成年人视频在线观看 | www.蜜桃av.com | 亚洲91精品 | 在线观看国产www | 91免费看片神器 | 日韩成人免费视频 | 九九热这里 | 日韩精品一区二区三区在线观看 | 在线日韩中文字幕 | 亚洲精品福利视频 |