JDK 7中的 Fork/Join模式
介 紹
隨著多核芯片逐漸成為主流,大多數軟件開發人員不可避免地需要了解并行編程的知識。而同時,主流程序語言正在將越來越多的并行特性合并到標準庫或者語言本身之中。我們可以看到,JDK 在這方面同樣走在潮流的前方。在 JDK 標準版 5 中,由 Doug Lea 提供的并行框架成為了標準庫的一部分(JSR-166)。隨后,在 JDK 6 中,一些新的并行特性,例如并行 collection 框架,合并到了標準庫中(JSR-166x)。直到今天,盡管 Java SE 7 還沒有正式發布,一些并行相關的新特性已經出現在 JSR-166y 中:
1.Fork/Join 模式;
2.TransferQueue,它繼承自 BlockingQueue 并能在隊列滿時阻塞“生產者”;
3.ArrayTasks/ListTasks,用于并行執行某些數組/列表相關任務的類;
4.IntTasks/LongTasks/DoubleTasks,用于并行處理數字類型數組的工具類,提供了排序、查找、求和、求最小值、求最大值等功能;
其中,對 Fork/Join 模式的支持可能是對開發并行軟件來說最通用的新特性。在 JSR-166y 中,Doug Lea 實現ArrayTasks/ListTasks/IntTasks/LongTasks/DoubleTasks 時就大量的用到了 Fork/Join 模式。讀者還需要注意一點,因為 JDK 7 還沒有正式發布,因此本文涉及到的功能和發布版本有可能不一樣。
Fork/Join 模式有自己的適用范圍。如果一個應用能被分解成多個子任務,并且組合多個子任務的結果就能夠獲得最終的答案,那么這個應用就適合用 Fork/Join 模式來解決。圖 1 給出了一個 Fork/Join 模式的示意圖,位于圖上部的 Task 依賴于位于其下的 Task 的執行,只有當所有的子任務都完成之后,調用者才能獲得 Task 0 的返回結果。
圖 1. Fork/Join 模式示意圖
可以說,Fork/Join 模式能夠解決很多種類的并行問題。通過使用 Doug Lea 提供的 Fork/Join 框架,軟件開發人員只需要關注任務的劃分和中間結果的組合就能充分利用并行平臺的優良性能。其他和并行相關的諸多難于處理的問題,例如負載平衡、同步等,都可以由框架采用統一的方式解決。這樣,我們就能夠輕松地獲得并行的好處而避免了并行編程的困難且容易出錯的缺點。
使用 Fork/Join 模式
在開始嘗試 Fork/Join 模式之前,我們需要從 Doug Lea 主持的 Concurrency JSR-166 Interest Site 上下載 JSR-166y 的源代碼,并且我們還需要安裝最新版本的 JDK 6(下載網址請參閱 參考資源)。Fork/Join 模式的使用方式非常直觀。首先,我們需要編寫一個 ForkJoinTask 來完成子任務的分割、中間結果的合并等工作。隨后,我們將這個 ForkJoinTask 交給 ForkJoinPool 來完成應用的執行。
通常我們并不直接繼承 ForkJoinTask,它包含了太多的抽象方法。針對特定的問題,我們可以選擇 ForkJoinTask 的不同子類來完成任務。RecursiveAction 是 ForkJoinTask 的一個子類,它代表了一類最簡單的 ForkJoinTask:不需要返回值,當子任務都執行完畢之后,不需要進行中間結果的組合。如果我們從 RecursiveAction 開始繼承,那么我們只需要重載 protected void compute() 方法。下面,我們來看看怎么為快速排序算法建立一個 ForkJoinTask 的子類:
清單 1. ForkJoinTask 的子類
- classSortTaskextendsRecursiveAction{
- finallong[]array;
- finalintlo;
- finalinthi;
- privateintTHRESHOLD=30;
- publicSortTask(long[]array){
- this.array=array;
- this.lo=0;
- this.hi=array.length-1;
- }
- publicSortTask(long[]array,intlo,inthi){
- this.array=array;
- this.lo=lo;
- this.hi=hi;
- }
- protectedvoidcompute(){
- if(hi-lo<THRESHOLD)
- sequentiallySort(array,lo,hi);
- else{
- intpivot=partition(array,lo,hi);
- coInvoke(newSortTask(array,lo,pivot-1),newSortTask(array,
- pivot+1,hi));
- }
- }
- privateintpartition(long[]array,intlo,inthi){
- longx=array[hi];
- inti=lo-1;
- for(intj=lo;j<hi;j++){
- if(array[j]<=x){
- i++;
- swap(array,i,j);
- }
- }
- swap(array,i+1,hi);
- returni+1;
- }
- privatevoidswap(long[]array,inti,intj){
- if(i!=j){
- longtemp=array[i];
- array[i]=array[j];
- array[j]=temp;
- }
- }
- privatevoidsequentiallySort(long[]array,intlo,inthi){
- Arrays.sort(array,lo,hi+1);
- }
- }
在清單1中,SortTask 首先通過 partition() 方法將數組分成兩個部分。隨后,兩個子任務將被生成并分別排序數組的兩個部分。當子任務足夠小時,再將其分割為更小的任務反而引起性能的降低。因此,這里我們使用一個 THRESHOLD,限定在子任務規模較小時,使用直接排序,而不是再將其分割成為更小的任務。其中,我們用到了 RecursiveAction 提供的方法 coInvoke()。它表示:啟動所有的任務,并在所有任務都正常結束后返回。如果其中一個任務出現異常,則其它所有的任務都取消。coInvoke() 的參數還可以是任務的數組。
現在剩下的工作就是將 SortTask 提交到 ForkJoinPool 了。ForkJoinPool() 默認建立具有與 CPU 可使用線程數相等線程個數的線程池。我們在一個 JUnit 的 test 方法中將 SortTask 提交給一個新建的 ForkJoinPool:
清單 2. 新建的 ForkJoinPool
- @Test
- publicvoidtestSort()throwsException{
- ForkJoinTasksort=newSortTask(array);
- ForkJoinPoolfjpool=newForkJoinPool();
- fjpool.submit(sort);
- fjpool.shutdown();
- fjpool.awaitTermination(30,TimeUnit.SECONDS);
- assertTrue(checkSorted(array));
- }
在上面的代碼中,我們用到了 ForkJoinPool 提供的如下函數:
1. submit():將 ForkJoinTask 類的對象提交給 ForkJoinPool,ForkJoinPool 將立刻開始執行 ForkJoinTask。
2. shutdown():執行此方法之后,ForkJoinPool 不再接受新的任務,但是已經提交的任務可以繼續執行。如果希望立刻停止所有的任務,可以嘗試 shutdownNow() 方法。
3. awaitTermination():阻塞當前線程直到 ForkJoinPool 中所有的任務都執行結束。
并行快速排序的完整代碼如下所示:
清單 3. 并行快速排序的完整代碼
- packagetests;
- importstaticorg.junit.Assert.*;
- importjava.util.Arrays;
- importjava.util.Random;
- importjava.util.concurrent.TimeUnit;
- importjsr166y.forkjoin.ForkJoinPool;
- importjsr166y.forkjoin.ForkJoinTask;
- importjsr166y.forkjoin.RecursiveAction;
- importorg.junit.Before;
- importorg.junit.Test;
- classSortTaskextendsRecursiveAction{
- finallong[]array;
- finalintlo;
- finalinthi;
- privateintTHRESHOLD=0;//Fordemoonly
- publicSortTask(long[]array){
- this.array=array;
- this.lo=0;
- this.hi=array.length-1;
- }
- publicSortTask(long[]array,intlo,inthi){
- this.array=array;
- this.lo=lo;
- this.hi=hi;
- }
- protectedvoidcompute(){
- if(hi-lo<THRESHOLD)
- sequentiallySort(array,lo,hi);
- else{
- intpivot=partition(array,lo,hi);
- System.out.println(" pivot="+pivot+",low="+lo+",high="+hi);
- System.out.println("array"+Arrays.toString(array));
- coInvoke(newSortTask(array,lo,pivot-1),newSortTask(array,
- pivot+1,hi));
- }
- }
- privateintpartition(long[]array,intlo,inthi){
- longx=array[hi];
- inti=lo-1;
- for(intj=lo;j<hi;j++){
- if(array[j]<=x){
- i++;
- swap(array,i,j);
- }
- }
- swap(array,i+1,hi);
- returni+1;
- }
- privatevoidswap(long[]array,inti,intj){
- if(i!=j){
- longtemp=array[i];
- array[i]=array[j];
- array[j]=temp;
- }
- }
- privatevoidsequentiallySort(long[]array,intlo,inthi){
- Arrays.sort(array,lo,hi+1);
- }
- }
- publicclassTestForkJoinSimple{
- privatestaticfinalintNARRAY=16;//Fordemoonly
- long[]array=newlong[NARRAY];
- Randomrand=newRandom();
- @Before
- publicvoidsetUp(){
- for(inti=0;i<array.length;i++){
- array[i]=rand.nextLong()%100;//Fordemoonly
- }
- System.out.println("InitialArray:"+Arrays.toString(array));
- }
- @Test
- publicvoidtestSort()throwsException{
- ForkJoinTasksort=newSortTask(array);
- ForkJoinPoolfjpool=newForkJoinPool();
- fjpool.submit(sort);
- fjpool.shutdown();
- fjpool.awaitTermination(30,TimeUnit.SECONDS);
- assertTrue(checkSorted(array));
- }
- booleancheckSorted(long[]a){
- for(inti=0;i<a.length-1;i++){
- if(a[i]>(a[i+1])){
- returnfalse;
- }
- }
- returntrue;
- }
- }
運行以上代碼,我們可以得到以下結果:
- InitialArray:[46,-12,74,-67,76,-13,-91,-96]
- pivot=0,low=0,high=7
- array[-96,-12,74,-67,76,-13,-91,46]
- pivot=5,low=1,high=7
- array[-96,-12,-67,-13,-91,46,76,74]
- pivot=1,low=1,high=4
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=4,low=2,high=4
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=3,low=2,high=3
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=2,low=2,high=2
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=6,low=6,high=7
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=7,low=7,high=7
- array[-96,-91,-67,-13,-12,46,74,76]
#p#
Fork/Join 模式高級特性
使用 RecursiveTask
除了 RecursiveAction,Fork/Join 框架還提供了其他 ForkJoinTask 子類:帶有返回值的 RecursiveTask,使用 finish() 方法顯式中止的 AsyncAction 和 LinkedAsyncAction,以及可使用 TaskBarrier 為每個任務設置不同中止條件的 CyclicAction。
從 RecursiveTask 繼承的子類同樣需要重載 protected void compute() 方法。與 RecursiveAction 稍有不同的是,它可使用泛型指定一個返回值的類型。下面,我們來看看如何使用 RecursiveTask 的子類。
清單 4. RecursiveTask 的子類
- classFibonacciextendsRecursiveTask<Integer>{
- finalintn;
- Fibonacci(intn){
- this.n=n;
- }
- privateintcompute(intsmall){
- finalint[]results={1,1,2,3,5,8,13,21,34,55,89};
- returnresults[small];
- }
- publicIntegercompute(){
- if(n<=10){
- returncompute(n);
- }
- Fibonaccif1=newFibonacci(n-1);
- Fibonaccif2=newFibonacci(n-2);
- f1.fork();
- f2.fork();
- returnf1.join()+f2.join();
- }
- }
在清單4 中,Fibonacci 的返回值為 Integer 類型。其 compute() 函數首先建立兩個子任務,啟動子任務執行,阻塞以等待子任務的結果返回,相加后得到最終結果。同樣,當子任務足夠小時,通過查表得到其結果,以減小因過多地分割任務引起的性能降低。其中,我們用到了 RecursiveTask 提供的方法 fork() 和 join()。它們分別表示:子任務的異步執行和阻塞等待結果完成。
現在剩下的工作就是將 Fibonacci 提交到 ForkJoinPool 了,我們在一個 JUnit 的 test 方法中作了如下處理:
清單 5. 將 Fibonacci 提交到 ForkJoinPool
- @Test
- publicvoidtestFibonacci()throwsInterruptedException,ExecutionException{
- ForkJoinTask<Integer>fjt=newFibonacci(45);
- ForkJoinPoolfjpool=newForkJoinPool();
- Future<Integer>result=fjpool.submit(fjt);
- //dosomething
- System.out.println(result.get());
- }
使用 CyclicAction 來處理循環任務
CyclicAction 的用法稍微復雜一些。如果一個復雜任務需要幾個線程協作完成,并且線程之間需要在某個點等待所有其他線程到達,那么我們就能方便的用 CyclicAction 和 TaskBarrier 來完成。圖 2 描述了使用 CyclicAction 和 TaskBarrier 的一個典型場景。
圖 2. 使用 CyclicAction 和 TaskBarrier 執行多線程任務
繼承自 CyclicAction 的子類需要 TaskBarrier 為每個任務設置不同的中止條件。從 CyclicAction 繼承的子類需要重載 protected void compute() 方法,定義在 barrier 的每個步驟需要執行的動作。compute() 方法將被反復執行直到 barrier 的 isTerminated() 方法返回 True。TaskBarrier 的行為類似于 CyclicBarrier。下面,我們來看看如何使用 CyclicAction 的子類。
清單 6. 使用 CyclicAction 的子類
- classConcurrentPrintextendsRecursiveAction{
- protectedvoidcompute(){
- TaskBarrierb=newTaskBarrier(){
- protectedbooleanterminate(intcycle,intregisteredParties){
- System.out.println("Cycleis"+cycle+";"
- +registeredParties+"parties");
- returncycle>=10;
- }
- };
- intn=3;
- CyclicAction[]actions=newCyclicAction[n];
- for(inti=0;i<n;++i){
- finalintindex=i;
- actions[i]=newCyclicAction(b){
- protectedvoidcompute(){
- System.out.println("I'mworking"+getCycle()+""
- +index);
- try{
- Thread.sleep(500);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- }
- };
- }
- for(inti=0;i<n;++i)
- actions[i].fork();
- for(inti=0;i<n;++i)
- actions[i].join();
- }
- }
在清單6中,CyclicAction[] 數組建立了三個任務,打印各自的工作次數和序號。而在 b.terminate() 方法中,我們設置的中止條件表示重復 10 次計算后中止。現在剩下的工作就是將 ConcurrentPrint 提交到 ForkJoinPool 了。我們可以在 ForkJoinPool 的構造函數中指定需要的線程數目,例如 ForkJoinPool(4) 就表明線程池包含 4 個線程。我們在一個 JUnit 的 test 方法中運行 ConcurrentPrint 的這個循環任務:
清單 7. 運行 ConcurrentPrint 循環任務
- @Test
- publicvoidtestBarrier()throwsInterruptedException,ExecutionException{
- ForkJoinTaskfjt=newConcurrentPrint();
- ForkJoinPoolfjpool=newForkJoinPool(4);
- fjpool.submit(fjt);
- fjpool.shutdown();
- }
RecursiveTask 和 CyclicAction 兩個例子的完整代碼如下所示:
清單 8. RecursiveTask 和 CyclicAction 兩個例子的完整代碼
- packagetests;
- importjava.util.concurrent.ExecutionException;
- importjava.util.concurrent.Future;
- importjsr166y.forkjoin.CyclicAction;
- importjsr166y.forkjoin.ForkJoinPool;
- importjsr166y.forkjoin.ForkJoinTask;
- importjsr166y.forkjoin.RecursiveAction;
- importjsr166y.forkjoin.RecursiveTask;
- importjsr166y.forkjoin.TaskBarrier;
- importorg.junit.Test;
- classFibonacciextendsRecursiveTask<Integer>{
- finalintn;
- Fibonacci(intn){
- this.n=n;
- }
- privateintcompute(intsmall){
- finalint[]results={1,1,2,3,5,8,13,21,34,55,89};
- returnresults[small];
- }
- publicIntegercompute(){
- if(n<=10){
- returncompute(n);
- }
- Fibonaccif1=newFibonacci(n-1);
- Fibonaccif2=newFibonacci(n-2);
- System.out.println("forknewthreadfor"+(n-1));
- f1.fork();
- System.out.println("forknewthreadfor"+(n-2));
- f2.fork();
- returnf1.join()+f2.join();
- }
- }
- classConcurrentPrintextendsRecursiveAction{
- protectedvoidcompute(){
- TaskBarrierb=newTaskBarrier(){
- protectedbooleanterminate(intcycle,intregisteredParties){
- System.out.println("Cycleis"+cycle+";"
- +registeredParties+"parties");
- returncycle>=10;
- }
- };
- intn=3;
- CyclicAction[]actions=newCyclicAction[n];
- for(inti=0;i<n;++i){
- finalintindex=i;
- actions[i]=newCyclicAction(b){
- protectedvoidcompute(){
- System.out.println("I'mworking"+getCycle()+""
- +index);
- try{
- Thread.sleep(500);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- }
- };
- }
- for(inti=0;i<n;++i)
- actions[i].fork();
- for(inti=0;i<n;++i)
- actions[i].join();
- }
- }
- publicclassTestForkJoin{
- @Test
- publicvoidtestBarrier()throwsInterruptedException,ExecutionException{
- System.out.println(" testingTaskBarrier...");
- ForkJoinTaskfjt=newConcurrentPrint();
- ForkJoinPoolfjpool=newForkJoinPool(4);
- fjpool.submit(fjt);
- fjpool.shutdown();
- }
- @Test
- publicvoidtestFibonacci()throwsInterruptedException,ExecutionException{
- System.out.println(" testingFibonacci...");
- finalintnum=14;//Fordemoonly
- ForkJoinTask<Integer>fjt=newFibonacci(num);
- ForkJoinPoolfjpool=newForkJoinPool();
- Future<Integer>result=fjpool.submit(fjt);
- //dosomething
- System.out.println("Fibonacci("+num+")="+result.get());
- }
- }
運行以上代碼,我們可以得到以下結果:
- testingTaskBarrier...
- I'mworking02
- I'mworking00
- I'mworking01
- Cycleis0;3parties
- I'mworking12
- I'mworking10
- I'mworking11
- Cycleis1;3parties
- I'mworking20
- I'mworking21
- I'mworking22
- Cycleis2;3parties
- I'mworking30
- I'mworking32
- I'mworking31
- Cycleis3;3parties
- I'mworking42
- I'mworking40
- I'mworking41
- Cycleis4;3parties
- I'mworking51
- I'mworking50
- I'mworking52
- Cycleis5;3parties
- I'mworking60
- I'mworking62
- I'mworking61
- Cycleis6;3parties
- I'mworking72
- I'mworking70
- I'mworking71
- Cycleis7;3parties
- I'mworking81
- I'mworking80
- I'mworking82
- Cycleis8;3parties
- I'mworking90
- I'mworking92
- testingFibonacci...
- forknewthreadfor13
- forknewthreadfor12
- forknewthreadfor11
- forknewthreadfor10
- forknewthreadfor12
- forknewthreadfor11
- forknewthreadfor10
- forknewthreadfor9
- forknewthreadfor10
- forknewthreadfor9
- forknewthreadfor11
- forknewthreadfor10
- forknewthreadfor10
- forknewthreadfor9
- Fibonacci(14)=610
結 論
從以上的例子中可以看到,通過使用 Fork/Join 模式,軟件開發人員能夠方便地利用多核平臺的計算能力。盡管還沒有做到對軟件開發人員完全透明,Fork/Join 模式已經極大地簡化了編寫并發程序的瑣碎工作。對于符合 Fork/Join 模式的應用,軟件開發人員不再需要處理各種并行相關事務,例如同步、通信等,以難以調試而聞名的死鎖和 data race 等錯誤也就不會出現,提升了思考問題的層次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,僅僅關注如何劃分任務和組合中間結果,將剩下的事情丟給 Fork/Join 框架。
在實際工作中利用 Fork/Join 模式,可以充分享受多核平臺為應用帶來的免費午餐。
參考資料
◆ 閱讀文章“The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software”:了解為什么從現在開始每個嚴肅的軟件工作者都應該了解并行編程方法。
◆ 閱讀 Doug Lea 的文章“A Java Fork/Join Framework”:了解 Fork/Join 模式的實現機制和執行性能。
◆ 閱讀 developerWorks 文章“馴服 Tiger:并發集合”:了解如何使用并行 Collection 庫。
◆ 閱讀 developerWorks 文章“Java 理論與實踐:非阻塞算法簡介”:介紹了 JDK 5 在并行方面的重要增強以及在 JDK5 平臺上如何實現非阻塞算法的一般介紹。
◆ 書籍“Java Concurrency in Practice”:介紹了大量的并行編程技巧、反模式、可行的解決方案等,它對于 JDK 5 中的新特性也有詳盡的介紹。
獲得產品和技術
◆ 訪問 Doug Lea 的 JSR 166 站點獲得最新的源代碼。
◆ 從 Sun 公司 網站下載 Java SE 6。
原文鏈接:http://zhangziyangup.iteye.com/blog/1324592
【編輯推薦】