救命!Java Web項目中的MQ消息堆積讓我抓狂
我之前參與開發了一個餐廳系統,該系統在午餐和晚餐高峰時段面臨巨大的并發需求。
為了確保系統順暢運行,公司要求各個部門在用餐時間輪流值班,以便及時解決在線問題。
我所在的團隊負責廚房展示系統,這是訂單系統的下游服務。
當用戶下單后,訂單系統會向我們的系統發送一條 Kafka 消息。我們的系統讀取消息,處理業務邏輯,保存訂單和菜品數據,然后在菜品管理客戶端上顯示出來。
通過這種方式,廚師們可以了解每個訂單所需的菜品,一旦有菜品準備好,系統會通知服務員上菜。
上菜后,服務員會更新菜品狀態,這樣用戶就能知道哪些菜已上,哪些菜還在準備中。
這個系統極大地提高了從廚房到顧客的效率。
圖片
這其中的關鍵是消息中間件 Kafka。如果它出現問題,將直接影響廚房展示系統的正常運作。
在本文中,我將分享我們在處理“消息積壓問題”時的經驗,希望能對你有所幫助。
初次遇到消息積壓問題
最初,我們的用戶量較小,系統上線后的一段時間里,消息隊列(MQ)的消息通信非常順暢。
隨著用戶量的增長,每個商戶每天都生成大量訂單數據,每個訂單包含多個菜品。這導致我們菜品管理系統的數據量顯著增加。
某天下午,我們收到了商戶的投訴,用戶下單后,菜品列表在平板上出現延遲。
廚房只有在幾分鐘后才能看到這些菜品。
我們立即開始調查原因。
菜品展示延遲問題通常與 Kafka 有關,因此我們首先檢查了 Kafka。
果不其然,存在 消息積壓。
消息積壓的常見原因有以下兩種:
- MQ 消費者服務宕機。
- MQ 生產者產生消息的速率超過 MQ 消費者消費消息的速率。
我檢查了監控系統,發現 MQ 消費者服務運行正常,沒有異常。
剩下的原因可能是 MQ 消費者的消息處理速度變慢了。
接下來,我檢查了菜品管理表,發現只有幾十萬條記錄。
似乎有必要優化 MQ 消費者的處理邏輯。
我在代碼中添加了一些日志,打印出 MQ 消費者中各個關鍵節點的耗時。
發現兩處存在明顯的延遲:
- 在 for 循環中逐條查詢數據庫的代碼。
- 進行多條件數據查詢的代碼。
我針對性地進行了優化:
對于 for 循環逐條查詢數據庫的代碼,我改成了使用參數集合的 批量查詢。
有時,我們需要查詢指定集合中的哪些用戶已經存在于數據庫中。實現方式如下:
publicList<User>queryUser(List<User> searchList){
if(CollectionUtils.isEmpty(searchList)){
returnCollections.emptyList();
}
List<User> result = Lists.newArrayList();
searchList.forEach(user -> result.add(userMapper.getUserById(user.getId())));
return result;
}
如果有 50 個用戶,這種方法需要查詢數據庫 50 次。眾所周知,每次數據庫查詢都是一次遠程調用。
查詢數據庫 50 次意味著需要進行 50 次遠程調用,耗時非常長。
那么,如何優化呢?
優化后的代碼如下:
publicList<User>queryUser(List<User> searchList){
if(CollectionUtils.isEmpty(searchList)){
returnCollections.emptyList();
}
List<Long> ids = searchList.stream().map(User::getId).collect(Collectors.toList());
return userMapper.getUserByIds(ids);
}
這種方式提供了一個基于用戶 ID 集合進行批量查詢的接口,僅需一次遠程調用就可以獲取所有數據。
對于多條件數據查詢,我添加了一個 組合索引,解決了這一問題。
經過這些優化后,MQ 消費者的消息處理速度顯著提升,成功解決了消息積壓問題。
再次遇到消息積壓問題
幾個月后,我們再次遇到了消息積壓問題。
這次問題是偶發性的,只在某些時候出現,大部分時間沒有問題。
積壓持續時間很短,對用戶的影響較小,商戶也沒有投訴。
我檢查了菜品管理表,此時已有幾百萬條記錄。
通過監控和 DBA 提供的每日慢查詢郵件,我發現了一些異常。
我注意到有些 SQL 語句的 WHERE 條件相同,但僅參數值不同,卻使用了不同的索引。
例如,order_id=123 使用索引 a,而 order_id=124 使用索引 b。
該表存在多種查詢場景,為滿足不同的業務需求,添加了多個組合索引。
MySQL 根據以下幾種因素來選擇索引:
- 通過數據采樣估算掃描的行數。掃描的行數越多,I/O 操作和 CPU 使用率越高。
- 是否使用臨時表,臨時表會影響查詢速度。
- 是否需要排序,排序也會影響查詢速度。
綜合這些因素,MySQL 優化器選擇其認為最合適的索引。
MySQL 優化器通過采樣估算掃描的行數,這涉及從數據頁中選擇一些進行統計估算,這種方法會帶來一定的誤差。
由于 MVCC 機制,數據頁存在多個版本,例如被刪除的數據在其他事務中仍然可見,因此索引并未真正刪除。這可能導致統計數據不準確,影響優化器的決策。
這些因素可能導致 MySQL 在執行 SQL 語句時 選擇了錯誤的索引。
即便使用索引 a 更為高效,MySQL 也可能使用了索引 b。
為了解決 MySQL 選擇錯誤索引的問題,我們使用了 FORCE INDEX 關鍵字,強制 SQL 查詢使用索引 a。
經過這一優化,消息的輕微積壓問題也得到了解決。
第三次遭遇消息積壓
六個月后,某天晚上大約六點,幾位商家投訴菜品管理系統出現延遲。
他們反饋說下單后菜品要過幾分鐘才會顯示。
檢查監控系統后,我發現 Kafka 消息再次堆積了。
我復查了 MySQL 的索引,發現索引是正確的,但數據查詢依然很慢。
接著查看了菜品管理表,發現表中的數據量在短短六個月內竟然增長到了 3000 萬條記錄。
通常情況下,當單表數據過多時,查詢和寫入性能都會下降。
這次查詢變慢的原因就是因為數據量過于龐大。
為了解決這個問題,我們需要:
- 實施數據庫和表的分區
- 備份歷史數據
在當時,實施數據庫和表分區的成本太高,而且商家數量尚不足以支持這樣的解決方案。
因此,我們決定備份歷史數據。
在與產品經理和 DBA 討論后,我們決定菜品管理表僅保留最近 30 天的數據,超過 30 天的數據會被移動到一個“歷史表”中。
經過這個優化后,菜品管理表在 30 天內只積累了幾百萬條數據,對性能的影響較小。
這樣一來,消息堆積問題得以解決。
第四次遇到消息堆積問題
在上述優化之后,系統長時間內運行順利,沒有出現消息堆積的問題。
然而,一年后的一天下午,一些商家又來投訴。
我查閱了公司郵件,發現有大量關于 Kafka 消息堆積的監控告警郵件。
由于我當時正在開會,錯過了這些告警。
這次問題的時間點比較奇怪。
通常高并發都是在午餐或晚餐的高峰時段,但這次消息堆積卻發生在“下午”。
這很不尋常。
一開始,我沒有任何線索能找到問題的原因。
于是,我詢問了訂單團隊是否在下午發布了新版本或者執行了某些特定操作。
因為我們的菜品管理系統是他們的下游系統,直接和他們的操作相關。
一位同事提到,半小時前他們進行了一個批量更新數萬個訂單狀態的作業。
更改訂單狀態會自動發送 MQ 消息。
這導致他們的程序在極短時間內產生了大量 MQ 消息。
我們的 MQ 消費端無法快速處理這些消息,因而出現了消息堆積。
我們查看了 Kafka 消息堆積情況,發現有數十萬條消息在排隊等待處理。
為了快速提高 MQ 消費端的處理速度,我們考慮了兩個解決方案:
- 增加分區數量。
- 使用線程池處理消息。
然而,由于消息已經堆積在現有分區中,增加新的分區并不會有太大幫助。
因此,我們決定重構代碼,使用線程池來處理消息。
為了解決堆積消息,我們將線程池的核心線程數和最大線程數增加到 50。
這些參數是可以動態配置的。
經過這個調整后,堆積的數十萬條消息在大約 20 分鐘內被處理完畢。
這次突發的消息堆積問題得到了妥善解決。
解決問題后,我們保留了線程池的消息消費邏輯,將核心線程數設置為 8,最大線程數設置為 10。
這樣在遇到消息堆積問題時,我們可以臨時調整線程數以快速應對,而不會對用戶造成明顯影響。
注意:使用線程池消費 MQ 消息并不是一個通用的解決方案。它存在一些缺點,比如可能會導致消息順序性問題,以及服務器 CPU 使用率的飆升風險。此外,如果在多線程環境中調用第三方接口,可能會造成第三方服務的超負荷甚至崩潰。
總結來說,MQ 消息堆積不是一個簡單的問題。
根本原因是 MQ 生產端的消息生產速率超過了消費端的消息消費速率,但具體原因可能有多種。
在實際場景中,我們需要根據不同的業務情況進行優化。
對 MQ 隊列消息堆積的監控和告警至關重要,能夠及時發現問題。
沒有完美的解決方案,只有最適合當前業務場景的方案。