并發編程之ForkJoin框架原理分析
前言
前面我們介紹了線程池框架(ExecutorService)的兩個具體實現:
- ThreadPoolExecutor 默認線程池
- ScheduledThreadPoolExecutor定時線程池
線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到多個任務上。Java7 又提供了的一個用于并行執行的任務的框架 Fork/Join ,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。在介紹Fork/Join 框架之前我們先了解幾個概念:CPU密集型、IO密集型,再逐步深入去認識Fork/Join 框架。
任務性質類型
CPU密集型(CPU bound)
CPU密集型也叫計算密集型,指的是系統的硬盤、內存性能相對于CPU要好很好多,此時,系統運作大部分的狀況是 CPU Loading 100%,CPU要讀/寫 I/O(硬盤/內存),I/O在很短的時間就可以完成,而CPU還有許多運算要處理,CPU Loading很高。
在多重程序系統中,大部分時間用來做計算、邏輯判斷等CPU動作的程序稱之 CPU bound。例如一個計算圓周率至小數點一千位以下的程序,在執行的過程當中絕大部分時間在用三角函數和開根號的計算,便是屬于CPU bound的程序。
CPU bound的程序一般而言CPU占用率相當高。這可能是因為任務本身不太需要訪問I/O設備,也可能是因為程序是多線程實現因此屏蔽了等待I/O的時間。
- 線程數一般設置為:線程數 = CPU核數 + 1(現代CPU支持超線程)
IO密集型(I/O bound)
I/O密集型指的是系統的CPU性能相對硬盤、內存要好很多,此時,系統運作,大部分的狀況是 CPU 在等 I/O(硬盤/內存)的讀/寫操作,此時 CPU Loading 并不高。
I/O bound的程序一般在達到性能極限時,CPU占用率仍然較低。這可能是因為任務本身需要大量I/O操作,而 pipeline 做的不是很好,沒有充分利用處理器能力。
- 線程數一般設置為:線程數 = ((線程等待時間 + 線程CPU時間) / 線程CPU時間) * CPU數目
CPU密集型 VS I/O密集型
我們可以把任務分為計算密集型和I/O密集型。
計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等于CPU的核心數。
計算密集型任務由于主要消耗CPU資源,因此,代碼運行效率至關重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對于計算密集型任務,最好用C語言編寫。
第二種任務的類型是I/O密集型,涉及到網絡、磁盤I/O的任務都是I/O密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待I/O操作完成(因為I/O的速度遠遠低于CPU和內存的速度)。對于I/O密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是I/O密集型任務,比如Web應用。
I/O密集型任務執行期間,99%的時間都花在I/O上,花在CPU上的時間很少,因此,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,完全無法提升運行效率。對于I/O密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。
什么是 Fork/Join 框架?
Fork/Join 框架是 Java7 提供了的一個用于并行執行的任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。
Fork 就是把一個大任務切分為若干個子任務并行的執行,Join 就是合并這些子任務的執行結果,最后得到這個大任務的結果。比如計算 1+2+......+10000,可以分割成10個子任務,每個子任務對1000個數進行求和,最終匯總這10個子任務的結果。如下圖所示:

Fork/Join的特性:
- ForkJoinPool 不是為了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。(見 Java Tip: When to use ForkJoinPool vs ExecutorService )
- ForkJoinPool 主要用于實現“分而治之”的算法,特別是分治之后遞歸調用的函數,例如 quick sort 等;
- ForkJoinPool 最適合的是計算密集型的任務,如果存在 I/O、線程間同步、sleep() 等會造成線程長時間阻塞的情況時,最好配合 MangedBlocker。
關于“分而治之”的算法,可以查看《分治、回溯的實現和特性》
工作竊取算法
工作竊取(work-stealing)算法 是指某個線程從其他隊列里竊取任務來執行。
我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。
但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并且消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。

