張開濤:線程的中斷、超時與降級
最近一位朋友在公眾號留言問一個關于熔斷的問題:使用hystrix進行httpclient超時熔斷錯誤,我是順序操作的(沒有并發),發現hystrix會超時斷開,但是會導致hystrix線程池不斷增多,直到后面因線程池裝不下拒絕?
而該問題跟線程中斷、超時與降級等有關,因此本文將詳細介紹導致這個問題背后的原因。
當我們在線程中執行如用戶請求任務時,比如HTTP處理線程,最擔心的什么?
- 線程數***增長;
- 線程執行時間長;
- 線程不可中斷。
對于線程數***增長,我們可以通過使用線程池來控制線程數量,控制線程不是***增長的。
對于線程執行時間長,我們應設置合理的超時時間來保障線程執行時間可控,當超時時要么返回給用戶錯誤頁面,要么可以返回降級頁面。
對于線程不可中斷,我們應想辦法將線程設計的可中斷,從而在遇到問題可中斷線程并降級處理。
接下來的部分將主要講解線程中斷。
線程中斷是通過Thread.interrupt()方法來做,一般是在A線程來中斷B線程。
首先我們來看下該方法的一些Javadoc描述:
1. 如果線程被Object的wait()、wait(long)、wait(long, int) 或者
Thread的join()、join(long)、join(long, int)、sleep(long)、sleep(long, int)方法阻塞,執行線程中斷,且拋出InterruptedException,但中斷狀態并清空重置,即Thread. isInterrupted()返回false;
2. 如果線程被java.nio.channels.InterruptibleChannel上的一個I/O操作阻塞,執行線程中斷,且該
InterruptibleChannel將被關閉,拋出java.nio.channels.ClosedByInterruptException,線程中斷狀態會設置,即Thread. isInterrupted()返回true;
3. 如果線程被java.nio.channels.Selector阻塞,執行線程中斷,該Selector#select()方法將立即返回,相當于調用了java.nio.channels.Selector#wakeup(),不會拋出異常,但會設置中斷狀態,即Thread. isInterrupted()返回true;
4. 如果不滿足以上條件的,那么執行線程中斷不會拋出異常,僅設置中斷狀態,即Thread. isInterrupted()返回true。也就是說我們代碼要根據該狀態來決定下一步怎么做。
從如上描述可以看出,如果方法異常描述上有拋出
InterruptedException、ClosedByInterruptException異常的,說明該方法可以中斷,如“public final native void wait(longtimeout) throws InterruptedException”,但是中斷狀態會被會被重置要看其Javadoc描述。其他情況基本都是設置中斷狀態而不會中斷掉操作。
BIO(Blocking I/O)操作不可中斷
如java.net.Socket讀寫網絡I/O時是阻塞的,除了設置超時時間外,還應該考慮讓它可中斷或者盡早中斷。可以參考《你的Java代碼可中斷嗎》。還有如JDBC驅動mysql-connector-java、HttpClient等大部分都是使用BIO,它們也是不可中斷的。
NIO(New I/O)操作可中斷
NIO涉及到兩部分:
java.nio.channels.Selector和java.nio.channels.InterruptibleChannel,它們是可中斷的。如java.nio.channels#SocketChannel實現了InterruptibleChannel,如下方法都是可中斷的,并會拋出ClosedByInterruptException異常:
- connect(SocketAddress remote)
- read(ByteBuffer[] dsts, int offset, int length)
- read(ByteBuffer[] dsts)
- write(ByteBuffer src)
線程、BIO與中斷
我們使用BIO實現的HttpClient來做個實驗,如下代碼所示:
- public class BlockingIOTest {
- public static void main(String[] args) throws Exception {
- Thread threadA = new Thread(()-> {
- try {
- //該阻塞5s
- String url = "http://localhost:9090/ajax";
- //HttpClient是BIO,不可中斷
- HttpResponse response = HttpClientUtils.getHttpClient().execute(new HttpGet(url));
- System.out.println("http status code : " + response.getStatusLine().getStatusCode());
- //雖然在threadB執行了threadA線程中斷
- //但是僅僅是設置了中斷狀態為true
- //并沒有中斷線程A的執行,該線程還是正常的執行完成了
- System.out.println("threadA is interrupted: " + Thread.currentThread().isInterrupted());
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- Thread threadB = new Thread(()-> {
- try {
- Thread.sleep(2000L);
- //休眠2s后,中斷線程A
- threadA.interrupt();
- } catch (Exception e) {
- }
- });
- threadA.start();
- threadB.start();
- Thread.sleep(15000L);
- }
- }
如上代碼的輸出結果為:
- http status code : 200
- threadA is interrupted: true
如上代碼的執行流程是這樣的:
- 線程A通過BIO實現HttpClient遠程調用http://localhost:9090/ajax獲取數據,而該服務需要5s才能響應;
- 線程B在線程A執行2s后進行了中斷處理,但是線程A調用的HttpClient是阻塞且不可中斷的操作,僅僅是設置了線程A的中斷狀態為true,因此其一直等待網絡I/O完成;
- 當線程A從遠程獲取到結果后繼續執行,Thread.currentThread().isInterrupted()將輸出true,表示線程A被設置了中斷狀態。
從而需要注意設置了中斷狀態與中斷執行不是一回事。因此對于使用BIO,一定要設置好連接和讀寫的超時時間,另外可以參考《你的Java代碼可中斷嗎》進行可中斷設計。
線程池、Future與中斷
我們往線程池提交一個HttpClient任務,并通過Future來等待執行結果,如下代碼所示:
- public class ThreadPoolTest {
- private static ExecutorService executorService = Executors.newFixedThreadPool(5);
- public static void main(String[] args) throws Exception {
- Future<Integer> futureA = executorService.submit((Callable) () -> {
- //該url會阻塞5s
- String url = "http://localhost:9090/ajax";
- //HttpClient是BIO,不可中斷
- HttpResponse response = HttpClientUtils.getHttpClient().execute(new HttpGet(url));
- Integer result = response.getStatusLine().getStatusCode();
- System.out.println("thread a result : " + result);
- return response.getStatusLine().getStatusCode();
- });
- Future<Integer> futureB = executorService.submit((Callable) () -> {
- //該url會阻塞5s
- String url = "http://localhost:9090/ajax";
- //HttpClient是BIO,不可中斷
- HttpResponse response = HttpClientUtils.getHttpClient().execute(new HttpGet(url));
- Integer result = response.getStatusLine().getStatusCode();
- System.out.println("thread b result : " + result);
- return result;
- });
- try {
- Integer resultA = futureA.get(100, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- System.out.println("future a timeout");
- }
- try {
- Integer resultB = futureB.get(100, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- System.out.println("future b timeout");
- }
- executorService.awaitTermination(10000L, TimeUnit.MILLISECONDS);
- }
- }
如上代碼的輸出結果為:
- future a timeout
- future b timeout
- thread a result : 200
- thread b result : 200
如上代碼的執行流程是這樣的:
- 主線程往線程池提交了兩個HttpClient阻塞調用任務,該任務響應時間為5s;
- 主線程阻塞在兩個帶超時的Future等待上,Future在等待線程池任務執行結束,Future的超時時間設置為100ms,所以很快就超時并返回了,主線程繼續執行,在《億級流量》中我們用到了很多這種方法進行并發獲取數據和降級或熔斷處理;
- 線程池中的兩個任務其實并沒有被中斷,還是占用著線程池中的線程,在后臺繼續執行,直到完成。
從如上可以看出,使用Future時只是在主線程解除了阻塞,并沒有連帶把線程池任務取消掉,還是占用著線程并阻塞執行。
之前有位同學在公眾號后臺留言咨詢:
使用hystrix進行httpclient超時熔斷錯誤,我是順序操作的(沒有并發),發現hystrix會超時斷開,但是會導致hystrix線程池不斷增多,直到后面因線程池裝不下拒絕?
看完如上示例,應該能解決該讀者的疑惑。雖然熔斷了,但是線程中的操作并沒有真正的中斷,而是還占著線程資源。
接下來我們可以簡單看下Future其中的一個實現FutureTask:
超時等待方法get(long timeout, TimeUnit unit)偽代碼:
- while(true) {
- if (Thread.interrupted()) {//如果當前線程中斷了,處理現場,并拋出中斷異常
- //some code
- throw new InterruptedException();
- }
- //判斷剩余休眠時間
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {//如果沒有休眠時間了,則處理線程,并終止執行
- //some code
- return state;
- }
- //休眠一段時間,內部實現為UNSAFE.park(false, nanos)
- LockSupport.parkNanos(this, nanos);
- }
取消方法cancel(boolean mayInterruptIfRunning)偽代碼:
- if (mayInterruptIfRunning) {//中斷當前線程
- Thread t = runner;
- if (t != null)
- t.interrupt();
- }
- //執行UNSAFE.unpark(thread)喚醒休眠的當前線程
- LockSupport.unpark(t);
即當我們調用Future#cancel時,是通過喚醒Future所在線程實現,當然實際是比這個要復雜的。
回填結果方法set(V v)偽代碼:
- //修改Future狀態為完成
- //保持v的值,從而Future#get能獲取到
- //通過LockSupport.unpark(t)喚醒休眠的線程
當線程池中的線程執行完成后,是通過Future#set把值設置回Future,從而喚醒休眠的線程,即阻塞在Future#get的等待,然后獲取到該結果。
鎖與中斷
synchronized和ReentrantLock#lock()在獲取鎖的過程中是不可中斷的,假設出現了死鎖將一直僵持在那,無法終止阻塞。但我們可以使用可中斷的
ReentrantLock#lockInterruptibly()方法或者ReentrantLock#tryLock(long timeout, TimeUnit unit)實現可中斷。
總結
在設計高可用系統時,盡量使用線程池,而不是通過每個請求創建一個線程來實現,通過線程池的拒絕策略來優雅的拒絕無法處理的請求。
檢查整個請求鏈路設置合理的超時時間,跟調用方協商合理的SLA、降級限流方案。更長的超時時間意味著出現問題時請求堆積的越多,越可能產生雪崩。
明確知道自己的服務是否可中斷,如果不可中斷,應該使用線程池和Future實現偽可中斷,通過Future配置合理的超時時間,當超時時執行相應的降級策略。也要清楚的知道通過Future只是偽中斷,線程池中的任務還是在后臺執行,當Future超時后進行重試時,會對調用的服務產生更多的請求,從而造成一種DDos,一定要注意相應的處理策略。
池大小、超時時間和中斷沒有***的配置策略,要根據自己的場景來動態調整,在系統遇到高并發或者異常時,我們要保護什么,放棄什么,要有權衡。
【本文是51CTO專欄作者“張開濤”的原創文章,作者微信公眾號:開濤的博客( kaitao-1234567)】