RocketMQ-沒有消費者的消息堆積場景分析
問題描述
訂閱關(guān)系
控制臺上沒有訂閱關(guān)系信息:Topic、過濾規(guī)則均為空。
消費者狀態(tài)
沒有消費者實例信息,消息在不斷堆積。
分析過程
初步判斷
為了便于表達和理解,我們只關(guān)注與該問題有關(guān)的部分邏輯。
因為消息堆積量不斷在增加,所以判斷該Group ID已經(jīng)在Broker上有了訂閱關(guān)系,很可能是使用該Group ID的Consumer實例下線后沒有取消訂閱關(guān)系導(dǎo)致的,如圖:
初步判斷-Consumer未取消訂閱關(guān)系
正常運行
在正常情況下,控制臺上可以看到Group ID的【訂閱關(guān)系】及【消費者狀態(tài)】,如圖:
正常情況-訂閱關(guān)系
正常情況-消費者狀態(tài)
異常之后
異常之后就變成了【問題描述】中的樣子,此時我們不清楚:
- 該GID訂閱了哪個topic
- 該GID被哪個應(yīng)用消費者使用后出現(xiàn)的異常
- 該GID對應(yīng)的消息生產(chǎn)者是哪個
在以上事情沒有弄清楚之前,也不敢對該GID做取消訂閱、刪除之類的操作。
確定topic
消息堆積是通過消費者的offset信息統(tǒng)計的,該信息存儲在Broker上的store/config/consumerOffset.json中,consumerOffset.json格式如圖:
Broker - consumerOffset.json
我們在consumerOffset.json文件中找到了GID對應(yīng)的topic,此處有個細節(jié)(后面代碼處有解釋):
- 該GID在groupTopicMap中沒有重試隊列Topic
- 該GID在offsetTable中沒有重試隊列Topic上的offset
確定Producer
通過Topic查詢Message
查詢消息
通過MessageID確定ECS IP
通過上面的查詢無法直接定位到ECS,我們可以通過Message ID計算出ECS IP,方法如下:
如果懶得寫代碼,也可以使用arthas來查詢:
arthas查詢消息
此時整個鏈路逐漸清晰起來了,還缺少最關(guān)鍵的Consumer信息。
確定Consumer
代碼Review
查詢了近期發(fā)版的所有代碼,沒有找到與該GID相關(guān)的信息。
Broker端找線索
我們試圖通過Broker端的日志來確認兩件事情:
- 該GID的Consumer在什么時候從哪些IP建立了與Broker的交互
- 該GID的Consumer在什么時候從哪些IP斷開了與Broker的交互
Broker heartBeat
Broker - Consumer心跳處理邏輯
通過以上代碼打印的日志,我們可以過濾出該GID與Broker建立交互時候的相關(guān)信息。
Broker unregisterClient
在Consumer實例shutdown的時候,會向Broker發(fā)送unregisterClient請求,會調(diào)用ConsumerManager中相應(yīng)的unregisterConsumer方法:
Broker - Consumer取消注冊
通過以上代碼打印的日志,我們可以過濾出該GID與Broker斷開交互時候的相關(guān)信息。
柳暗花明
同時我們也在想:除了程序,還有其他途徑變更這種訂閱關(guān)系嗎?答案是有的。
命令行
命令方式-重置消費位點
控制臺
控制臺 - 重置消費位點
到這里估計您已經(jīng)知道引起這次消息堆積的原因了。
經(jīng)驗總結(jié)
- 完善監(jiān)控告警、提高應(yīng)急響應(yīng)能力
- 最小權(quán)限原則
- RocketMQ控制臺是否應(yīng)該增加操作記錄的功能?