- ForkJoinPool 的每個工作線程都維護著一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask)。
- 每個工作線程在運行中產生新的任務(通常是因為調用了 fork())時,會放入工作隊列的隊尾,并且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
- 每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自于剛剛提交到 pool 的任務,或是來自于其他工作線程的工作隊列),竊取的任務位于其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。
- 在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,并等待其完成。
- 在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。
Fork/Join的使用
使用場景示例
定義fork/join任務,如下示例,隨機生成2000w條數據在數組當中,然后求和_
- package com.niuh.forkjoin.recursivetask;
- import java.util.concurrent.RecursiveTask;
- /**
- * RecursiveTask 并行計算,同步有返回值
- * ForkJoin框架處理的任務基本都能使用遞歸處理,比如求斐波那契數列等,但遞歸算法的缺陷是:
- * 一只會只用單線程處理,
- * 二是遞歸次數過多時會導致堆棧溢出;
- * ForkJoin解決了這兩個問題,使用多線程并發處理,充分利用計算資源來提高效率,同時避免堆棧溢出發生。
- * 當然像求斐波那契數列這種小問題直接使用線性算法搞定可能更簡單,實際應用中完全沒必要使用ForkJoin框架,
- * 所以ForkJoin是核彈,是用來對付大家伙的,比如超大數組排序。
- * 最佳應用場景:多核、多內存、可以分割計算再合并的計算密集型任務
- */
- class LongSum extends RecursiveTask<Long> {
- //任務拆分的最小閥值
- static final int SEQUENTIAL_THRESHOLD = 1000;
- static final long NPS = (1000L * 1000 * 1000);
- static final boolean extraWork = true; // change to add more than just a sum
- int low;
- int high;
- int[] array;
- LongSum(int[] arr, int lo, int hi) {
- array = arr;
- low = lo;
- high = hi;
- }
- /**
- * fork()方法:將任務放入隊列并安排異步執行,一個任務應該只調用一次fork()函數,除非已經執行完畢并重新初始化。
- * tryUnfork()方法:嘗試把任務從隊列中拿出單獨處理,但不一定成功。
- * join()方法:等待計算完成并返回計算結果。
- * isCompletedAbnormally()方法:用于判斷任務計算是否發生異常。
- */
- protected Long compute() {
- if (high - low <= SEQUENTIAL_THRESHOLD) {
- long sum = 0;
- for (int i = low; i < high; ++i) {
- sum += array[i];
- }
- return sum;
- } else {
- int mid = low + (high - low) / 2;
- LongSum left = new LongSum(array, low, mid);
- LongSum right = new LongSum(array, mid, high);
- left.fork();
- right.fork();
- long rightAns = right.join();
- long leftAns = left.join();
- return leftAns + rightAns;
- }
- }
- }
執行fork/join任務
- package com.niuh.forkjoin.recursivetask;
- import com.niuh.forkjoin.utils.Utils;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.ForkJoinTask;
- public class LongSumMain {
- //獲取邏輯處理器數量
- static final int NCPU = Runtime.getRuntime().availableProcessors();
- /**
- * for time conversion
- */
- static final long NPS = (1000L * 1000 * 1000);
- static long calcSum;
- static final boolean reportSteals = true;
- public static void main(String[] args) throws Exception {
- int[] array = Utils.buildRandomIntArray(2000000);
- System.out.println("cpu-num:" + NCPU);
- //單線程下計算數組數據總和
- long start = System.currentTimeMillis();
- calcSum = seqSum(array);
- System.out.println("seq sum=" + calcSum);
- System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
- start = System.currentTimeMillis();
- //采用fork/join方式將數組求和任務進行拆分執行,最后合并結果
- LongSum ls = new LongSum(array, 0, array.length);
- ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的線程數
- ForkJoinTask<Long> task = fjp.submit(ls);
- System.out.println("forkjoin sum=" + task.get());
- System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
- if (task.isCompletedAbnormally()) {
- System.out.println(task.getException());
- }
- fjp.shutdown();
- }
- static long seqSum(int[] array) {
- long sum = 0;
- for (int i = 0; i < array.length; ++i) {
- sum += array[i];
- }
- return sum;
- }
- }
Fork/Join框架原理
Fork/Join 其實就是指由ForkJoinPool作為線程池、ForkJoinTask(通常實現其三個抽象子類)為任務、ForkJoinWorkerThread作為執行任務的具體線程實體這三者構成的任務調度機制。

ForkJoinWorkerThread
ForkJoinWorkerThread 直接繼承了Thread,但是僅僅是為了增加一些額外的功能,并沒有對線程的調度執行做任何更改。

