生產事故!Kafka 消費延遲十小時,我用這三招起死回生
兄弟們,凌晨兩點,手機像被踩了尾巴的貓一樣狂震。我迷迷糊糊摸到手機,鎖屏上跳出運維監控群的99+消息——某核心業務的Kafka消費延遲突破10小時大關,像脫韁的野馬在報警大屏上狂奔。作為負責這個系統的背鍋俠,我瞬間清醒,套上拖鞋就往公司趕,心里暗罵:“Kafka你個老六,平時挺穩當的,怎么突然給我整這出?”
一、事故現場:當訂單堆積成“珠穆朗瑪”
到公司打開監控頁面,好家伙,Kafka消費組的Lag數值像坐了火箭,直接竄到120萬條。再看業務系統,訂單處理模塊的吞吐量幾乎歸零,數據庫里待處理的訂單表像個被吹脹的氣球,隨時可能爆炸。客服部門已經傳來戰報,用戶投訴量直線上升,說下單后半天收不到確認短信,還以為自己遇到了詐騙APP。
趕緊連到Kafka服務器,用kafka-consumer-groups.sh
命令一查,發現問題出在某個關鍵Topic的消費組上。這個Topic平時承載著用戶下單、支付、物流等核心事件,下游有10多個消費者組在消費,偏偏我們這個組掉了鏈子。再仔細看消費者實例,明明配置了8個實例,怎么每個實例的消費速率都低得可憐?每秒處理量不到200條,而生產端的消息寫入速率可是穩定在5000條/秒,這就好比一個水龍頭開得嘩嘩響,下面接水的杯子卻只有針眼大,不堵才怪。
抽絲剝繭:到底是誰拖了后腿?
剛開始懷疑是網絡問題,畢竟之前有過機房交換機故障導致吞吐量下降的先例。但登錄服務器一查,網卡流量連峰值的10%都沒到,帶寬穩穩的。再看CPU和內存,CPU利用率倒是不低,平均在70%左右,但內存還有一大半空閑,難道是CPU瓶頸?
不對啊,我們的消費者實例用的可是4核8G的配置,按理說處理這種量級的消息不該這么吃力。突然想到,Kafka消費者的性能和分區分配有很大關系。用list-consumer-groups
和describe-consumer-groups
命令一看,好家伙,8個消費者實例,居然有2個實例各分到了20個分區,剩下6個實例只分到5個分區。這就好比班里分作業,有的同學抱了一摞,有的同學卻只拿到可憐的幾本,忙的忙死,閑的閑死。
再深入分析消費者的日志,發現大量的時間花在了反序列化和業務處理上。我們用的是Avro序列化格式,按道理反序列化效率不低,但業務處理里有個坑:每處理一條消息,都要去調用3個不同的微服務接口,而且還是串行調用,每個接口的平均耗時居然超過200ms。這就相當于你吃個漢堡,非要先去買面包,再去煎肉餅,最后去摘生菜,每一步都得等上半天,效率能高才怪。
二、第一招:讓消費者“多線程搬磚”
既然問題出在分區分配不均和處理效率上,那就先從消費者的并行度下手。Kafka的消費者是通過分區來并行消費的,每個分區只能被同一個消費組中的一個消費者實例處理,所以合理分配分區是關鍵。
1. 調整分區分配策略
默認的分區分配策略是RangeAssignor
,這種策略在分區數量不能被消費者實例數整除時,容易導致分配不均。比如我們有100個分區,8個消費者,100÷8=12余4,前4個消費者會分到13個分區,后4個分到12個。雖然這次的分配更離譜,但本質還是分配策略的問題。我們換成RoundRobinAssignor
,它會把分區按順序輪流分配給消費者,能更均勻一些。
修改消費者配置,加上partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
,然后重啟消費者實例。神奇的事情發生了,每個實例分到的分區數基本一致,再也沒有“勞模”和“摸魚黨”之分了。
2. 增加消費者實例數
既然分區數量是100個,那消費者實例數最好和分區數匹配,或者是分區數的因數。我們之前用8個實例,和100不匹配,那就加到20個實例,這樣每個實例分到5個分區,壓力均勻多了。這里要注意,消費者實例數不能超過分區數,否則多余的實例會閑置。
3. 優化業務處理線程池
消費者處理消息是在poll循環里,默認是單線程處理。我們的業務處理耗時太長,必須用多線程來加速。在消費者的回調函數里,把消息丟到一個線程池里異步處理,這樣poll可以盡快去取下一批消息,不會被阻塞。
ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> processMessage(record));
}
consumer.commitAsync();
}
這里要注意線程池的大小不能太大,否則會占用太多資源,一般設置為CPU核心數的2倍左右比較合適。另外,異步處理時要做好異常處理,避免消息處理失敗后丟失。
三、第二招:給Kafka“松綁”,別讓它“背鍋”
處理完消費者端的問題,發現延遲下降了一些,但還是在5小時左右徘徊。這時候意識到,可能生產者端也有問題,或者Kafka本身的配置需要優化。
1. 檢查生產者批次大小
登錄生產者服務器,查看配置,發現batch.size
設置得太小,只有16KB。Kafka生產者會把消息攢成批次發送,批次太小會導致發送頻率過高,網絡開銷增大。我們把它調到160KB,這樣每次發送的消息更多,效率更高。不過也不能調得太大,否則會增加延遲,需要根據實際情況平衡。
2. 調整消費者拉取參數
消費者端的fetch.maxBytes
和fetch.max等待時間
也很重要。默認fetch.maxBytes
是50MB,可能太大了,導致消費者每次拉取需要處理很久。我們調到10MB,同時把fetch.max.wait.ms
從500ms調到200ms,讓消費者在沒有足夠數據時不用等太久,及時處理已有的數據。
3. 清理無效的舊數據
查看Kafka Topic的配置,發現retention.hours
設置為72小時,但我們的業務其實只需要保留24小時的數據。大量的舊數據堆積在磁盤上,不僅占用空間,還會影響消費者的拉取速度。趕緊修改配置,執行kafka-topics.sh --alter --topic my_topic --config retention.hours=24
,然后等待Kafka自動清理舊數據。不過要注意,清理過程中可能會對性能有一定影響,最好選擇業務低峰期操作。
四、第三招:給消息處理“減肥”,拒絕“無效勞動”
經過前兩招,延遲已經降到了2小時,但離我們的目標還有差距。這時候必須深入業務處理邏輯,看看有沒有可以優化的地方。
1. 合并微服務調用
之前每處理一條消息,都要串行調用3個微服務接口,總耗時超過600ms。其實這3個接口之間沒有嚴格的依賴關系,可以改成并行調用。用CompletableFuture來實現異步并行調用,然后合并結果。
CompletableFuture<Result1> future1 = CompletableFuture.supplyAsync(() -> callService1());
CompletableFuture<Result2> future2 = CompletableFuture.supplyAsync(() -> callService2());
CompletableFuture<Result3> future3 = CompletableFuture.supplyAsync(() -> callService3());
CompletableFuture.allOf(future1, future2, future3).join();
Result1 result1 = future1.get();
Result2 result2 = future2.get();
Result3 result3 = future3.get();
這樣總耗時降到了200ms左右,效率提升了3倍。
2. 增加本地緩存
有些頻繁調用的基礎數據,比如商品類目、用戶等級等,每次都去數據庫查詢,耗時很長。我們在消費者實例里增加了本地緩存,用Caffeine緩存,設置5分鐘的過期時間,減少數據庫訪問次數。這就好比你每天上班都要帶鑰匙,每次回家都要翻包找,不如在門口裝個密碼鎖,直接輸入密碼更快捷。
3. 跳過無效消息
通過監控發現,有一部分消息是重復的或者狀態無效的,比如已經取消的訂單再次發送確認消息。我們在消息處理前增加了一個過濾環節,先檢查消息的狀態,如果是無效的,直接跳過,不進行后續處理。這就像分揀快遞,先把明顯破損或者地址錯誤的包裹挑出來,剩下的再慢慢處理,效率自然提高。
五、勝利時刻:延遲從10小時到30分鐘的逆襲
經過這三招組合拳,凌晨五點,監控頁面上的Lag數值開始穩步下降,像泄了氣的氣球一樣,到早上八點,已經降到了30分鐘以內,吞吐量也恢復到了每秒5000條以上,和生產端基本持平。再看業務系統,訂單處理終于跟上了節奏,數據庫里的積壓訂單也慢慢消化完了。
六、復盤總結:這些坑以后別再踩
- 分區分配要均衡:根據分區數量合理設置消費者實例數,選擇合適的分配策略,避免“勞逸不均”。
- 業務處理要輕量:盡量減少同步阻塞操作,能用異步并行的就別串行,能緩存的就別頻繁查數據庫。
- 參數配置要調優:生產者和消費者的參數不是一成不變的,要根據實際吞吐量和延遲情況動態調整,比如
batch.size
、fetch.maxBytes
等。 - 監控報警要完善:這次事故能及時發現,多虧了完善的監控體系。以后要繼續優化監控指標,比如消費者延遲、吞吐量、CPU內存使用率等,設置合理的報警閾值。
Kafka雖然強大,但也不是萬能的。當遇到消費延遲問題時,不要慌,先冷靜分析原因,從消費者、生產者、業務處理三個維度去排查。記住,沒有最好的配置,只有最適合自己業務的配置。平時多做壓力測試,模擬高并發場景,提前發現潛在問題,才能在生產事故來臨時不慌不亂,從容應對。