ParallelStream的坑,不踩不知道,一踩嚇一跳
本文轉載自微信公眾號「小姐姐味道」,作者小姐姐養的狗 。轉載本文請聯系小姐姐味道公眾號。
很多同學喜歡使用lambda表達式,它允許你定義短小精悍的函數,體現你高超的編碼水平。當然,這個功能在某些以代碼行數來衡量工作量的公司來說,就比較吃虧一些。
比如下面的代碼片段,讓人閱讀的時候就像是讀詩一樣。但是一旦用不好,也是會要命的。
- List<Integer> transactionsIds =
- widgets.stream()
- .filter(b -> b.getColor() == RED)
- .sorted((x,y) -> x.getWeight() - y.getWeight())
- .mapToInt(Widget::getWeight)
- .sum();
這段代碼有一個關鍵的函數,那就是stream。通過它,可以將一個普通的list,轉化為流,然后就可以使用類似于管道的方式對list進行操作。總之,用過的都說好。
對這些函數還不是太熟悉?可以參考:《到處是map、flatMap,啥意思?》
問題來了
假如我們把stream換成parallelStream,會發生什么情況?
根據字面上的意思,流會從串行 變成并行。
既然是并行,那用屁股想一想,就知道這里面肯定會有線程安全問題。不過我們這里討論的并不是要你使用線程安全的集合,這個話題太低級。現階段,知道在線程不安全的環境中使用線程安全的集合,已經是一個基本的技能。
這次踩坑的地方,是并行流的性能問題。
我們用代碼來說話。
下面的代碼,開啟了8個線程,這8個線程都在使用并行流進行數據計算。在執行的邏輯中,我們讓每個任務都sleep 1秒鐘,這樣就能夠模擬一些I/O請求的耗時等待。
使用stream,程序會在30秒后返回,但我們期望程序能夠在1秒多返回,因為它是并行流,得對得起這個稱號。
測試發現,我們等了好久,任務才執行完畢。
- static void paralleTest() {
- List<Integer> numbers = Arrays.asList(
- 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
- 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
- 20, 21, 22, 23, 24, 25, 26, 27, 28, 29
- );
- final long begin = System.currentTimeMillis();
- numbers.parallelStream().map(k -> {
- try {
- Thread.sleep(1000);
- System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return k;
- }).collect(Collectors.toList());
- }
- public static void main(String[] args) {
- // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
- new Thread(() -> paralleTest()).start();
- new Thread(() -> paralleTest()).start();
- new Thread(() -> paralleTest()).start();
- new Thread(() -> paralleTest()).start();
- new Thread(() -> paralleTest()).start();
- new Thread(() -> paralleTest()).start();
- new Thread(() -> paralleTest()).start();
- new Thread(() -> paralleTest()).start();
- }
坑
實際上,在不同的機器上執行,這段代碼花費的時間都不一樣。
既然是并行,那肯定得有個并行度。太低了,體現不到并行的能能力;太大了,又浪費了上下文切換的時間。我是很沮喪的發現,很多高級研發,將線程池的各種參數背的滾瓜爛熟,各種調優,竟然敢睜一只眼閉一只眼的在I/O密集型業務中用上parallelStream。
要了解這個并行度,我們需要查看具體的構造方法。在ForkJoinPool類中找到這樣的代碼。
- try { // ignore exceptions in accessing/parsing properties
- String pp = System.getProperty
- ("java.util.concurrent.ForkJoinPool.common.parallelism");
- if (pp != null)
- parallelism = Integer.parseInt(pp);
- fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
- "java.util.concurrent.ForkJoinPool.common.threadFactory");
- handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
- "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
- } catch (Exception ignore) {
- }
- if (fac == null) {
- if (System.getSecurityManager() == null)
- fac = defaultForkJoinWorkerThreadFactory;
- else // use security-managed default
- fac = new InnocuousForkJoinWorkerThreadFactory();
- }
- if (parallelism < 0 && // default 1 less than #cores
- (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
- parallelism = 1;
- if (parallelism > MAX_CAP)
- parallelism = MAX_CAP;
可以看到,并行度到底是多少,是由下面的參數來控制的。如果無法獲取這個參數,則默認使用 CPU個數-1 的并行度。
可以看到,這個函數是為了計算密集型業務去設計的。如果你喂給它一大堆任務,它就會由并行執行退變成類似于串行的效果。
- -Djava.util.concurrent.ForkJoinPool.common.parallelism=N
即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N設置了一個初始值大小,它依然有問題。
因為,parallelism這個變量是final的,一旦設定,不允許修改。也就是說,上面的參數只會生效一次。
張三可能使用下面的代碼,設置了并行度大小為20。
- System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
李四可能用同樣的方式,設置了這個值為30。那實際在項目中用的是哪個值,那就得問JVM是怎么加載的類信息了。
這種方式并不太非常靠譜。
一種解決方式
我們可以通過提供外置的forkjoinpool,也就是改變提交方式,來實現不同類型的任務分離。
代碼如下所示,通過顯式的代碼提交,即可實現任務分離。
- ForkJoinPool pool = new ForkJoinPool(30);
- final long begin = System.currentTimeMillis();
- try {
- pool.submit(() ->
- numbers.parallelStream().map(k -> {
- try {
- Thread.sleep(1000);
- System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return k;
- }).collect(Collectors.toList())).get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
這樣,不同的場景,就可以擁有不同的并行度。這種方式和CountDownLatch有異曲同工之妙,我們需要手動管理資源。
使用了這種方式,代碼量增加,已經和優雅關系不大了,不僅不優雅,而且丑的要命。白天鵝變成了丑小鴨,你還會愛它么?
作者簡介:小姐姐味道 (xjjdog),一個不允許程序員走彎路的公眾號。聚焦基礎架構和Linux。十年架構,日百億流量,與你探討高并發世界,給你不一樣的味道。我的個人微信xjjdog0,歡迎添加好友,進一步交流。