ForkJoinWorkerThread 是被ForkJoinPool管理的工作線程,在創建出來之后都被設置成為了守護線程,由它來執行ForkJoinTasks。該類主要為了維護創建線程實例時通過ForkJoinPool為其創建的任務隊列,與其他兩個線程池整個線程池只有一個任務隊列不同,ForkJoinPool管理的所有工作線程都擁有自己的工作隊列,為了實現任務竊取機制,該隊列被設計成一個雙端隊列,而ForkJoinWorkerThread的首要任務就是執行自己的這個雙端任務隊列中的任務,其次是竊取其他線程的工作隊列,以下是其代碼片段:
- public class ForkJoinWorkerThread extends Thread {
- // 這個線程工作的ForkJoinPool池
- final ForkJoinPool pool;
- // 這個線程擁有的工作竊取機制的工作隊列
- final ForkJoinPool.WorkQueue workQueue;
- //創建在給定ForkJoinPool池中執行的ForkJoinWorkerThread。
- protected ForkJoinWorkerThread(ForkJoinPool pool) {
- // Use a placeholder until a useful name can be set in registerWorker
- super("aForkJoinWorkerThread");
- this.pool = pool;
- //向ForkJoinPool執行池注冊當前工作線程,ForkJoinPool為其分配一個工作隊列
- this.workQueue = pool.registerWorker(this);
- }
- //該工作線程的執行內容就是執行工作隊列中的任務
- public void run() {
- if (workQueue.array == null) { // only run once
- Throwable exception = null;
- try {
- onStart();
- pool.runWorker(workQueue); //執行工作隊列中的任務
- } catch (Throwable ex) {
- exception = ex; //記錄異常
- } finally {
- try {
- onTermination(exception);
- } catch (Throwable ex) {
- if (exception == null)
- exception = ex;
- } finally {
- pool.deregisterWorker(this, exception); //撤銷工作
- }
- }
- }
- }
- .....
- }
ForkJoinTask
ForkJoinTask :與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個抽象類。

