關于Kafka消費者的這些參數,你應該要知道?
本文將對Kafka Consumer做一個簡單的介紹,是深入研究Kafka Conumer的一扇窗。主要從如下三個方面展開:
- 核心參數
- 核心組件
- 核心API
1、Kafka Consumer核心參數覽
個人覺得,要想深入了解Kafka Consumer的核心工作機制可以從它的核心參數切入,為后續深入了解它的隊列負載機制、消息拉取模型、消費模型、位點提交等機制打下基礎。
kafka Consumer的核心屬性定義在ConsumerConfig中。
1.1 基礎功能參數
- group.id
消費組名稱。
- client.id
客戶端標識id,默認為consumer-序號,在實踐中建議包含客戶端IP,在一個消費組中不能重復。
- bootstrap.servers
broker服務端地址列表。
- client.dns.lookup
客戶端尋找bootstrap地址的方式,支持如下兩種方式:
- resolve_canonical_bootstrap_servers_only
這種方式,會依據bootstrap.servers提供的主機名(hostname),根據主機上的名稱服務返回其IP地址的數組(InetAddress.getAllByName),然后依次獲取inetAddress.getCanonicalHostName(),再建立tcp連接。
一個主機可配置多個網卡,如果啟用該功能,應該可以有效利用多網卡的優勢,降低Broker的網絡端負載壓力。
- use_all_dns_ips
這種方式會直接使用bootstrap.servers中提供的hostname、port創建tcp連接,默認選項。
- enable.auto.commit
是否開啟自動位點提交,默認為true。
- auto.commit.interval.ms
如果開啟自動位點提交,位點的提交頻率,默認為5s。
- partition.assignment.strategy
消費端隊列負載算法,默認為按區間平均分配(RangeAssignor),可選值:輪詢(RoundRobinAssignor)
- auto.offset.reset
重置位點策略,但kafka提交位點時,對應的消息已被刪除時采取的恢復策略,默認為latest,可選:earliest、none(會拋出異常)。
- key.deserializer
使用的key序列化類
- value.deserializer
消息體序列化類
- interceptor.classes
消費端攔截器,可以有多個。
- check.crcs
在消費端時是否需要校驗CRC,默認為true。
1.2 網絡相關參數
- send.buffer.bytes
網絡通道(TCP)的發送緩存區大小,默認為128K。
- receive.buffer.bytes
網絡通道(TCP)的接收緩存區大小,默認為32K。
- reconnect.backoff.ms
重新建立鏈接的等待時長,默認為50ms,屬于底層網絡參數,基本無需關注。
- reconnect.backoff.max.ms
重新建立鏈接的最大等待時長,默認為1s,連續兩次對同一個連接建立重連,等待時間會在reconnect.backoff.ms的初始值上成指數級遞增,但超過max后,將不再指數級遞增。
- retry.backoff.ms
重試間隔時間,默認為100ms。
- connections.max.idle.ms
連接的最大空閑時間,默認為9s。
- request.timeout.ms
請求的超時時間,與Broker端的網絡通訊的請求超時時間。
1.3 核心工作參數
- max.poll.records
每一次poll方法調用拉取的最大消息條數,默認為500。
- max.poll.interval.ms
兩次poll方法調用的最大間隔時間,單位毫秒,默認為5分鐘。如果消費端在該間隔內沒有發起poll操作,該消費者將被剔除,觸發重平衡,將該消費者分配的隊列分配給其他消費者。
- session.timeout.ms
消費者與broker的心跳超時時間,默認10s,broker在指定時間內沒有收到心跳請求,broker端將會將該消費者移出,并觸發重平衡。
- heartbeat.interval.ms
心跳間隔時間,消費者會以該頻率向broker發送心跳,默認為3s,主要是確保session不會失效。
- fetch.min.bytes
一次拉取消息最小返回的字節數量,默認為1字節。
- fetch.max.bytes
一次拉取消息最大返回的字節數量,默認為1M,如果一個分區的第一批消息大小大于該值也會返回。
- max.partition.fetch.bytes
一次拉取每一個分區最大拉取字節數,默認為1M。
- fetch.max.wait.ms
fetch等待拉取數據符合fetch.min.bytes的最大等待時間。
- metadata.max.age.ms
元數據在客戶端的過期時間,過期后客戶端會向broker重新拉取最新的元數據,默認為5分鐘。
- internal.leave.group.on.close
消費者關閉后是否立即離開訂閱組,默認為true,即當客戶端斷開后立即觸發重平衡。如果設置為false,則不會立即觸發重平衡,而是要等session過期后才會觸發。
2、KafkaConsumer核心組件與API
通過KafkaConsumer核心參數,我們基本可以窺探Kafka中的核心要點,接下來再介紹一下KafkaConsumer的核心組件,為后續深入研究Kafka消費者消費模型打下基礎。
2.1 核心組件
KafkaConsumer由如下幾個核心組件構成:
- ConsumerNetworkClient
消費端網絡客戶端,服務底層網絡通訊,負責客戶端與服務端的RPC通信。
- ConsumerCoordinator
消費端協調器,在Kafka的設計中,每一個消費組在集群中會選舉一個broker節點成為該消費組的協調器,負責消費組狀態的狀態管理,尤其是消費組重平衡(消費者的加入與退出),該類就是消費者與broker協調器進行交互。
- Fetcher
消息拉取。
溫馨提示:本文不打算對每一個組件進行詳細解讀,這里建議大家按照本文第一部分關于各個參數的含義,然后對照這些參數最終是傳resume遞給哪些組件,進行一個關聯思考。
2.2 核心API概述
最后我們再來看一下消費者的核心API。
- Set< TopicPartition> assignment()
獲取該消費者的隊列分配列表。
- Set< String> subscription()
獲取該消費者的訂閱信息。
- void subscribe(Collection< String> topics)
訂閱主題。
- void subscribe(Collection< String> topics, ConsumerRebalanceListener callback)
訂閱主題,并指定隊列重平衡的監聽器。
- void assign(Collection< TopicPartition> partitions)
取代 subscription,手動指定消費哪些隊列。
- void unsubscribe()
取消訂閱關系。
- ConsumerRecords
poll(Duration timeout)
拉取消息,是 KafkaConsumer 的核心方法,將在下文詳細介紹。
- void commitSync()
同步提交消費進度,為本批次的消費提交,將在后續文章中詳細介紹。
- void commitSync(Duration timeout)
同步提交消費進度,可設置超時時間。
- void commitSync(Map
offsets)
顯示同步提交消費進度, offsets 指明需要提交消費進度的信息。
- void commitSync(final Map
offsets, final Duration timeout)
顯示同步提交消費進度,帶超時間。
- void seek(TopicPartition partition, long offset)
重置 consumer#poll 方法下一次拉消息的偏移量。
- void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
seek 方法重載方法。
- void seekToBeginning(Collection< TopicPartition> partitions)
將 poll 方法下一次的拉取偏移量設置為隊列的初始偏移量。
- void seekToEnd(Collection< TopicPartition> partitions)
將 poll 方法下一次的拉取偏移量設置為隊列的最大偏移量。
- long position(TopicPartition partition)
獲取將被拉取的偏移量。
- long position(TopicPartition partition, final Duration timeout)
同上。
- OffsetAndMetadata committed(TopicPartition partition)
獲取指定分區已提交的偏移量。
- OffsetAndMetadata committed(TopicPartition partition, final Duration timeout)
同上。
- Map metrics()
統計指標。
- List< PartitionInfo> partitionsFor(String topic)
獲取主題的路由信息。
- List< PartitionInfo> partitionsFor(String topic, Duration timeout)
同上。
- Map listTopics()
獲取所有 topic 的路由信息。
- Map listTopics(Duration timeout)
同上。
- Set< TopicPartition> paused()
獲取已掛起的分區信息。
- void pause(Collection< TopicPartition> partitions)
掛起分區,下一次 poll 方法將不會返回這些分區的消息。
- void resume(Collection< TopicPartition> partitions)
恢復掛起的分區。
- Map
offsetsForTimes(MaptimestampsToSearch)
根據時間戳查找最近的一條消息的偏移量。
- Map
offsetsForTimes(MaptimestampsToSearch, Duration timeout)
同上。
- Map
beginningOffsets(Collection< TopicPartition> partitions)
查詢指定分區當前最小的偏移量。
- Map
beginningOffsets(Collection< TopicPartition> partitions, Duration timeout)
同上。
- Map
endOffsets(Collection< TopicPartition> partitions)
查詢指定分區當前最大的偏移量。
- Map
endOffsets(Collection< TopicPartition> partitions, Duration timeout)
同上。
- void close()
關閉消費者。
- void close(Duration timeout)
關閉消費者。
- void wakeup()
喚醒消費者。
Kafka提供的消費者并不像RocketMQ提供了Push模式自動拉取消息,需要應用程序自動組織這些API進行消息拉取。
值得注意的kafka消費者也支持位點自動提交機制,kafka的消費者(KafkaConsumer)對象是線程不安全的。
基于KafkaConsumer的pause(暫停某些分區的消費)與resume(恢復某些分區的消費),可以輕松實現消費端限流機制。
本文主要是對消費者有一個大概的了解,后續文章將持續逐一解開消費者的核心運作機制,請持續關注。
本文轉載自微信公眾號「中間件興趣圈」,可以通過以下二維碼關注。轉載本文請聯系中間件興趣圈公眾號。