MQ 消息積壓怎么辦?如何實現零業務損失?五步應急方案避免業務雪崩
在使用消息隊列遇到的問題中,消息積壓這個問題,應該是最常遇到的問題了,消息積壓的直接原因,一定是系統中的某個部分出現了性能問題,來不及處理上游發送的消息,才會導致消息積壓。
在使用消息隊列時,如何來優化代碼的性能,避免出現消息積壓。然后再來看看,如果你的線上系統出現了消息積壓,該如何進行緊急處理,最大程度地避免消息積壓對業務的影響。
消息解壓的本質與根源分析
想要知道本質原因,我們需要知道消息生命周期的瓶頸全景圖。
圖片
總結下出現消息積壓的場景有以下三種:
- 生產端:突發流量紅方、網絡波動、序列化方式性能瓶頸
- Broker 端:磁盤 I/O、分區數不足,副本同步延遲。
- 消費端:消費線程不足、業務邏輯處理耗時阻塞、外部依賴超時。
圖片
“
Chaya:對于絕大多數使用消息隊列的業務來說,消息隊列本身的處理能力要遠大于業務系統的處理能力。
主流消息隊列的單個節點,消息收發的性能可以達到每秒鐘處理幾萬至幾十萬條消息的水平,還可以通過水平擴展 Broker 的實例數成倍地提升處理能力。
業務系統的業務邏輯遠比消息隊列要復雜,我們關注的核心是消費端業務邏輯的性能優化來比避免消息積壓。
生產端性能優化
發送端業務代碼的處理性能,實際上和消息隊列的關系不大,因為一般發送端都是先執行自己的業務邏輯,最后再發送消息。
如果說,你的代碼發送消息的性能上不去,你需要優先檢查一下,是不是發消息之前的業務邏輯耗時太多導致的。
如果發送端是一個微服務,主要接受 RPC 請求處理在線業務。很自然的,微服務在處理每次請求的時候,就在當前線程直接發送消息就可以了,因為所有 RPC 框架都是多線程支持多并發的,自然也就實現了并行發送消息。
如果是一個離線分析系統,離線系統在性能上的需求是什么呢?它不關心時延,更注重整個系統的吞吐量。
發送端的數據都是來自于數據庫,這種情況就更適合批量發送,你可以批量從數據庫讀取數據,然后批量來發送消息,同樣用少量的并發就可以獲得非常高的吞吐量。
“
余姐姐:有沒有一個架構方案,兩種場景都可以適應的極致性能優化方案?
姐姐真是貪心呀……
不管離線還是微服務處理業務業務邏輯發送消息,想要追求極致的發送性能,可以使用本地內存隊列緩沖架構優化。
圖片
關鍵優化策略
- 批量發送:合并小消息減少網絡 IO
- 數據壓縮:使用 Snappy/LZ4 減少傳輸量
- 異步確認:非阻塞等待 Broker 響應
- 分區選擇:基于業務鍵保證分區均勻
Broker 端優化
Broker 端的話,通常可以通過擴展分區、磁盤存儲優化、合理調整 Broker 參數實現。
最怕的就是有的公司引入了一些開源 MQ,在開源基礎上包了一層皮封裝的公司。
因為隨著時間的發展,原先開源的那套可能已經退出歷史舞臺,性能也很差,但是公司魔改過,很多業務系統都在使用,根本改不了。
磁盤優化
圖片
Kafka 分區動態擴容
圖片
關鍵配置優化(Kafka 3.x)
# Kafka黃金配置
# 網絡吞吐
num.network.threads=8 # 網絡線程數
queued.max.requests=1000 # 請求隊列大小
# 磁盤優化
num.io.threads=16 # IO線程數
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# 內存管理
log.retention.bytes=-1 # 按容量保留
log.segment.bytes=1073741824 # 1GB段文件
消費端優化
余姐姐: 好的愛情不是一味地索取,更不是毫無意義的付出,而是互相成長。
消息隊列也是愛情的折射。
使用消息隊列的時候,大部分的性能問題都出現在消費端,如果消費的速度跟不上發送端生產消息的速度,就會造成消息積壓。最后系統崩塌。
所以消息的發送與消息的消費需要同頻。要是消費速度一直比生產速度慢,時間長了,整個系統就會出現問題,要么,消息隊列的存儲被填滿無法提供服務,要么消息丟失,這對于整個系統來說都是嚴重故障。
我們在設計系統的時候,一定要保證消費端的消費性能要高于生產端的發送性能,這樣的系統才能健康的持續運行。
消費端的性能優化除了優化消費業務邏輯以外,也可以通過水平擴容,增加消費端的并發數來提升總體的消費性能。
特別需要注意的一點是,在擴容 Consumer 的實例數量的同時,必須同步擴容主題中的分區(也叫隊列)數量,確保 Consumer 的實例數和分區數量是相等的。
“
Chaya:如果愛到了盡頭,被壓的喘不過氣了該怎么辦?同樣的,處理消息的業務邏輯很難再優化了,為了避免消息積壓,我能否先不處理消息,直接放到一個內存隊列就返回 ack?然后再啟動一些線程從內存隊列取消息處理。
有一種愛就做放手……當愛已成往事,你能做的只有交給時間去處理。
如果不能提高處理該消息的業務邏輯,只是放到一個內存隊列就返回 MQ ack,這是一種極其錯誤的實現方式。
為什么錯誤?因為會丟消息。如果收消息的節點發生宕機,在內存隊列中還沒來及處理的這些消息就會丟失。
消息積壓了該如何處理?
還有一種消息積壓的情況是,日常系統正常運轉的時候,沒有積壓或者只有少量積壓很快就消費掉了,但是某一個時刻,突然就開始積壓消息并且積壓持續上漲。
這種情況下需要你在短時間內找到消息積壓的原因,迅速解決問題才不至于影響業務。
“
Chaya:能導致消息積壓忽然增加,通常只有兩種情況:要么是發送變快了,要么是消費變慢了。
大部分消息隊列都內置了監控的功能,只要通過監控數據,很容易確定是哪種原因。
如果是單位時間發送的消息增多,比如說是趕上大促或者搶購,短時間內不太可能優化消費端的代碼來提升消費性能,唯一的方法是通過擴容消費端的實例數來提升總體的消費能力。
還有一種不太常見的情況,你通過監控發現,無論是發送消息的速度還是消費消息的速度和原來都沒什么變化,這時候你需要檢查一下你的消費端,是不是消費失敗導致的一條消息反復消費這種情況比較多,這種情況也會拖慢整個系統的消費速度。
總結
消息積壓治理的本質是資源與需求的動態平衡藝術,需要建立三層防御體系:
- 事前預防:通過容量規劃、代碼優化和壓力測試構建第一道防線
- 優化生產發送模式
- 合理設置分區數量
- 設計彈性消費架構
- 事中監控:建立全鏈路監控和智能預警系統
- 實時跟蹤生產/消費速率比
- 設置多級積壓閾值告警
- 可視化關鍵性能指標
- 事后應急:制定分級響應預案
- 輕度積壓:動態擴容消費者
- 中度積壓:限流+降級非核心
真正的消息專家不是讓系統永不積壓,而是當洪水來襲時,能在業務感知前完成疏導。
這要求我們在代碼優化、架構設計和應急預案三方面建立縱深防御體系。