ForkJoinTask :我們要使用 ForkJoin 框架,必須首先創建一個 ForkJoin 任務。它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,Fork/Join框架提供類以下幾個子類:
- RecursiveAction:用于沒有返回結果的任務。(比如寫數據到磁盤,然后就退出。一個 RecursiveAvtion 可以把直接的工作分割成更小的幾塊,這樣它們可以由獨立的線程或者 CPU 執行。我們可以通過繼承來實現一個 RecusiveAction)
- RescursiveTask:用于有返回結果的任務。(可以將自己的工作分割為若干更小任務,并將這些子任務的執行合并到一個集體結果。可以有幾個水平的分割和合并)
- CountedCompleter :在任務完成執行后會觸發執行一個自定義的鉤子函數。
常量介紹
ForkJoinTask 有一個int類型的status字段:
- 其高16位存儲任務執行狀態例如NORMAL、CANCELLED或EXCEPTIONAL
- 低16位預留用于用戶自定義的標記。
任務未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL這幾個小于0的值,這幾個值也是按大小順序的:0(初始狀態) > NORMAL > CANCELLED > EXCEPTIONAL.
- public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
- /** 該任務的執行狀態 */
- volatile int status; // accessed directly by pool and workers
- static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
- static final int NORMAL = 0xf0000000; // must be negative
- static final int CANCELLED = 0xc0000000; // must be < NORMAL
- static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
- static final int SIGNAL = 0x00010000; // must be >= 1 << 16
- static final int SMASK = 0x0000ffff; // short bits for tags
- // 異常哈希表
- //被任務拋出的異常數組,為了報告給調用者。因為異常很少見,所以我們不直接將它們保存在task對象中,而是使用弱引用數組。注意,取消異常不會出現在數組,而是記錄在statue字段中
- //注意這些都是 static 類屬性,所有的ForkJoinTask共用的。
- private static final ExceptionNode[] exceptionTable; //異常哈希鏈表數組
- private static final ReentrantLock exceptionTableLock;
- private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相應的異常節點對象的引用隊列
- /**
- * 固定容量的exceptionTable.
- */
- private static final int EXCEPTION_MAP_CAPACITY = 32;
- //異常數組的鍵值對節點。
- //該哈希鏈表數組使用線程id進行比較,該數組具有固定的容量,因為它只維護任務異常足夠長,以便參與者訪問它們,所以在持續的時間內不應該變得非常大。但是,由于我們不知道最后一個joiner何時完成,我們必須使用弱引用并刪除它們。我們對每個操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變為isQuiescent時都會調用helpExpungeStaleExceptions
- static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
- final Throwable ex;
- ExceptionNode next;
- final long thrower; // 拋出異常的線程id
- final int hashCode; // 在弱引用消失之前存儲hashCode
- ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
- super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,會將該節點加入隊列exceptionTableRefQueue
- this.ex = ex;
- this.next = next;
- this.thrower = Thread.currentThread().getId();
- this.hashCode = System.identityHashCode(task);
- }
- }
- .................
- }
除了status記錄任務的執行狀態之外,其他字段主要是為了對任務執行的異常的處理,ForkJoinTask采用了哈希數組 + 鏈表的數據結構(JDK8以前的HashMap實現方法)存放所有(因為這些字段是static)的ForkJoinTask任務的執行異常。
fork 方法(安排任務異步執行)
fork() 做的工作只有一件事,既是把任務推入當前工作線程的工作隊列里(安排任務異步執行)。可以參看以下的源代碼:
- public final ForkJoinTask<V> fork() {
- Thread t;
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
- ((ForkJoinWorkerThread)t).workQueue.push(this);
- else
- ForkJoinPool.common.externalPush(this);
- return this;
- }
該方法其實就是將任務通過push方法加入到當前工作線程的工作隊列或者提交隊列(外部非ForkJoinWorkerThread線程通過submit、execute方法提交的任務),等待被線程池調度執行,這是一個非阻塞的立即返回方法。
- 這里需要知道,ForkJoinPool線程池通過哈希數組+雙端隊列的方式將所有的工作線程擁有的任務隊列和從外部提交的任務分別映射到哈希數組的不同槽位上。
join 方法(等待執行結果)
join() 的工作則復雜得多,也是 join() 可以使得線程免于被阻塞的原因——不像同名的 Thread.join()。
- 檢查調用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當前線程,等待任務完成。如果是,則不阻塞。
- 查看任務的完成狀態,如果已經完成,直接返回結果。
- 如果任務尚未完成,但處于自己的工作隊列內,則完成它。
- 如果任務已經被其他的工作線程偷走,則竊取這個小偷的工作隊列內的任務(以 FIFO 方式),執行,以期幫助它早日完成 join 的任務。
- 如果偷走任務的小偷也已經把自己的任務全部做完,正在等待需要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。
- 遞歸地執行第5步。
將上述流程畫成序列圖的話就是這個樣子:
由于文章篇幅有限,源碼分析請查看文章末尾的“了解更多”
小結
通常ForkJoinTask只適用于非循環依賴的純函數的計算或孤立對象的操作,否則,執行可能會遇到某種形式的死鎖,因為任務循環地等待彼此。但是,這個框架支持其他方法和技術(例如使用Phaser、helpQuiesce和complete),這些方法和技術可用于構造解決這種依賴任務的ForkJoinTask子類,為了支持這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標記一個short類型的值,并使用getForkJoinTaskTag進行檢查。ForkJoinTask實現沒有將這些受保護的方法或標記用于任何目的,但是它們可以用于構造專門的子類,由此可以使用提供的方法來避免重新訪問已經處理過的節點/任務。
ForkJoinTask應該執行相對較少的計算,并且應該避免不確定的循環。大任務應該被分解成更小的子任務,通常通過遞歸分解。如果任務太大,那么并行性就不能提高吞吐量。如果太小,那么內存和內部任務維護開銷可能會超過處理開銷。
ForkJoinTask是可序列化的,這使它們能夠在諸如遠程執行框架之類的擴展中使用。只在執行之前或之后序列化任務才是明智的,而不是在執行期間。
ForkJoinPool
ForkJoinPool:ForkJoinTask 需要通過 ForkJoinPool 來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。

