探究Visual Studio 2010中Parallel的使用
之前51cto曾經報道過關于Visual Studio 2010中Parallel類實現并行計算,本文我們主要分析如何利用Parallel.For和Parallel.ForEach函數來并行化for循環和foreach循環。實際上,Parallel.For和Parallel.ForEach函數主要是針對“并行數據”的并行化操作,所謂并行數據,就是整個數據集中數據單元是相互獨立的,可以同時進行處理。
在實際開發中,我們遇到的可以并行處理的不僅包括“并行數據”,還包括可以同時進行的“并行邏輯”。所謂“并行邏輯”,就是相互獨立,可以同時執行的多個任務。比如,程序員陳良喬每天早上要做兩件事情:燒水洗臉和鍛煉身體。這兩件事情就是相互獨立可以并行的,也就是說他在燒水的時候可以同時鍛煉身體。在以前的單核時代,CPU在同一時間只能完成一件事情,那么陳良喬只能先燒水后鍛煉,或者是先鍛煉后燒水,這導致他上班總是遲到。
進入多核時代,CPU可以在同一時間完成多件事情了,借助.Net Framework 4.0中的Parallel類,我們可以方便地處理“并行邏輯”。現在,程序員陳良喬可以一邊鍛煉一邊燒水,再也沒有遲到過了。他逢人便說:“Parallel真是個好東西!自從用了它,我腰也不酸了,背也不疼了,編程更有勁兒了”
使用Parallel.Invoke處理并行邏輯
跟Parallel.For函數相似,Parallel.Invoke也是Parallel類的一個靜態函數,它可以接受一個Action[]類型的對象作為參數,這個對象,就是我們要執行的任務。系統會根據代碼運行的硬件環境,主要是CPU運算核心的個數,自動地進行線程的創建和分配。這有些類似于我們所熟悉的多線程開發,通過為每個線程指定一個線程函數而讓多個任務同時進行,只是Parallel.Invoke函數簡化了線程的創建和分配等繁瑣的動作,我們只需要提供核心的線程函數就可以了。下面我們來看一個實際的例子。在上文中,我們介紹了程序員陳良喬起床的例子,在以前的單核時代,他起床大約是這個樣子的:
- // 串行式起床
- private static void GetUp()
- {
- Start("GetUp");
- // 先燒水
- boil();
- // 后鍛煉
- exercise();
- End("GetUp");
- }
- // 鍛煉
- private static void exercise()
- {
- Console.WriteLine("Exercise");
- Thread.Sleep(2000);
- Console.WriteLine("Finish Exercise");
- }
- // 燒水
- private static void boil()
- {
- Console.WriteLine("Boil");
- Thread.Sleep(3000);
- Console.WriteLine("Finish Boil");
- }
在單核時代,CPU在同一時間只能做一件事情,所以他只能先燒水,后鍛煉,這樣顯然會耽誤時間。一天,他又因為這事而遲到了,老板罵道,“你是豬啊,你不會用Parallel.Invoke一邊燒水一邊鍛煉啊?”于是,有了下面的并行式起床:
- // 并行式起床
- private static void ParallelGetUp()
- {
- Start("ParallelGetUp");
- // 在燒水的同時,鍛煉身體
- var steps = new Action[] { () => boil(), () => exercise() };
- Parallel.Invoke(steps);
- End("ParallelGetUp");
- }
通過Parallel.Invoke函數,我們將一些相互獨立的任務同時執行,實現了“并行邏輯”,也大大地提高了應用程序的性能和效率。從下面的截圖中,我們可以明顯地看出兩種方式的差別。串行方式所耗費的時間,是兩個步驟的時間總和,而并行方式所耗費的時間,大約是單個任務的耗時最長的哪一個。
#p#
對Parallel.Invoke進行控制
Parallel.Invoke提供了一個重載版本,它可以接受一個ParallelOptions對象作為參數,對Parallel.Invoke的執行進行控制。通過這個對象,我們可以控制并行的最大線程數,各個任務是否取消執行等等。例如,在一個智能化的家中,系統會判斷主人是否離開房間,如果主人離開了房間,則自動關閉屋子里的各種電器。利用Parallel.Invoke我們可以實現如下:
- public static void PInvokeCancel()
- {
- // 創建取消對象
- CancellationTokenSource cts = new CancellationTokenSource();
- // 利用取消對象,創建ParallelOptions
- ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
- // 設置最大線程數
- pOption.MaxDegreeOfParallelism = 2;
- // 創建一個守護監視進程
- Task.Factory.StartNew(() =>
- {
- Console.WriteLine("Cancellation in 5 sec.");
- Thread.Sleep(5000);
- // 取消,結束任務的執行
- cts.Cancel();
- Console.WriteLine("Canceled requested");
- });
- try
- {
- // 以ParallelOptions作為參數,
- // 調用Parallel.Invoke
- Parallel.Invoke(pOption, () => ShutdownLights(pOption.CancellationToken),
- () => ShutdownComputer(pOption.CancellationToken));
- //輸出執行結果
- Console.WriteLine("Lights and computer are tuned off.");
- }
- catch (Exception e)
- {
- Console.WriteLine(e.Message);
- }
- }
- private static void ShutdownLights(CancellationToken token)
- {
- while (!token.IsCancellationRequested)
- {
- Console.WriteLine("Light is on. " );
- Thread.Sleep(1000);
- }
- }
- private static void ShutdownComputer(CancellationToken token)
- {
- while (!token.IsCancellationRequested)
- {
- Console.WriteLine("Computer is on." );
- Thread.Sleep(1000);
- }
- }
除了這種方式之外,ParallelOptions更多地應用在取消任務隊列中還未來得及執行的任務。當我們限制了最大并發線程數的時候,如果需要通過Parallel.Invoke執行的任務較多,則有可能部分任務在隊列中排隊而得不到及時的執行,如果到了一定的條件這些任務還沒有執行,我們可能取消這些任務。一個恰當的現實生活中的例子就是火車站買票。火車站買票的人很多,但是售票的窗口有限,當到了下班時間后,窗口就不再售票了,也就是剩下的售票任務需要取消掉。我們可以用下面的代碼來模擬這樣一個場景:
- public static void PInvokeCancel()
- {
- // 創建取消對象
- CancellationTokenSource cts = new CancellationTokenSource();
- // 利用取消對象,創建ParallelOptions
- ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
- // 設置最大線程數,也就相當于20個售票窗口
- pOption.MaxDegreeOfParallelism = 20;
- // 創建一個守護監視進程
- // 當到下班時間后就取消剩下的售票活動
- Task.Factory.StartNew(() =>
- {
- Console.WriteLine("Cancellation in 5 sec.");
- Thread.Sleep(5000);
- // 取消,結束任務的執行
- cts.Cancel();
- Console.WriteLine("Canceled requested");
- });
- try
- {
- // 創建售票活動
- Action[] CustomerServices = CreateCustomerService(1000);
- // 以ParallelOptions作為參數,
- // 調用Parallel.Invoke
- Parallel.Invoke(pOption, CustomerServices);
- }
- catch (Exception e)
- {
- // 當任務取消后,拋出一個異常
- Console.WriteLine(e.Message);
- }
- }
- // 創建售票的活動
- static Action[] CreateCustomerService(int n)
- {
- Action[] result = new Action[n];
- for (int i = 0; i < n; i++)
- {
- result[i] = () =>
- {
- Console.WriteLine("Customer Service {0}", Task.CurrentId);
- // 模擬售票需要的時間
- Thread.Sleep(2000);
- };
- }
- return result;
- }
#p#
并行任務之間的同步
有時候我們在處理并行任務的時候,各個任務之間需要同步,也就是同時執行的并行任務,需要在共同到達某一個狀態的后再一共繼續執行。我們可以舉一個現實生活中的例子。陳良喬,賈瑋和單春暉是好朋友,他們相約到電影院看《建國大業》。他們三個住在不同的地方,為了能一起買票進電影院,他們約好先在電影院門口的KFC會合,然后再一起進電影院。這其中就涉及到一個同步的問題:他們需要先在KFC會合。他們是從家里分別到KFC的,但是需要在KFC進行同步,等到三個人都到齊后在完成后后繼的動作,進電影院看電影。
為了完成并行任務之間的同步,.NET Framework中提供了一個類Barrier。顧名思義,Barrier就像一個關卡或者是剪票口一樣,通過Barrier類,我們可以管理并行任務的執行,完成他們之間的同步。Barrier類的使用非常簡單,我們只需要在主線程中聲明一個Barrier對象,同時指明需要同步的任務數。然后,在需要進行同步的地方調用Barrier類的SignalAndWait函數就可以了。 當一個并行任務到達SignalAndWait后,它會暫停執行,等待所有并行任務都到達同步點之后再繼續往下執行。下面我們以一個實際的例子,來看看如何利用Barrier類完成看電影的同步問題。
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace ParallelBarrier
- {
- class Program
- {
- // 用于同步的Barrier對象
- static Barrier sync;
- static void Main(string[] args)
- {
- // 創建Barrier對象,這里我們需要同步
- // 任務有三個
- sync = new Barrier(3);
- // 開始執行并行任務
- var steps = new Action[] { () => gotothecinema("陳良喬", TimeSpan.FromSeconds(5) ),
- () => gotothecinema("賈瑋", TimeSpan.FromSeconds(2) ),
- () => gotothecinema("單春暉", TimeSpan.FromSeconds(4) )};
- Parallel.Invoke(steps);
- Console.ReadKey();
- }
- // 任務
- static void gotothecinema(string strName, TimeSpan timeToKFC )
- {
- Console.WriteLine("[{0}] 從家里出發。", strName);
- // 從家里到KFC
- Thread.Sleep(timeToKFC);
- Console.WriteLine("[{0}] 到達KFC。", strName);
- // 等待其他人到達
- sync.SignalAndWait();
- // 同步后,進行后繼動作
- Console.WriteLine("[{0}] 買票進電影院。", strName);
- }
- }
- }
在這段代碼中,我們首先創建了Barrier對象,因為在這里需要同步的任務有三個,所以創建Barrier對象時是的參數是3。然后就是使用Parallel.Invoke執行并行任務。我們在并行任務gotothecinema中設置了一個同步點,在這里我們調用Barrier對象的SignalAndWait函數,它表示當前任務已經到達同步點并同時等待其他任務到達同步點。當所有任務都到達同步點之后,再繼續往下執行。運行上面的程序,我們可以獲得這樣的輸出:
#p#
更復雜的任務之間的同步
我們在使用Barrier進行并行任務之間的同步時,有這樣一個缺陷,我們需要預先知道所有需要同步的并行任務的數目,如果這個數目是隨機的,就無法使用Barrier進行任務之間的同步了。并行任務數目不定這種情況很常見。我們還是來看上文中看電影的例子,每場進電影院看電影的觀眾數目是不固定的,那么退場的觀眾也是不固定的,甚至還有中途退場的。當所有觀眾都退場后,我們需要打掃電影院的衛生。這里需要的同步的就是所有觀眾都退場。針對這種數目不定的多個并行任務,.NET Framework提供了CountdownEvent這個類來進行任務之間的同步。
就像它的名字一樣,CountdownEvent基于這樣一個簡單的規則:當有新的需要同步的任務產生時,就調用AddCount增加它的計數,當有任務到達同步點是,就調用Signal函數減小它的計數,當CountdownEvent的計數為零時,就表示所有需要同步的任務已經完成,可以開始下一步任務了。下面我們利用CountdownEvent來模擬一下觀眾進場立場的情景。
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace CountdownEventDemo
- {
- // 觀眾類,用來表示一位觀眾
- class Customer
- {
- public Customer(int nID)
- {
- m_nID = nID;
- }
- // 觀眾的ID
- public int m_nID;
- }
- class Program
- {
- static void Main(string[] args)
- {
- // 創建CountdownEvent同步對象
- using (var countdown = new CountdownEvent(1))
- {
- // 產生一個隨機數,表示觀眾的數目
- Random countRandom = new Random(DateTime.Now.Millisecond);
- int nCount = countRandom.Next(10);
- // 構造每一位觀眾看電影的任務
- Action[] seeafilm = new Action[ nCount ];
- for (int i = 0; i < nCount; i++)
- {
- // 構造Customer對象,表示觀眾
- Customer currentCustomer = new Customer( i+1 );
- seeafilm[i] = () =>
- {
- // 觀眾進場
- countdown.AddCount();
- Console.WriteLine("觀眾 {0} 進場。", currentCustomer.m_nID);
- // 模擬看電影的時間
- Thread.Sleep(countRandom.Next(3000,6000));
- // 觀眾退場
- countdown.Signal();
- Console.WriteLine("觀眾 {0} 退場。", currentCustomer.m_nID);
- };
- }
- //并行執行任務
- Parallel.Invoke( seeafilm );
- // 在此同步,最后CountdownEvent的計數變為零
- countdown.Signal();
- countdown.Wait();
- }
- Console.WriteLine("所有觀眾退場,開始打掃衛生。");
- Console.ReadKey();
- }
在這段代碼中,我們使用CountdownEvent進行隨機個數任務之間的同步。最后,我們可以得到這樣的輸出。
通過Parallel.Invoke函數,我們可以輕松地將相互獨立的任務并行執行,同時通過Barrier和CountdownEvent類進行任務之間的同步。這種并行計算的開發方式,比以前那種基于線程的并行計算開發方式簡便很多,解放了程序員的腦袋,讓他們可以把更多的腦力放到業務邏輯問題的解決之上。使用Parallel類,多快好省地開發并行計算應用程序。
【編輯推薦】