除了對Kafka參數的調整,我們還要根據業務處理邏輯對消費者組進行提前規劃,避免為了方便將業務相關的topic同時劃分到同一個大消費者組,這樣一旦某個消費者出現問題,將會導致整個消費者組重新Rebalace。
需求
《Bug:Zabbix對Kafka topic積壓數據監控》一文我們通過監控lag來對Kafka某個分區topic的消費情況進行告警。通過告警我們發現,分區topic的消費積壓情況告警非常頻繁,這無疑會引起開發、運維的重點關注。經過一系列的監控、摸索、實踐、總結,我們逐步發現分區topic的消費積壓和以下幾種情況有關:
- 消費者組頻繁出現Rebalance,導致整個消費者組下的topic都無法消費;
- 消費者性能問題,無法在超時時間內完成消費;
- topic分區數和消費者數量不均衡,一個消費者需要消費多個分區topic,消費緩慢;
- topic分區數量變化;
- 等等
從以上幾種情況分析,無論哪種都和消費者組Rebalance有相關性,都是在經過Rebalance后再重新消費。因此我們還得從Rebalance的角度再出發。
Rebalance再出發
其中關于消費者性能問題,這大多和客戶端的參數設置不恰當相關,這是運維比較難覺察導致。但是為了更全面的了解Kafka,我們運維還是很有必要去輕了解下的。先從相關參數說起:
# 消費者每次poll()最大消費消息數量,默認500條
max.poll.records
# 兩次poll()之間的最大間隔,默認值為5分鐘
max.poll.interval.ms
這個參數定義了兩次poll()之間的最大間隔,「默認值為5分鐘」。如果業務處理消息時間過長,則會導致兩次poll()的時間間隔大于超時時間,從而觸發Rebalance。因此我們應該適當調整每次poll()的數量,以保證在規定時間內處理完消息,這就需要關注max.poll.records參數了。
這個參數定義了poll()方法最多可以返回多少條消息,「默認值為500」。poll()的數量如何定義,需要根據業務處理邏輯來決定,例如數據要經過多個數據源進行處理,一旦某一數據源訪問超時,無疑都會降低消費效率。比較友好的解決方案是,開發可以根據不同的情況實時調整相關參數,應用側動態感知并進行自動熱加載,達到快速調整消費的效果。
除了對Kafka參數的調整,我們還要根據業務處理邏輯對消費者組進行提前規劃,避免為了方便將業務相關的topic同時劃分到同一個大消費者組,這樣一旦某個消費者出現問題,將會導致整個消費者組重新Rebalace。如果Rebalance時間過長,此時所有的topic無法消費,那么實時業務將會受到很大的影響。因此我們要合理分配topic到不同的消費者組。
監控
經過以上的探索分析,我們的首要任務應該是監控Kafka消費者組是否處于Rebalance狀態,進而確定:
分區消費者是否發生切換,此時消費者數量不變;
分區消費者數量是否減少,出現一個消費者同時消費多個分區topic;
分區數量和消費者是否為1:1關系,避免出現消費者和分區數量不一致的情況;
1.監控思路
在多消費者組的情況下,我們不僅要監控每個消費者組的Rebalance的狀態,還要考慮到未來消費者組的擴展,因此我們希望可以通過配置文件的形式對消費者進行自動發現并添加監控。在此我們仍然是通過Zabbix自動發現,實現對每個消費者組的Rebalance狀態進行監控告警。
2.消費者組自動發現
由于此配置文件和Kafk topic監控復用同一個文件,通過Zabbix可對特定消費者組(Group)進行去重識別并行自動發現。
# 自動發現
vim consumer-groups.conf
#按消費者組(Group)|Topic格式,寫入自動發現配置文件
test-group|test
# 執行腳本自動發現所有的消費者分組
bash consumer-groups-rebalance.sh discovery
{
"data": [
{ "{#GROUP}":"test-group" }
]
}
3.獲取消費者組Rebalance狀態
# 獲取rebalance狀態,0代表沒有rebalance,1代表處于rebalance
[root#~] bash consumer-groups-rebalance.sh status test-group
0
4.最終腳本
#!/bin/bash
#comment: 查詢消費者組狀態,如果出現reabalance則進行告警
#配置文件說明,和topic lag監控共用一套配置文件
#消費者組|Topic
#test-group|test
#加載環境變量
export JAVA_HOME=/usr/local/jdk1.8.0_261
export JRE_HOME=$JAVA_HOME/jre
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
#group自動發現
group_discovery() {
printf "{\n"
printf "\t\"data\": [\n"
m=0
num=`cat /etc/zabbix/monitor_scripts/consumer-groups.conf |awk -F'|' '{print $1}'|sort|uniq|wc -l`
for group in `cat /etc/zabbix/monitor_scripts/consumer-groups.conf|awk -F'|' '{print $1}'|sort|uniq`
do
m=`expr $m + 1`
#判斷最后一行
if [ $m -eq $num ]; then
printf "\t\t{ \"{#GROUP}\":\"${group}\" }\n"
else
printf "\t\t{ \"{#GROUP}\":\"${group}\" },\n"
fi
done
printf "\t]\n"
printf "}\n"
}
if [ $1 == "discovery" ]; then
group_discovery
elif [ $1 == "status" ];then
/usr/local/kafka/bin/./kafka-consumer-groups.sh --bootstrap-server 10.10.10.233:9092 --describe --group $2 > /tmp/$2 2>&1
cat /tmp/$2 |grep rebalanc |wc -l
else
echo "Usage: /etc/zabbix/monitor_scripts/consumer-group.sh discovery | lag"
fi
5.Zabbix自動發現

- 監控項原型 以消費者組定義監控項名稱,告警信息中的名稱能夠幫助我們快速定位配置。

- 觸發器配置 告警觸發時,能夠通過告警信息快速定位kafka 消費者組故障。

告警主機:Kafka_192.168.3.55
主機IP:192.168.3.55
主機組:Kafka
告警時間:2022.11.11 11:22:00
恢復時間:2022.11.11 11:23:02
告警等級:Warning
告警信息:消費者組test-group:處于rebalance狀態
告警項目:group_status[test-group]
問題詳情:
blaze-route: 1
其他運維問題簡單處理
# 1.手動消費某topic積壓的消息
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.10.10.233:9092 --topic test --group test-group
# 2.調整kafka某個topic的數據有效期,有效釋放磁盤空間
/usr/local/kafka/bin/kafka-topics.sh -zookeeper 10.10.10.233:9092 --topic test-group --alter --config retention.ms=79200000
# 3.調整kafka某個topic的分區數量
/usr/local/kafka/bin/kafka-topics.sh -zookeeper 10.10.10.233:9092 --topic test-group --alter --partitions 4
以上是在使用Kafka過程中比較常用的幾個命令使用方式。