.NET 4多核并行中的Task優化線程池
閱讀本篇前,讀者需對.NET4 System.Threading.Tasks 以及 Task Schedulers 有一定的了解。如果不是很了解,請查閱以下相關信息:
Task: http://msdn.microsoft.com/en-us/library/system.threading.tasks.task%28VS.100%29.aspx
Task Schedulers: http://msdn.microsoft.com/en-us/library/dd997402.aspx
首先回顧相關場景:最近工作需要一直在.NET4下編寫window service。在WindowsService下使用了多線程相關技術。期間就用了到了線程池。使用線程池的目的:在系統中進行多線程并發也擔心并發數量太大影響性能。于是使用線程池進行排隊。一批一批執行多線程。當我在使用傳統的.NET線程池的過程中碰見了一些問題,請看以下代碼:
- try
- {
- ThreadPool.SetMaxThreads(2, 2);
- for (int i = 0; i < 5; i++)
- {
- ThreadPool.QueueUserWorkItem(new WaitCallback(InvokeThread1), i);
- }
- }
- catch
- {
- Console.WriteLine("error");
- }
這里建立一個同時2個線程并發的線程池。在上述代碼第7行傳入InvokeThread1方法:
- static void InvokeThread1(object obj) {
- throw new NullReferenceException();
- }
假設程序發生異常,這個異常卻讓整個程線程池序崩潰了。主程序并未catch到這個exception。也許您會說這本來就是這樣的嘛,有什么好貼出來的。但是在.NET4中我們可以避免掉這個問題。(此時體現出.NET4的異常強大)。還有個問題有必要提到:如果一次有兩個線程同時并發(一共要執行5個線程,每次并發2個)。假設其中一個線程執行過程中出現了異常,要讓這兩個線程以外的三個線程都停止運行,來節省系統資源。傳統的線程池也許可以做到,但是控制起來估計不會讓你太輕松。但是在.NET4的Task機制中,這些都得到了妥善的解決,現將以上兩個問題解決方案給出。如果存在不足的地方,請您指出。
一、自定義TaskScheduler
TaskScheduler代碼如下:
自定義TaskScheduler
- //自定義TaskScheduler
- public class CustomTaskScheduler : TaskScheduler, IDisposable
- {
- //調用Task的線程
- Thread[] _Threads;
- //Task Collection
- BlockingCollection<Task> _Tasks = new BlockingCollection<Task>();
- int _ConcurrencyLevel;
- //設置schedule并發
- public CustomTaskScheduler(int concurrencyLevel)
- {
- _Threads = new Thread[concurrencyLevel];
- this._ConcurrencyLevel = concurrencyLevel;
- for (int i = 0; i < concurrencyLevel; i++)
- {
- _Threads[i] = new Thread(() =>
- {
- foreach (Task task in _Tasks.GetConsumingEnumerable())
- this.TryExecuteTask(task);27
- });
- _Threads[i].Start();
- }
- }
- protected override void QueueTask(Task task)
- {
- _Tasks.Add(task);
- }
- protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
- {
- if (_Threads.Contains(Thread.CurrentThread)) return TryExecuteTask(task);
- return false;
- }
- public override int MaximumConcurrencyLevel50 {
- get
- {
- return _ConcurrencyLevel;54 }
- }
- protected override IEnumerable<Task> GetScheduledTasks()
- {
- return _Tasks.ToArray();
- }
- public void Dispose()
- {
- this._Tasks.CompleteAdding();
- foreach (Thread t in _Threads)
- {
- t.Join();
- }
- }
- }
該scheduler代碼很簡單,重寫相關System.Threading.Tasks.TaskScheduler類下的相關方法即可,代碼中已給出相關注釋。
二、使用自定義的TaskScheduler
調用TaskScheduler代碼:
- List<string> listMsg = new List<string>() { "Task1", "Task2", "Task3", "Task4", "Task5", "Task6" };
- List<Task> listTask = new List<Task>();
- foreach (string msg in listMsg)
- {
- Task myTask = new Task(obj => InvokeThread2((string)obj), msg, token);
- listTask.Add(myTask);
- myTask.Start(customTaskScheduler);
- }
- try
- {
- //等待所有線程全部運行結束
- Task.WaitAll(listTask.ToArray());
- }
- catch (AggregateException ex)
- {
- //.NET4 Task的統一異常處理機制
- foreach (Exception inner in ex.InnerExceptions)
- {
- Console.WriteLine("Exception type {0} from {1}",
- inner.GetType(), inner.Source);
- }
- }
- Console.ReadLine();
InvokeThread2 相關代碼:
- static void InvokeThread2(string msg)
- {
- try
- {
- var x = Convert.ToInt32(msg.Replace("Task", "").Trim());
- Console.WriteLine(msg);
- Thread.Sleep(1000 * 5);
- Console.WriteLine("{0} ok", msg); }
- catch (Exception ex)
- {
- //如果有異常發生則取消正在排隊的所有線程。
- tokenSource.Cancel();
- Exception exception = new Exception("error");
- exception.Source = msg;
- throw exception;
- } }
以上代碼運行效果如下:
接著在TaskScheduler調用代碼中如果將第一行代碼listMsg值修改成 List<string> listMsg = new List<string>() { "Task1", "Task2", "TaskA", "Task3", "Task4", "Task5", "Task6", "Task7", "Task8", "Task9" };這時候我們將得到以下結果:
這個運行結果重點要強調的地方為:后面這7個exception。聰明的您或許已經看出來前6個exception屬于沒有執行的"Task4", "Task5", "Task6", "Task7", "Task8", "Task9",而最后一個exception才是真正的發生異常的"TaskA"。這里主要用到了Task的統一異常處理機制AggregateException??梢詮倪\行結果得到:Task1,Task2,Task3執行成功了,但是TaskA發生了異常導致了后面排隊的"Task4", "Task5", "Task6", "Task7", "Task8", "Task9"都不會執行了。節省了系統資源,同時也提高了系統性能。
三、小結
本文主要用到了的是.NET4 的Task相關技術,Task讓我們在多核并行控制的時候更加簡單,功能更加強大。如果需進一步了解相關技術,博客園已經有不少教程。微軟的MSDN也提供了很多參考資料。 最后希望本文可以給您的開發帶來幫助。
原文鏈接:http://www.cnblogs.com/ryanding/archive/2011/03/22/1990799.html
【編輯推薦】