京東一面:Java 提供哪幾種線程池,什么場景用
前言
大家好,我是田螺。
我們來看一道京東一面面試題:Java 提供哪幾種線程池,什么場景使用?
- newFixedThreadPool
- newCachedThreadPool
- newSingleThreadExecutor
- newScheduledThreadPool
1. newFixedThreadPool
newFixedThreadPool的構造函數:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
1.1 線程池特點:
- 核心線程數和最大線程數大小一樣
- 沒有所謂的非空閑時間,即keepAliveTime為0
- 阻塞隊列為無界隊列LinkedBlockingQueue
1.2 newFixedThreadPool工作機制
圖片
- 提交任務
- 如果線程數少于核心線程,創建核心線程執行任務
- 如果線程數已經等于核心線程,把任務添加到LinkedBlockingQueue阻塞隊列
- 如果線程執行完任務,去阻塞隊列取任務,繼續執行。
- 如果持續無限添加任務,可能會導致OOM,因為它是無界隊列。
1.3 無界隊列OOM的實例代碼
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
}
為了驗證OOM,IDE指定JVM參數:-Xmx8m -Xms8m
運行結果:
圖片
newFixedThreadPool使用了無界的阻塞隊列LinkedBlockingQueue,如果線程獲取一個任務后,任務的執行時間比較長(比如,上面demo設置了10秒),會導致隊列的任務越積越多,導致機器內存使用不停飆升, 最終導致OOM:
圖片
大家有興趣可以看看源碼哈:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
...
}
1.4 使用場景
- 定時任務調度
對于需要定時執行的任務,如每天的報表生成、數據備份或清理任務,FixedThreadPool 可以保持固定數量的線程來按時執行這些任務,確保系統在高峰期也能穩定運行。
- 一些后臺服務中,比如郵件發送、短信通知等
在一些后臺服務中,比如郵件發送、短信通知等,使用 FixedThreadPool 可以確保有足夠的線程來處理發送請求,而不會因為突發的高并發請求導致系統崩潰。例如,在一個活動結束后,用戶會收到活動總結郵件,固定線程池可以有效管理郵件發送任務,確保每封郵件都能及時發送。
- 適用于處理CPU密集型的任務
CPU密集型任務是指那些主要依賴于CPU計算能力的任務。這類任務通常需要大量的計算資源,且其執行時間與CPU的處理能力密切相關。與之相對的是I/O密集型任務,后者主要受限于輸入/輸出操作(如磁盤讀寫、網絡請求等)。
2. newCachedThreadPool
newCachedThreadPool 的構造函數。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
2.1 線程池特點
- 核心線程數為0
- 最大線程數為Integer.MAX_VALUE
- 阻塞隊列是SynchronousQueue
- 非核心線程空閑存活時間為60秒
當提交任務的速度大于處理任務的速度時,每次提交一個任務,就必然會創建一個線程。極端情況下會創建過多的線程,耗盡 CPU 和內存資源。由于空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。
2.2 newCachedThreadPool工作機制
圖片
- 提交任務
- 因為沒有核心線程,所以任務直接加到SynchronousQueue隊列。
- 判斷是否有空閑線程,如果有,就去取出任務執行。
- 如果沒有空閑線程,就新建一個線程執行。
- 執行完任務的線程,還可以存活60秒,如果在這期間,接到任務,可以繼續活下去;否則,被銷毀。
2.3 無界隊列OOM的實例代碼
newCachedThreadPool 使用不當,也是會導致OOM的,比如以下這個demo:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolOOMExample {
public static void main(String[] args) {
// 創建一個無界線程池
ExecutorService executorService = Executors.newCachedThreadPool();
try {
// 不斷提交任務,模擬內存消耗
while (true) {
executorService.submit(() -> {
// 模擬一個長時間運行的任務
try {
// 創建一個大的對象來消耗內存
int[] largeArray = new int[1000000]; // 1,000,000 integers
// 模擬一些計算
for (int i = 0; i < largeArray.length; i++) {
largeArray[i] = i;
}
// 讓線程稍微休眠一下
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
2.4 使用場景
使用用于用于并發執行大量短期的小任務。比如一些網絡爬蟲、Web服務器處理請求。
3. newSingleThreadExecutor
newSingleThreadExecutor的構造函數:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
3.1 線程池特點
- 核心線程數為1
- 最大線程數也為1
- 阻塞隊列是LinkedBlockingQueue
- keepAliveTime為0
3.2 newSingleThreadExecutor的工作機制
圖片
- 提交任務
- 線程池是否有一條線程在,如果沒有,新建線程執行任務
- 如果有,講任務加到阻塞隊列
- 當前的唯一線程,從隊列取任務,執行完一個,再繼續取,一個人(一條線程)夜以繼日地干活。
3.3 newSingleThreadExecutor的實例代碼
newSingleThreadExecutor 使用的也是無界隊列。如果任務提交速率過高,可能會導致系統資源耗盡(如內存溢出)。我們來看一個簡單使用demo:
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在執行");
});
}
運行結果:
圖片
3.4 使用場景
適用于串行執行任務的場景,一個任務一個任務地執行。比如任務調度
在某些業務場景中,任務之間存在依賴關系,即一個任務的輸出是另一個任務的輸入。在這種情況下,使用單線程執行器可以確保任務按預期的順序執行。
4. newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
4.1 線程池特點
- 最大線程數為Integer.MAX_VALUE
- 阻塞隊列是DelayedWorkQueue
- keepAliveTime為0
- scheduleAtFixedRate() :按某種速率周期執行
- scheduleWithFixedDelay():在某個延遲后執行
4.2 工作機制
- 添加一個任務
- 線程池中的線程從 DelayQueue 中取任務
- 線程從 DelayQueue 中獲取 time 大于等于當前時間的task
- 執行完后修改這個 task 的 time 為下次被執行的時間
- 這個 task 放回DelayQueue隊列中
4.3 實例代碼
/**
創建一個給定初始延遲的間隔性的任務,之后的下次執行時間是上一次任務從執行到結束所需要的時間+* 給定的間隔時間
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleWithFixedDelay(()->{
System.out.println("current Time" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"正在執行");
}, 1, 3, TimeUnit.SECONDS);
運行結果:圖片
/**
創建一個給定初始延遲的間隔性的任務,之后的每次任務執行時間為 初始延遲 + N * delay(間隔)
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(()->{
System.out.println("current Time" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"正在執行");
}, 1, 3, TimeUnit.SECONDS);;
周期性執行任務的場景,需要限制線程數量的場景。比如定時清理任務:
在某些應用程序中,可能會產生臨時文件或日志記錄。為了保持系統的整潔和性能,需要定期清理這些臨時文件或日志。可以使用 newScheduledThreadPool 來安排清理任務,例如每小時或每天清理一次。