記一次線程池使用不當觸發(fā)死鎖導致RocketMQ消費停滯
背景
團隊小伙伴突然找到我們團隊說,不得了了,線上的RocketMQ又出現(xiàn)了消費停滯,怎么辦? 要不要我們先重啟一下
其實早在之前也出現(xiàn)過一次,當時為了快速恢復業(yè)務的,就直接重啟解決的。
這次因為堆積量不多,所以想對運行環(huán)境進行一些環(huán)境快照保留。所以就和業(yè)務方溝通是否看見接受短暫的消息延時
得到肯定答案后就是放手干吧!
問題定位
首先我們要確定業(yè)務反饋的是否屬實,所以需要去RocketMQ dashboard上看看具體的消費進度。
圖片
可以看到consumer中并不是全部的queue消費都堆積了,只有一個queuq消息堆積了。
這個消費者訂閱的topic是分區(qū)有序的,正常來說分區(qū)有序,如果某個分區(qū)的消息單條消息出現(xiàn)了消費異常,必須要等這條消息消費成功(或者是重試結束)后才能繼續(xù)消費后面的消息。
有時候會因為這個原因出現(xiàn)消息堆積是正常的,但是業(yè)務對消息重試進行了合理的設置,設置的重試次數(shù)比較合理,不會出現(xiàn)長時間的堆積。
RocketMQ的消費線程
一般出現(xiàn)這種問題很明顯就是線程出現(xiàn)了死鎖或者僵死之類的情況。
熟悉RocketMQ的都知道RocketMQ消費消息主要是依賴1個線程1個線程池。
- 以PullMessageService開頭的線程, 主要用來拉去消息
圖片
- 以ConsumeMessageThread開頭的線程(實際是一個線程池),主要用來執(zhí)行消費邏輯。
圖片
直到了RocketMQ的消費線程模型后我們就好解決了。我們直接通過jstack命令查看線程的堆棧信息。
線程快照分析
我們直接通過jstack命令生成線程快照。
jstack <pid> > thread_dump_$(date +%Y%m%d_%H%M%S).txt
pid 和后面的 thread_dump_$(date +%Y%m%d_%H%M%S).txt自己隨便取個名字就行。自己記得就行。
由于應用運行在pod中,生成了我們就下載到本地。
我們自己看還是比較難分析出分體。這里我們直接使用一個在線的網(wǎng)站進行線程快照的分析。
fastthread
fastthread是一個在線的線程快照分析工具,可以直接將線程快照上傳到這個網(wǎng)站進行分析。
圖片
我們上傳我們下載的線程快照文件。
然后進行線程分析:
圖片
很快定位到阻塞其他線程的代碼。
這里的代碼被我打碼了。
arhtas
如果我們使用arthas也可以很方便的找到阻塞的線程。
thread -b
arthas 提供了thread -b, 一鍵找出那個罪魁禍首。
問題元兇找到
通過阻塞代碼我們很快定位到是由于線程池使用不當導致的阻塞。
線程池使用不當
什么情況下會出現(xiàn)線程池使用不當導致的"死鎖"呢?
我們看看下面的demo:
public class XiaoZouExample {
public static void main(String[] args) {
ExecutorService executor = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
// 提交兩個外部任務
for (int i = 0; i < 2; i++) {
executor.submit(new OuterTask(executor));
}
// 等待一段時間后關閉線程池
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
}
static class OuterTask implements Runnable {
private final ExecutorService executor;
public OuterTask(ExecutorService executor) {
this.executor = executor;
}
@Override
public void run() {
System.out.println("小奏技術 Outer task started by thread: " + Thread.currentThread().getName());
// 創(chuàng)建一個Future來等待內(nèi)部任務的結果
Future<?> future = executor.submit(new InnerTask());
try {
// 等待內(nèi)部任務完成
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("小奏技術 Outer task finished by thread: " + Thread.currentThread().getName());
}
}
static class InnerTask implements Runnable {
@Override
public void run() {
System.out.println("小奏技術 Inner task started by thread: " + Thread.currentThread().getName());
try {
// 模擬長時間運行的任務
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("小奏技術 Inner task finished by thread: " + Thread.currentThread().getName());
}
}
}
- 運行結果
圖片
可以看到?jīng)]有任何任務執(zhí)行完成,線程池一直處于被阻塞狀態(tài)。
核心原因就是首先線程池的核心線程數(shù)是2,核心線程用來執(zhí)行2個任務,用完了所有線程。
然后在核心線程執(zhí)行的2個任務中又用原來的線程池進行執(zhí)行任務,這時候因為沒有線程可以去執(zhí)行任務了,所以會添加到阻塞隊列中等待核心線程執(zhí)行完任務后再執(zhí)行。
但是核心線程想要釋放任務又必須等待這兩個子任務執(zhí)行完,這樣就形成了一個死鎖。
解決方案
解決方式有多種,最簡單的方式可以考慮不要使用隊列,直接使用SynchronousQueue。
ExecutorService executor = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
讓多的任務直接通過主線程執(zhí)行或者丟棄任務。
當然最好的方式就是避免這種情況的發(fā)生,合理的使用線程池,不要線程池中的任務還要使用這個線程池去執(zhí)行任務。
這種情況是比較難避免的,因為現(xiàn)在大部分業(yè)務開發(fā)都是隱式使用線程池,自己也不知道自己用的哪個線程池。
比如spring的@Async注解,@Scheduled注解等等。
總結
線程池的使用還是要盡量消息,要避免出現(xiàn)線程池中的任務繼續(xù)使用該線程池去執(zhí)行任務,出現(xiàn)死鎖。
也可以考慮對線程池進行監(jiān)控,避免出現(xiàn)大量任務阻塞。
這個問題想要復現(xiàn)需要大量任務并且超過核心線程數(shù)才能復現(xiàn),還是比較難復現(xiàn)的,只有線上大流量的時候才能復現(xiàn)。