如何優雅地中斷 Java 線程
我們通過并發編程提升了系統的吞吐量,同時我們也希望并發運行的線程能夠及時停止并做好資源歸納,所以筆者就借此文來談談Java并發編程中線程中斷的藝術。
一、詳解Java中斷的哲學
1. 何時觸發中斷阻塞
按照操作系統對于線程的任務調度管理來說,當觸發以下幾種情況時線程就會阻塞而處于BLOCKED、WAITING、TIMED_WAITING幾種狀態: 第一種是線程執行IO請求,在等待IO資源返回,觸發阻塞,此時線程就處于BLOCKED狀態,例如服務端server執行serverSocket.accept()等待就緒的客戶端接入:
第二種則是等待條件為真期間,線程因此掛起等待notify通知或者通過sleep休眠,進而處于WAITING或者TIMED_WAITING:
new Thread(() -> ThreadUtil.sleep(3600), "t-0").start();
因為并發互斥原因,線程需要等待其它線程釋放監視鎖而進入BLOCKED阻塞態:
2. Java是如何響應中斷的
在操作系統中,線程的中斷方式一般分為以下兩種:
- 搶占式:當線程需要中斷時,直接強制讓線程立刻停止手里的任務
- 協作式:當線程需要中斷時,通過標識告知線程需要被中斷,線程輪詢檢查時看到這個標識就會直接中斷
而Java線程則是采用協作式中斷,即調用interrupt時其底層僅僅是將線程設置為可中斷狀態,等到線程主動檢查到線程標識被設置為中斷時,則觸發InterruptedException:
對應的我們以Linux為例給出JDK底層關于線程中斷函數interrupt的實現,即位于os_linux.cpp的interrupt方法,可以看到其底層本質上就是定位到java線程對應的os線程并將其中斷標識設置為true:
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
//設置os線程中斷標識為true
osthread->set_interrupted(true);
//......
}
//......
}
同時我們也給出處于sleep休眠狀態的線程響應中斷的源碼,同樣是以Linux為例的線程封裝類os_linux.cpp下的sleep函數,可以看到進行休眠時其底層在明確知曉線程可被中斷的時候,就會在for循環中知曉休眠并定期檢查可中斷狀態:
int os::sleep(Thread* thread, jlong millis, bool interruptible) {
//......
if (interruptible) {
jlong prevtime = javaTimeNanos();
for (;;) {
//循環中感知到中斷直接返回OS_INTRPT標識
if (os::is_interrupted(thread, true)) {
return OS_INTRPT;
}
}
} else {
//......
}
}
3. 線程中斷的守則
通常來說我們對線程中斷響應度越高,就越容易處理并優雅的完成兜底動作,一般來說,在處理線程中斷時一般會出現如下兩種情況:
- 當前代碼層面對象實例不具備處理該中斷異常
- 處于線程內部的run方法感知到中斷無法向上拋出
針對情況1,本質上就是權責上的轉移,如果當前業務層面不具備處理此類異常的能力,那么就將異常向上層拋出傳遞給上層使用者:
public void sleep(int seconds) throws InterruptedException {
TimeUnit.SECONDS.sleep(seconds);
}
而情況2則相對麻煩一些,如果類似于Runnable 這種無法向上拋出的內置接口類的實現,我們則可以主動去捕獲中斷中斷異常,并將在完成必要的資源清理工作后,將當前線程打斷從而讓高層棧幀感知到這個異常中斷:
class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//執行當前線程資源清理
//打斷當前線程引發更高層線程響應此中斷
Thread.currentThread().interrupt();
}
}
}
二、Java線程中斷處理的一些實踐
1. 基于標識取消任務
我們先來說說基于自定義標識的方式中斷線程,即非java內置方法層面的協作式標識來停止線程,通過任務運行時輪詢檢測,一旦線程輪詢檢查看到中斷標識設置為true,則直接結束運行:
對應的我們給出自定義協作式中斷的實現,整體思路為:
- 采用cancelled標識中斷狀態,并用volatile保證可見性
- 內置初始化一個執行線程thread
- 對外暴露start方法,執行線程啟動
- 對外暴露cancel方法修改線程中斷狀態
- run方法執行業務邏輯,并定期輪詢檢查中斷標識,一旦標識被設置為true則退出循環,結束線程
public class Task implements Runnable {
/**
* 使用volatile修飾保證標識修改可見
*/
privatevolatileboolean cancelled = false;
privatefinal Thread thread = new Thread(this);
public void start() {
thread.start();
}
/**
* 停止時,通過cancel請求取消
*/
public void cancel() {
cancelled = true;
}
@Override
public void run() {
//取消標識檢測,如果取消則直接結束循環
while (!cancelled) {
System.out.println("running");
ThreadUtil.sleep(1000);
}
System.out.println("task cancelled");
}
}
對應的我們也給出這種方式的使用示例,可以看到我們的測試代碼會在5s后調用task暴露的任務取消方法完成線程中斷:
//線程啟動運行5s
Task task = new Task();
task.start();
//休眠5s后將task任務對應線程中斷
new Thread(()->{
ThreadUtil.sleep(5000);
task.cancel();
}).start();
而執行的輸出結果如下,是符合我們預期的:
running
running
running
running
running
running
task cancelled
當然這種做法也存在一定的弊端,即帶有阻塞性質的操作,任務可能出現永遠無法檢查取消標志,例如我們的線程在循環往阻塞阻塞隊列blockingQueue的put添加元素,一旦隊列空間達到容器上界,當前線程就會阻塞即無法執行到循環分支上:
對應我們也給出這段錯誤的樣例,即阻塞隊列添加操作后阻塞而走不到循環判斷:
private final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);
@Override
public void run() {
//取消標識檢測,如果取消則直接結束循環
while (!cancelled) {
System.out.println("running");
try {
queue.put(RandomUtil.randomInt(10));
} catch (InterruptedException e) {
//......
}
}
System.out.println("task cancelled");
}
測試代碼還是和上一小節一致,不多贅述,測試輸出結果如下,即第二次添加操作時發現隊列已滿而阻塞從而無法打斷:
running
running
2. 如何處理阻塞式的中斷
參考java內置方法Thread.sleep(1000);或者wait()方法,其底層都會針對外部中斷操作做檢測,一旦感知到中斷就會提前返回,即執行如下步驟:
- 清除中斷狀態
- 拋出InterruptedException讓被中斷線程感知異常
所以對于阻塞式中斷的的正確方式為:
通過合理的時機發出中斷請求,讓線程在下一個合適時候處理中斷
所以對于上述阻塞隊列操作來說,可以按照如下方式進行線程優雅中斷:
- 對外暴露interrupt方法打斷當前線程,確保阻塞隊列插入阻塞時依然可以利用內置方法完成線程打斷
- 線程感知中斷時不可直接拋出異常,而是利用異常捕獲將資源處理清楚,再次執行中斷循環監測,正常退出線程邏輯
對應我們給出改造后的代碼,可以看到我們將cancel改為調用線程的中斷方法將線程中斷,同時在感知到中斷異常時會將執行中斷后的兜底邏輯:
/**
* 停止時,通過cancel請求取消
*/
public void cancel() {
thread.interrupt();
}
@Override
public void run() {
//取消標識檢測,如果取消則直接結束循環
while (!Thread.currentThread().isInterrupted()) {
System.out.println("運行中......");
Integer element = RandomUtil.randomInt(10);
try {
queue.put(element);
} catch (InterruptedException e) {
//處理中斷
}
}
}
3. 合理的中斷策略
筆者在上面的文章中對于拋出的異常給出了一段todo的伏筆,這里我們就來說說線程面對中斷異常后響應的哲學。一般來說,線程級或者服務級的中斷策略為:
- 盡快的退出
- 必要時完成手頭任務的清理
這也就是為什么java中各種并發包的類庫對于中斷的任務僅僅是拋出InterruptedException而不是直接處理掉中斷 ,例如ArrayBlockingQueue的put方法:
//將任務中的中斷InterruptedException 丟給調用棧的上層代碼執行
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//上可打斷的鎖
lock.lockInterruptibly();
try {
//......
} finally {
//1. 釋放鎖
lock.unlock();
}
}
而中斷策略的響應,正確的做法應該是讓執行該任務的線程進行按照如下原則進行處理:
- 如果不具備處理的能力,則將異常向上傳遞
- 如果無法傳遞異常則顯示拋出中斷讓上層的調用棧感知。
以我們Task生產者代碼為例,我們將提交給線程即哪些繼承runnable或者lambda表達式統稱為任務,一般來說持有這些任務的線程不一定是這些任務的執行者,它們僅擁有對于任務狀態管理的一些權限,例如一個主線程main方法調用thread-0異步執行阻塞隊列存取操作:
所以從任務的維度來說,執行任務的線程應該小心的保存中斷的狀態,即在面對中斷時,它們不應該對中斷進行任何的干預,而是讓擁有線程的代碼段做出正確的響應,即讓thread-0感知到中斷異常然后將狀態狀態還原向上傳遞:
對應的我們也給出阻塞存儲元素的優雅中斷處理:
- 輪詢檢測本地中斷標識,若未中斷執行插入
- 感知中斷,捕獲異常并打印未能處理的元素
- 完成資源兜底,主動打斷線程將狀態向上傳遞
//外部線程打斷的方法
public void cancel() {
thread.interrupt();
}
@Override
public void run() {
try {
//取消標識檢測,如果取消則直接結束循環
while (!Thread.currentThread().isInterrupted()) {
System.out.println("運行中......");
Integer element = RandomUtil.randomInt(10);
try {
queue.put(element);
} catch (InterruptedException e) {
Console.log("線程中斷,未處理資源:{}", element);
Thread.currentThread().interrupt();
}
}
} finally {
if (thread.isInterrupted()) {
System.out.println("任務已取消");
}
}
}
對應的我們也給出輸出結果,可以看到阻塞的隊列在被打斷后完成必要的資源兜底,就會將中斷狀態向上傳遞:
運行中......
運行中......
線程中斷,未處理資源:6
任務已取消
4. 時刻保留中斷的狀態
需要注意的是,執行者僅僅傳遞中斷還是不行的,更重要的一點是:
在必要時刻,保存中斷的狀態,并返回前恢復狀態,而不是捕獲到isInterrupted,避免陷入無限循環的漩渦。
很多情況下當前任務不具備處理中斷的能力,例如Runnable收到中斷的請求不可拋出異常交由上層調用棧處理,那么就在收到中斷請求,按照如下步驟執行:
- 基于本地標識保留中斷狀態
- 完成必要的收尾工作
- 在返回前打斷該線程恢復中斷狀態
例如線程0循環獲取阻塞隊列元素,在因為沒有元素而阻塞時,線程1打斷該線程,已按照時刻保留中斷的狀態守則,線程0則應該按照如下步驟執行:
- 收到中斷,利用本地變量保留中斷狀態
- 繼續循環等待元素獲取
- 獲取到元素并返回,在返回前將當前線程打斷,讓外部感知
對應的我們給出消費者循環獲取元素并處理狀態的代碼:
privatestaticfinal BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
public static String getNextElement() {
boolean interrupted = false;
try {
while (true) {
try {
//1. 等待結果返回而阻塞
return blockingQueue.take();
} catch (InterruptedException e) {
//2.收到異常中斷,小心保存中斷狀態,繼續阻塞等待元素返回
interrupted = true;
Console.log("消費者線程中斷,待完成本次資源獲取后執行中斷");
}
}
} finally {
//3. 返回前基于中斷標識將中斷狀態向上傳遞
if (interrupted) {
Console.log("消費者線程已中斷");
Thread.currentThread().interrupt();
}
}
}
對應的我們也給出測試代碼,即在消費者阻塞后將其打斷,并投遞元素,讓其完成優雅中斷:
public static void main(String[] args) {
//消費者線程
Thread thread = new Thread(() -> {
String nextElement = getNextElement();
Console.log("消費者線程獲取結果:{}", nextElement);
});
thread.start();
Console.log("消費者線程啟動");
//休眠5s后將task任務對應線程中斷
new Thread(() -> {
//休眠5s后將task任務對應線程中斷
ThreadUtil.sleep(5000);
thread.interrupt();
//休眠5s后再投遞元素
ThreadUtil.sleep(5000);
try {
String element = RandomUtil.randomString(10);
Console.log("生產者線程投遞:{}", element);
blockingQueue.put(element);
} catch (InterruptedException e) {
//......
}
}).start();
}
輸出結果如下,可以看到消費者在收到中斷后明確保留中斷狀態,并完成資源處理的工作后執行中斷:
消費者線程啟動
消費者線程中斷,待完成本次資源獲取后執行中斷
生產者線程投遞:7gmnj1rqpj
消費者線程已中斷
消費者線程獲取結果:7gmnj1rqpj
5. 超時任務取消的最優解
如果我們現在需要實現這樣以一個函數,該函數會接受外部傳入一個異步任務并提交到我們的線程池異步執行,并具備如下要求:
- 要求在給定時間完成執行
- 任務執行完成后,要知曉是超時取消,還是正常執行完成返回,即任務正確執行則返回true,反之返回false
- 任務執行過程中可被中斷
所以對于該需求,要做到如下幾點:
- 可以感知任務執行完成并返回true
- 可以感知任務執行超時,并返回false
- 任務可中斷,直接拋出讓上層代碼解決
對應我們給出如下代碼,可以看到我們采用submit獲取異步任務的Future對象,利用Future實現帶有時限的阻塞獲取,一旦超時則直接拋出超時異常,并在函數返回前的finally語句塊調用cancel取消任務,需要注意的是這個cancel方法并不會一味的取消任務:
- 如果任務已完成,cancel就會返回false
- 如果任務因為超時等原因調用cancel,那么任務則還是活躍的,調用cancel可以取消并直接返回true
所以基于cancel這個特點,我們直接取反,即可實現正確執行返回true,超時返回false:
private staticfinal ExecutorService executor = ThreadUtil.newExecutor(10);
public static boolean get(Runnable r, int timeout) throws InterruptedException {
Future<?> future = executor.submit(r);
try {
future.get(timeout, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Console.error("任務執行超時");
} catch (ExecutionException e) {
thrownew RuntimeException(e);
} finally {
/*
1. 設置為true,如果任務是運行中,則取消任務,如果已經取消,則沒有任務效果
2. 如果任務已經完成,則返回false,反之返回true
*/
return !future.cancel(true);
}
}
對應的我們也給出測試代碼:
public static void main(String[] args) {
try {
boolean isTimeOut = get(() -> {
ThreadUtil.sleep(5000);
Console.log("任務執行完成");
}, 1);
Console.log("任務是否正常執行:{}", isTimeOut);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
輸出結果如下,可以看到任務執行超時后直接打斷休眠,任務執行到已完成的輸出,然后執行超時取消,如果取反得到false:
任務執行超時
任務執行完成
任務是否正常執行:false
6. 處理系統層面阻塞IO
有時候阻塞并非來自阻塞式并發包的調用,而是例如硬件層面文件IO或者網絡層面的socket IO,這種API涉及內核態調用,通過interrupt我們也只能修改中斷表示,無法直接將其中斷。
所以我們只能通過間接的手段干預其資源關閉來做到中斷,無論是socket還是文件IO,本質上都是針對系統或者網卡IO數據的阻塞讀取,所以我們可以直接通過關閉文件IO流或者socket套接字來間接打斷其資源讀取。
對應的我們以文件IO為例給出代碼示例,可以看到我們通過繼承thread重寫其中斷方法,當我們需要打斷系統資源時,直接關閉其流通道讓工作線程感知到這一點,然后通過原生interrupt修改中斷狀態:
public class IOThread extends Thread {
privatefinal BufferedReader utf8Reader;
public IOThread(String path) {
utf8Reader = FileUtil.getUtf8Reader(path);
}
@Override
public synchronized void start() {
while (true) {
try {
String line = utf8Reader.readLine();
Console.log(line);
} catch (IOException e) {
//保存IO上下文狀態
thrownew RuntimeException(e);
}
}
}
@Override
public void interrupt() {
try {
//強制關閉IO讓其感知中斷
utf8Reader.close();
} catch (IOException e) {
thrownew RuntimeException(e);
} finally {
super.interrupt();
}
}
}
對應的我們也給出使用示例,這段代碼會在中斷線程調用interrupt關閉流通道直接直接將IOUtil 線程打斷:
IOThread ioThread = new IOThread("F:\\test.txt");
Thread thread = new Thread(() -> {
ThreadUtil.sleep(10_000);
ioThread.interrupt();
});
thread.start();
ioThread.start();