常量介紹
ForkJoinPool 與 內部類 WorkQueue 共享的一些常量
- // Constants shared across ForkJoinPool and WorkQueue
- // 限定參數
- static final int SMASK = 0xffff; // 低位掩碼,也是最大索引位
- static final int MAX_CAP = 0x7fff; // 工作線程最大容量
- static final int EVENMASK = 0xfffe; // 偶數低位掩碼
- static final int SQMASK = 0x007e; // workQueues 數組最多64個槽位
- // ctl 子域和 WorkQueue.scanState 的掩碼和標志位
- static final int SCANNING = 1; // 標記是否正在運行任務
- static final int INACTIVE = 1 << 31; // 失活狀態 負數
- static final int SS_SEQ = 1 << 16; // 版本戳,防止ABA問題
- // ForkJoinPool.config 和 WorkQueue.config 的配置信息標記
- static final int MODE_MASK = 0xffff << 16; // 模式掩碼
- static final int LIFO_QUEUE = 0; // LIFO隊列
- static final int FIFO_QUEUE = 1 << 16; // FIFO隊列
- static final int SHARED_QUEUE = 1 << 31; // 共享模式隊列,負數 ForkJoinPool 中的相關常量和實例字段:
ForkJoinPool 中的相關常量和實例字段
- // 低位和高位掩碼
- private static final long SP_MASK = 0xffffffffL;
- private static final long UC_MASK = ~SP_MASK;
- // 活躍線程數
- private static final int AC_SHIFT = 48;
- private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數增量
- private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數掩碼
- // 工作線程數
- private static final int TC_SHIFT = 32;
- private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數增量
- private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
- private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 創建工作線程標志
- // 池狀態
- private static final int RSLOCK = 1;
- private static final int RSIGNAL = 1 << 1;
- private static final int STARTED = 1 << 2;
- private static final int STOP = 1 << 29;
- private static final int TERMINATED = 1 << 30;
- private static final int SHUTDOWN = 1 << 31;
- // 實例字段
- volatile long ctl; // 主控制參數
- volatile int runState; // 運行狀態鎖
- final int config; // 并行度|模式
- int indexSeed; // 用于生成工作線程索引
- volatile WorkQueue[] workQueues; // 主對象注冊信息,workQueue
- final ForkJoinWorkerThreadFactory factory;// 線程工廠
- final UncaughtExceptionHandler ueh; // 每個工作線程的異常信息
- final String workerNamePrefix; // 用于創建工作線程的名稱
- volatile AtomicLong stealCounter; // 偷取任務總數,也可作為同步監視器
- /** 靜態初始化字段 */
- //線程工廠
- public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
- //啟動或殺死線程的方法調用者的權限
- private static final RuntimePermission modifyThreadPermission;
- // 公共靜態pool
- static final ForkJoinPool common;
- //并行度,對應內部common池
- static final int commonParallelism;
- //備用線程數,在tryCompensate中使用
- private static int commonMaxSpares;
- //創建workerNamePrefix(工作線程名稱前綴)時的序號
- private static int poolNumberSequence;
- //線程阻塞等待新的任務的超時值(以納秒為單位),默認2秒
- private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
- //空閑超時時間,防止timer未命中
- private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
- //默認備用線程數
- private static final int DEFAULT_COMMON_MAX_SPARES = 256;
- //阻塞前自旋的次數,用在在awaitRunStateLock和awaitWork中
- private static final int SPINS = 0;
- //indexSeed的增量
- private static final int SEED_INCREMENT = 0x9e3779b9;
ForkJoinPool 的內部狀態都是通過一個64位的 long 型 變量ctl來存儲,它由四個16位的子域組成:
- AC: 正在運行工作線程數減去目標并行度,高16位
- TC: 總工作線程數減去目標并行度,中高16位
- SS: 棧頂等待線程的版本計數和狀態,中低16位
- ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位
ForkJoinPool.WorkQueue 中的相關屬性:
- //初始隊列容量,2的冪
- static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
- //最大隊列容量
- static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
- // 實例字段
- volatile int scanState; // Woker狀態, <0: inactive; odd:scanning
- int stackPred; // 記錄前一個棧頂的ctl
- int nsteals; // 偷取任務數
- int hint; // 記錄偷取者索引,初始為隨機索引
- int config; // 池索引和模式
- volatile int qlock; // 1: locked, < 0: terminate; else 0
- volatile int base; // 下一個poll操作的索引(棧底/隊列頭)
- int top; // 一個push操作的索引(棧頂/隊列尾)
- ForkJoinTask<?>[] array; // 任務數組
- final ForkJoinPool pool; // the containing pool (may be null)
- final ForkJoinWorkerThread owner; // 當前工作隊列的工作線程,共享模式下為null
- volatile Thread parker; // 調用park阻塞期間為owner,其他情況為null
- volatile ForkJoinTask<?> currentJoin; // 記錄被join過來的任務
- volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊列偷取過來的任務
內部數據結構
ForkJoinPool采用了哈希數組 + 雙端隊列的方式存放任務,但這里的任務分為兩類:
- 一類是通過execute、submit 提交的外部任務
- 另一類是ForkJoinWorkerThread工作線程通過fork/join分解出來的工作任務
ForkJoinPool并沒有把這兩種任務混在一個任務隊列中,對于外部任務,會利用Thread內部的隨機probe值映射到哈希數組的偶數槽位中的提交隊列中,這種提交隊列是一種數組實現的雙端隊列稱之為Submission Queue,專門存放外部提交的任務。
對于ForkJoinWorkerThread工作線程,每一個工作線程都分配了一個工作隊列,這也是一個雙端隊列,稱之為Work Queue,這種隊列都會被映射到哈希數組的奇數槽位,每一個工作線程fork/join分解的任務都會被添加到自己擁有的那個工作隊列中。
在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的哈希數組,其元素就是內部類WorkQueue實現的基于數組的雙端隊列。該哈希數組的長度為2的冪,并且支持擴容。如下就是該哈希數組的示意結構圖:
如圖,提交隊列位于哈希數組workQueue的奇數索引槽位,工作線程的工作隊列位于偶數槽位。
- 默認情況下,asyncMode為false時:因此工作線程把工作隊列當著棧一樣使用(后進先出),將分解的子任務推入工作隊列的top端,取任務的時候也從top端取(凡是雙端隊列都會有兩個分別指向隊列兩端的指針,這里就是圖上畫出的base和top);而當某些工作線程的任務為空的時候,就會從其他隊列(不限于workQueue,也會是提交隊列)竊取(steal)任務,如圖示擁有workQueue2的工作線程從workQueue1中竊取了一個任務,竊取任務的時候采用的是先進先出FIFO的策略(即從base端竊取任務),這樣不但可以避免在取任務的時候與擁有其隊列的工作線程發生沖突,從而減小競爭,還可以輔助其完成比較大的任務。
- asyncMode為true的話,擁有該工作隊列的工作線程將按照先進先出的策略從base端取任務,這一般只用于不需要返回結果的任務,或者事件消息傳遞框架。
ForkJoinPool構造函數
其完整構造方法如下
- private ForkJoinPool(int parallelism,
- ForkJoinWorkerThreadFactory factory,
- UncaughtExceptionHandler handler,
- int mode,
- String workerNamePrefix) {
- this.workerNamePrefix = workerNamePrefix;
- this.factory = factory;
- this.ueh = handler;
- this.config = (parallelism & SMASK) | mode;
- long np = (long)(-parallelism); // offset ctl counts
- this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
- }
重要參數解釋
- parallelism:并行度( the parallelism level),默認情況下跟我們機器的cpu個數保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我們機器運行時可用的CPU個數。
- factory:創建新線程的工廠( the factory for creating new threads)。默認情況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
- handler:線程異常情況下的處理器(Thread.UncaughtExceptionHandler handler),該處理器在線程執行任務時由于某些無法預料到的錯誤而導致任務線程中斷時進行一些處理,默認情況為null。
- asyncMode:這個參數要注意,在ForkJoinPool中,每一個工作線程都有一個獨立的任務隊列
- asyncMode表示工作線程內的任務隊列是采用何種方式進行調度,可以是先進先出FIFO,也可以是后進先出LIFO。如果為true,則線程池中的工作線程則使用先進先出方式進行任務調度,默認情況下是false。
ForkJoinPool.submit 方法
- public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
- if (task == null)
- throw new NullPointerException();
- //提交到工作隊列
- externalPush(task);
- return task;
- }
ForkJoinPool 自身擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務,而這些工作隊列被稱為 submitting queue 。 submit() 和 fork() 其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當其中的任務被一個工作線程成功竊取時,就意味著提交的任務真正開始進入執行階段。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git