使用 Java8 中的并行流的注意事項
近期對迭代的功能進行壓測檢查,發現某些使用并發技術的線程任務耗時非常漫長,結合監控排查定位到的并行流使用上的不恰當,遂以此文分享一下筆者發現的問題。
一、問題復現
1. 需求背景
這里筆者先簡單介紹一下當前功能的使用背景,當前功能是一些大數據量的計算密集型任務定時執行,在常規優化效率有限的情況下,考慮到復用性,筆者通過JDK8底層內置的并行流完成這些任務的計算。
對應優化思路如下,可以看到針對每一批數據,筆者都是通過并行流采集出集合并將其寫入文檔:
2. 常規串行計算
我們給出第一段代碼示例,為了更專注于本文并行流問題的剖析,筆者對于兩個并行線程所執行的數據采集和寫入文檔的操作通過原子類并發計算來模擬:
public static void main(String[] args) throws Exception {
AtomicInteger atomicInteger = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(2);
long beginTime = System.currentTimeMillis();
//模擬采集5000w數據并寫入本地文檔中
new Thread(() -> {
IntStream.range(0, 5000_0000)
.forEach(i -> atomicInteger.getAndIncrement());
countDownLatch.countDown();
}, "t1").start();
//模擬采集5000w數據并寫入本地文檔中
new Thread(() -> {
IntStream.range(0, 5000_0000)
.forEach(i -> atomicInteger.getAndIncrement());
countDownLatch.countDown();
}, "t2").start();
//等待兩個線程結束
countDownLatch.await();
//輸出耗時
long endTime = System.currentTimeMillis();
System.out.println("atomicInteger: " + atomicInteger.get());
System.out.println("time: " + (endTime - beginTime) + " ms");
}
輸出結果如下,可以看到1e的數據耗時大約需要1.6s:
atomicInteger: 100000000
time: 1620 ms
3. 單任務并行流
我們再進行更進一步的優化,將某個線程的任務使用并行流進行原子運算(模擬業務操作):
public static void main(String[] args) throws Exception {
AtomicInteger atomicInteger = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(2);
long beginTime = System.currentTimeMillis();
//模擬并行流采集5000w數據并寫入本地文檔中
new Thread(() -> {
IntStream.range(0, 5000_0000)
.parallel()
.forEach(i -> atomicInteger.getAndIncrement());
countDownLatch.countDown();
}, "t1").start();
//模擬采集5000w數據并寫入本地文檔中
new Thread(() -> {
IntStream.range(0, 5000_0000)
.forEach(i -> atomicInteger.getAndIncrement());
countDownLatch.countDown();
}, "t2").start();
//等待兩個線程結束
countDownLatch.await();
//輸出耗時
long endTime = System.currentTimeMillis();
System.out.println("atomicInteger: " + atomicInteger.get());
System.out.println("time: " + (endTime - beginTime) + " ms");
}
從輸出結果來看,性能表現提升了幾毫秒,相對于最后生產上業務的數據量而言,可能會提升更多:
atomicInteger: 100000000
time: 1337 ms
4. 雙并行流運算
結合上述結果,我們大膽提出,是否所有任務都通過通過并行流進行運算,程序的執行性能是否會在此提升:
public static void main(String[] args) throws Exception {
AtomicInteger atomicInteger = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(2);
long beginTime = System.currentTimeMillis();
//模擬并行流采集5000w數據并寫入本地文檔中
new Thread(() -> {
IntStream.range(0, 5000_0000)
.parallel()
.forEach(i -> atomicInteger.getAndIncrement());
countDownLatch.countDown();
}, "t1").start();
//模擬并行流采集5000w數據并寫入本地文檔中
new Thread(() -> {
IntStream.range(0, 5000_0000)
.parallel()
.forEach(i -> atomicInteger.getAndIncrement());
countDownLatch.countDown();
}, "t2").start();
//等待兩個線程結束
countDownLatch.await();
//輸出耗時
long endTime = System.currentTimeMillis();
System.out.println("atomicInteger: " + atomicInteger.get());
System.out.println("time: " + (endTime - beginTime) + " ms");
}
很明顯,從最終的耗時來看,執行時間不減反增了,這是為什么呢?
atomicInteger: 100000000
time: 1863 ms
二、詳解多任務采用并行流導致執行低效的原因
實際上并行流底層所采用的線程池是一個在程序啟動初始化期間就會創建的線程池common,程序初始化時它會檢查用戶的是否有配置java.util.concurrent.ForkJoinPool.common.parallelism這個參數,如果有則基于這個參數的數值為common創建定量的線程,后續的我們的并行流運算的執行都會提交到該線程池中。
這就意味著我們上述的操作中,所有線程中千萬的執行子項都通過同一個線程池進行并行運算,這期間線程池的忙碌程度可想而知,這也就是為什么筆者在進行壓測時明明某些數據量不是很大的任務耗時卻非常大的本質原因:
對于該問題,筆者也通過StackOverflow看到并行流設計的思想,設計者認為對于計算密集型任務,默認情況下,它將通過一個初始化一個CPU核心數一致的線程池,讓所有并行運算共享一個線程池,進行并行流運算時使用的線程永遠在核心數以內,由此也會出現相同的缺點,所有并行運算依賴同一個線程池,可能會導致大量任務大耗時或者大阻塞:
This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
這一點我們也可以在ForkJoinPool的靜態代碼塊中:
static {
// initialize field offsets for CAS etc
try {
//......
//調用makeCommonPool完成線程池創建和初始化
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
對應的我們步入makeCommonPool方法即可看到線程池的創建邏輯,即判斷用戶是否有通過java.util.concurrent.ForkJoinPool.common.parallelism指定線程數,若沒有則按照CPU核心數完成初始化:
private static ForkJoinPool makeCommonPool() {
//......
try { // ignore exceptions in accessing/parsing properties
//獲取用戶對于common線程池中線程數的配置
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
if (pp != null)
parallelism = Integer.parseInt(pp);
//......
} catch (Exception ignore) {
}
//......
//若小于parallelism小于0則說明用戶沒有指定,則直接按照CPU核心數創建線程池
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
//基于CPU核心數創建 ForkJoinPool線程池
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
三、解決方案
很明顯,對于該問題就是因為多個并行運算跑到了單個線程池中,我們的解決方式無非是以下幾種:
- 提升線程池線程數量已處理更多的并發運算。
- 業務上避免大量并發運算去競爭common線程池。
本著盡可能用簡單的方式達到低成本解決復雜問題的原則,結合業務場景來看,這段代碼的使用更多是計算密集型任務,通過java.util.concurrent.ForkJoinPool.common.parallelism去提升線程數并不會帶來提升,所以在筆者結合業務場景通過壓測計算出每個定時任務的耗時,大約是5分鐘,所以筆者通過調整定時任務的cron表達式由原來的3min改為5min保證任務錯峰執行解決該問題: