Kafka Consumer 消費消息和 Rebalance 機制
Kafka Consumer
Kafka 有消費組的概念,每個消費者只能消費所分配到的分區的消息,每一個分區只能被一個消費組中的一個消費者所消費,所以同一個消費組中消費者的數量如果超過了分區的數量,將會出現有些消費者分配不到消費的分區。消費組與消費者關系如下圖所示:
consumer group
Kafka Consumer Client 消費消息通常包含以下步驟:
- 配置客戶端,創建消費者
- 訂閱主題
- 拉去消息并消費
- 提交消費位移
- 關閉消費者實例
過程
因為 Kafka 的 Consumer 客戶端是線程不安全的,為了保證線程安全,并提升消費性能,可以在 Consumer 端采用類似 Reactor 的線程模型來消費數據。
消費模型
Kafka consumer 參數
- bootstrap.servers:連接 broker 地址,host:port 格式。
- group.id:消費者隸屬的消費組。
- key.deserializer:與生產者的key.serializer對應,key 的反序列化方式。
- value.deserializer:與生產者的value.serializer對應,value 的反序列化方式。
- session.timeout.ms:coordinator 檢測失敗的時間。默認 10s 該參數是 Consumer Group 主動檢測 (組內成員 comsummer) 崩潰的時間間隔,類似于心跳過期時間。
- auto.offset.reset:該屬性指定了消費者在讀取一個沒有偏移量后者偏移量無效(消費者長時間失效當前的偏移量已經過時并且被刪除了)的分區的情況下,應該作何處理,默認值是 latest,也就是從最新記錄讀取數據(消費者啟動之后生成的記錄),另一個值是 earliest,意思是在偏移量無效的情況下,消費者從起始位置開始讀取數據。
- enable.auto.commit:否自動提交位移,如果為false,則需要在程序中手動提交位移。對于精確到一次的語義,最好手動提交位移
- fetch.max.bytes:單次拉取數據的最大字節數量
- max.poll.records:單次 poll 調用返回的最大消息數,如果處理邏輯很輕量,可以適當提高該值。但是max.poll.records條數據需要在在 session.timeout.ms 這個時間內處理完 。默認值為 500
- request.timeout.ms:一次請求響應的最長等待時間。如果在超時時間內未得到響應,kafka 要么重發這條消息,要么超過重試次數的情況下直接置為失敗。
Kafka Rebalance
rebalance 本質上是一種協議,規定了一個 consumer group 下的所有 consumer 如何達成一致來分配訂閱 topic 的每個分區。比如某個 group 下有 20 個 consumer,它訂閱了一個具有 100 個分區的 topic。正常情況下,Kafka 平均會為每個 consumer 分配 5 個分區。這個分配的過程就叫 rebalance。
什么時候 rebalance?
這也是經常被提及的一個問題。rebalance 的觸發條件有三種:
- 組成員發生變更(新 consumer 加入組、已有 consumer 主動離開組或已有 consumer 崩潰了——這兩者的區別后面會談到)
- 訂閱主題數發生變更
- 訂閱主題的分區數發生變更
如何進行組內分區分配?
Kafka 默認提供了兩種分配策略:Range 和 Round-Robin。當然 Kafka 采用了可插拔式的分配策略,你可以創建自己的分配器以實現不同的分配策略。
kafka 高頻面試題
- Kafka 有哪些命令行工具?你用過哪些?/bin目錄,管理 kafka 集群、管理 topic、生產和消費 kafka。
- Kafka Producer 的執行過程?攔截器,序列化器,分區器和累加器。
- Kafka Producer 有哪些常見配置?broker 配置,ack 配置,網絡和發送參數,壓縮參數,ack 參數。
- 如何讓 Kafka 的消息有序?Kafka 在 Topic 級別本身是無序的,只有 partition 上才有序,所以為了保證處理順序,可以自定義分區器,將需順序處理的數據發送到同一個 partition。
- Producer 如何保證數據發送不丟失?ack 機制,重試機制。
- 如何提升 Producer 的性能?批量,異步,壓縮。
- 如果同一 group 下 consumer 的數量大于 part 的數量,kafka 如何處理?多余的 Part 將處于無用狀態,不消費數據。
- Kafka Consumer 是否是線程安全的?不安全,單線程消費,多線程處理。
- 講一下你使用 Kafka Consumer 消費消息時的線程模型,為何如此設計?拉取和處理分離。
- Kafka Consumer 的常見配置?broker, 網絡和拉取參數,心跳參數。
- Consumer 什么時候會被踢出集群?奔潰,網絡異常,處理時間過長提交位移超時。
- 當有 Consumer 加入或退出時,Kafka 會作何反應?進行 Rebalance。
- 什么是 Rebalance,何時會發生 Rebalance?topic 變化,consumer 變化。