Java為什么不建議使用Executors來創(chuàng)建線程池呢?
我們都知道在面試的過程中,關(guān)于線程池的問題,一直都是面試官比較注重的考點,現(xiàn)在也不會有面試官會選擇去問創(chuàng)建線程都有哪些方式了,而更多的實惠關(guān)注到如何去使用線程池,今天了不起就來和大家說說線程池。
Java創(chuàng)建線程池方式
在Java中,創(chuàng)建線程池主要使用java.util.concurrent包下的Executors類。這個類提供了幾種靜態(tài)工廠方法,用于創(chuàng)建和管理不同類型的線程池。以下是一些常見的創(chuàng)建線程池的方式:
1.Fixed Thread Pool(固定線程池)
- 創(chuàng)建一個可重用固定線程數(shù)的線程池,以共享的無界隊列方式來運行這些線程。在任意點,在大多數(shù) nThreads 線程會處于處理任務的活動狀態(tài)。如果在所有線程處于活動狀態(tài)時提交附加任務,則在有可用線程之前,附加任務將在隊列中等待。
- 創(chuàng)建方法:Executors.newFixedThreadPool(int nThreads)
2.Cached Thread Pool(緩存線程池)
- 創(chuàng)建一個可根據(jù)需要創(chuàng)建新線程的線程池,但是在以前構(gòu)造的線程可用時將重用它們。對于執(zhí)行很多短期異步任務的程序而言,這些線程池通常可提高程序性能。調(diào)用 execute 將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
- 創(chuàng)建方法:Executors.newCachedThreadPool()
3.Single Thread Executor(單線程執(zhí)行器)
- 創(chuàng)建一個使用單個工作線程的 Executor,以無界隊列方式來運行該線程。(注意,如果單個線程始終因為等待新任務而處于非活動狀態(tài),則在現(xiàn)行線程終止之前,它可能無法終止。)但是,如果線程因為失敗而終止,那么會有一個新的線程來替代它。單個線程的優(yōu)勢在于,你無需處理對線程生命周期的管理。
- 創(chuàng)建方法:Executors.newSingleThreadExecutor()
4.Scheduled Thread Pool(計劃線程池)
- 創(chuàng)建一個線程池,它可安排在給定延遲后運行命令或者定期地執(zhí)行。
- 創(chuàng)建方法:Executors.newScheduledThreadPool(int corePoolSize)
5.自定義線程池
除了使用Executors類提供的靜態(tài)工廠方法創(chuàng)建線程池外,還可以通過實例化ThreadPoolExecutor類來自定義線程池。這種方式提供了更多的靈活性,允許你設(shè)置線程池的核心參數(shù),如核心線程數(shù)、最大線程數(shù)、線程存活時間、任務隊列等。
示例代碼:
import java.util.concurrent.*;
public class CustomThreadPool {
public static void main(String[] args) {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 60L;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// 使用線程池執(zhí)行任務...
}
}
非自定義線程池的缺點
我們先來看看 Executors 當中的幾個方法,也就是上面了不起給大家寫的除了自定義線程池的幾個方法。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
在源碼中有一個類,我們明顯的看到了隊列的身影,那就是 LinkedBlockingQueue。
它實現(xiàn)了一個基于鏈接節(jié)點的可選容量的阻塞隊列。此隊列按 FIFO(先進先出)排序元素。隊列的頭部是在隊列中存在時間最長的元素,隊列的尾部是在隊列中存在時間最短的元素。新元素總是插入到隊列的尾部,而檢索操作(如 take 和 poll)總是從隊列的頭部開始。
LinkedBlockingQueue 是一個線程安全的隊列,它內(nèi)部使用了鎖和條件變量來保證多線程環(huán)境下的正確性和一致性。因為它是阻塞隊列,所以它可以用于生產(chǎn)者和消費者模型,在生產(chǎn)者線程和消費者線程之間傳遞數(shù)據(jù)。
LinkedBlockingQueue 的主要特點就幾個
- 容量可選
- 阻塞操作
- 非阻塞操作
- 線程安全
- 高效的并發(fā)性能
為什么說容量可選呢?因為我們?nèi)绻麊为毷褂眠@個LinkedBlockingQueue 那么你可以在創(chuàng)建 LinkedBlockingQueue 時指定一個容量,這將限制隊列中可以存儲的元素數(shù)量。如果未指定容量,則隊列的容量將是 Integer.MAX_VALUE。當隊列滿時,任何嘗試插入元素的線程都將被阻塞,直到隊列中有空間可用。
而阻塞操作則是他提供了阻塞的 put 和 take 方法。put 方法用于添加元素到隊列中,如果隊列已滿,則調(diào)用線程將被阻塞直到隊列有空閑空間。take 方法用于從隊列中移除并返回頭部元素,如果隊列為空,則調(diào)用線程將被阻塞直到隊列中有元素可用。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
......
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
.....
我們看一個使用LinkedBlockingQueue的示例:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Produced: " + i);
queue.put(i);
Thread.sleep(200); // 模擬生產(chǎn)耗時
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer item = queue.take();
System.out.println("Consumed: " + item);
Thread.sleep(500); // 模擬消費耗時
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
// 注意:這里的 consumer 線程是一個無限循環(huán),所以它不會自然結(jié)束。
// 在實際應用中,你需要有一個明確的停止條件來結(jié)束消費者線程。
}
}
說到這里感覺說多了,我們回歸正題,如果我們使用標準的 newCachedThreadPool 方法,如果線程數(shù)設(shè)置和任務數(shù)不能夠配合起來,就比如說設(shè)置的線程數(shù)是一定的,這個時候,任務數(shù)量越多,就會慢慢的進入到隊列LinkedBlockingQueue中,隊列的話,任務越多,占用的內(nèi)存越多,最終就非常容易耗盡內(nèi)存,導致OOM。
所以我們不推薦直接使用 Executors 來創(chuàng)建線程池,但是我們更推薦使用 ThreadpoolExecutor創(chuàng)建線程池。原因就是如下的幾點:
1.資源控制:ThreadPoolExecutor 允許你明確控制并發(fā)線程的最大數(shù)量,防止因為創(chuàng)建過多的線程而耗盡系統(tǒng)資源。通過合理地設(shè)置線程池的大小,可以平衡資源利用率和系統(tǒng)性能。
2.線程復用:線程池中的線程可以被多個任務復用,這減少了在創(chuàng)建和銷毀線程上花費的時間以及開銷,提高了系統(tǒng)的響應速度。
3.任務隊列:ThreadPoolExecutor 內(nèi)部維護了一個任務隊列,當線程池中的線程都在工作時,新提交的任務會被放在隊列中等待執(zhí)行。這提供了一種緩沖機制,可以平滑處理突發(fā)的高并發(fā)任務。
4.靈活性:ThreadPoolExecutor 提供了多種配置選項,如核心線程數(shù)、最大線程數(shù)、線程存活時間、任務隊列類型等,這些選項可以根據(jù)具體的應用場景進行調(diào)整,以達到最佳的性能和資源利用率。
5.異常處理:當線程池中的線程因為未捕獲的異常而終止時,ThreadPoolExecutor 會創(chuàng)建一個新的線程來替代它,從而保持線程池的穩(wěn)定性。此外,你也可以通過提供自定義的 ThreadFactory 來控制線程的創(chuàng)建過程,例如設(shè)置線程的名稱、優(yōu)先級、守護狀態(tài)等。
6.可擴展性:ThreadPoolExecutor 的設(shè)計是基于策略的,它使用了多個接口和抽象類來定義線程池的行為,這使得它很容易通過擴展或替換某些組件來適應不同的需求。
7.與Java并發(fā)庫集成:ThreadPoolExecutor 是 Java 并發(fā)庫 java.util.concurrent 的一部分,這個庫提供了豐富的并發(fā)工具和類,如鎖、信號量、倒計時器、阻塞隊列等,這些都可以與 ThreadPoolExecutor 無縫集成,簡化多線程編程的復雜性。
8.性能監(jiān)控和調(diào)優(yōu):ThreadPoolExecutor 提供了一些有用的方法,如 getTaskCount()、getCompletedTaskCount()、getPoolSize() 等,這些方法可以幫助你監(jiān)控線程池的運行狀態(tài),從而進行性能調(diào)優(yōu)。
所以你了解了么?