多線程編程系列之線程間通信和協作
一、 線程間通信的方式和實現
在多線程編程中,線程間通信是非常常見的需求,它指的是多個線程之間通過某種機制來交換信息,協調彼此的行為。線程間通信的方式常用的有以下幾種:
共享內存:線程之間可以通過共享內存來交換信息,每個線程擁有對共享內存區域的讀寫權限。一般情況下需要使用鎖來保證共享內存的同步與互斥。
using System.Threading;
class Program {
private static int data = 0;
private static bool isRunning = true;
static void Main(string[] args) {
using (var mutex = new Mutex(false, "sharedMutex"))
using (var mappedFile = MemoryMappedFile.CreateOrOpen("sharedMemory", 1024))
using (var accessor = mappedFile.CreateViewAccessor()) {
var thread1 = new Thread(() => {
while (isRunning) {
mutex.WaitOne();
accessor.Write(0, data);
mutex.ReleaseMutex();
Thread.Sleep(1000);
}
});
var thread2 = new Thread(() => {
while (isRunning) {
mutex.WaitOne();
data = accessor.ReadInt32(0);
mutex.ReleaseMutex();
Console.WriteLine($"Data received: {data}");
Thread.Sleep(1000);
}
});
thread1.Start();
thread2.Start();
Console.WriteLine("Press enter to exit.");
Console.ReadLine();
isRunning = false;
thread1.Join();
thread2.Join();
}
}
}
該程序創建了一個名為 sharedMemory 的內存映射文件和一個名為 sharedMutex 的互斥對象。兩個線程分別負責將數據寫入內存映射文件和讀取內存映射文件中的數據。在操作前,需要通過互斥對象進行同步與互斥。
消息傳遞:線程之間可以通過發送消息來交換信息,每個線程擁有一個接收緩沖區和一個發送緩沖區。這里的消息指的是數據包或者數據流,線程之間通過操作緩沖區來完成數據交換。
using System.Collections.Concurrent;
class Program {
private static bool isRunning = true;
private static BlockingCollection<string> messageQueue = new BlockingCollection<string>();
static void Main(string[] args) {
var thread1 = new Thread(() => {
while (isRunning) {
var message = messageQueue.Take();
Console.WriteLine($"Thread 1 received message: {message}");
}
});
var thread2 = new Thread(() => {
while (isRunning) {
var message = messageQueue.Take();
Console.WriteLine($"Thread 2 received message: {message}");
}
});
thread1.Start();
thread2.Start();
// Simulate sending some messages
messageQueue.Add("Hello from thread 1");
messageQueue.Add("Hello from thread 2");
messageQueue.Add("Hello again from thread 1");
messageQueue.Add("Goodbye from thread 2");
Console.WriteLine("Press enter to exit.");
Console.ReadLine();
isRunning = false;
thread1.Join();
thread2.Join();
}
}
該程序使用 BlockingCollection<T> 類來實現簡單的消息隊列,兩個線程分別負責從消息隊列中取出消息并進行處理。在發送消息時,可以將其添加到消息隊列中。
管道:線程之間可以通過管道來交換信息,一個線程將數據寫入管道,另一個線程則從管道中讀取數據。管道本質上也是一種共享內存的方式,并且會自動進行同步(管道的大小是有限制的)
信號量:線程之間可以通過信號量來同步和互斥訪問資源,一個線程獲取信號量后就可以進行訪問操作,其他線程則需要等待。信號量可以用于實現進程之間的同步和互斥,但在多線程應用中使用時需要注意信號量的實現。
using System.Threading;
class Program {
private static bool isRunning = true;
private static AutoResetEvent signal = new AutoResetEvent(false);
static void Main(string[] args) {
var thread1 = new Thread(() => {
while (isRunning) {
Console.WriteLine("Thread 1 is waiting...");
signal.WaitOne();
Console.WriteLine("Thread 1 received signal.");
}
});
var thread2 = new Thread(() => {
while (isRunning) {
Console.WriteLine("Thread 2 is waiting...");
signal.WaitOne();
Console.WriteLine("Thread 2 received signal.");
}
});
thread1.Start();
thread2.Start();
// Send signals to the threads
signal.Set(); // signals only one of the waiting threads
signal.Set(); // signals the other waiting thread
Console.WriteLine("Press enter to exit.");
Console.ReadLine();
isRunning = false;
signal.Set(); // unblock any waiting threads
thread1.Join();
thread2.Join();
}
}
該程序使用 AutoResetEvent 類來實現線程間的同步。兩個線程等待信號并進行處理,主線程發送信號來通知等待的線程進行處理。可以使用 Set() 方法發送信號并使用 WaitOne() 方法等待信號。
using System.Threading;
class Program {
private static bool isRunning = true;
private static ManualResetEvent signal = new ManualResetEvent(false);
static void Main(string[] args) {
var thread1 = new Thread(() => {
while (isRunning) {
Console.WriteLine("Thread 1 is waiting...");
signal.WaitOne();
Console.WriteLine("Thread 1 received signal.");
signal.Reset(); // reset the signal
}
});
var thread2 = new Thread(() => {
while (isRunning) {
Console.WriteLine("Thread 2 is waiting...");
signal.WaitOne();
Console.WriteLine("Thread 2 received signal.");
signal.Reset(); // reset the signal
}
});
thread1.Start();
thread2.Start();
// Send signals to the threads
signal.Set(); // signals both of the waiting threads
Console.WriteLine("Press enter to exit.");
Console.ReadLine();
isRunning = false;
signal.Set(); // unblock any waiting threads
thread1.Join();
thread2.Join();
}
}
該程序使用 ManualResetEvent 類來實現線程間的同步。兩個線程等待信號并進行處理,主線程發送信號來通知等待的線程進行處理。可以使用 Set() 方法發送信號并使用 WaitOne() 方法等待信號,同時使用 Reset() 方法將信號狀態重置為未發出狀態,以便下次等待。
互斥鎖:線程之間可以通過互斥鎖來同步和互斥訪問共享資源,一個線程獲取鎖后就可以進行訪問操作,其他線程則需要等待釋放鎖。互斥鎖是一種經典的同步和互斥機制,在多線程編程中用得比較廣泛,一般和條件變量一起使用。
using System.Threading;
class Program {
private static bool isRunning = true;
private static object lockObject = new object();
private static int counter = 0;
static void Main(string[] args) {
var thread1 = new Thread(() => {
while (isRunning) {
lock (lockObject) {
while (counter % 2 == 1) {
Monitor.Wait(lockObject);
}
Console.WriteLine($"Thread 1: {counter++}");
Monitor.PulseAll(lockObject);
}
}
});
var thread2 = new Thread(() => {
while (isRunning) {
lock (lockObject) {
while (counter % 2 == 0) {
Monitor.Wait(lockObject);
}
Console.WriteLine($"Thread 2: {counter++}");
Monitor.PulseAll(lockObject);
}
}
});
thread1.Start();
thread2.Start();
Console.WriteLine("Press enter to exit.");
Console.ReadLine();
isRunning = false;
thread1.Join();
thread2.Join();
}
}
二、同步和異步線程間通信的比較
同步和異步線程間通信的主要區別在于調用者是否需要等待被調用者完成任務才能繼續執行下一步操作。
同步線程間通信指的是調用者主動向被調用者請求一個任務,并等待被調用者完成后再繼續執行。這種模式對于簡單的應用程序來說很容易實現,但有時會引發線程死鎖的問題,因為如果多個線程都在等待對方完成任務,就會形成死循環。
異步線程間通信則是被調用者在處理任務的同時,通知調用者任務的狀態。這種模式可以提高程序的響應速度,因為調用者可以繼續執行其他任務,而不必等待被調用者完成任務才能進行下一步操作。
C# 語言提供了多種方式來實現線程間的同步和異步通信。其中,同步通信可以使用 Mutex、Semaphore 和 Monitor 等互斥量類來實現線程鎖定和等待,在獲取到資源后再釋放鎖定。異步通信可以使用委托、事件和 Completion 是C# 5.0 開始的異步編程功能,可以使用 async 和 await 關鍵字來快速實現異步編程。
三、 多個線程協作完成任務
在多線程編程中,有時我們需要多個線程協作完成一個復雜的任務。這些線程需要互相通信、協調以達到同一目標。下面是一些常用的多線程協作技術:
信號量 Semaphore:Semaphore 可以用來控制某一資源的訪問權,比如網絡連接數限制、數據庫連接池等。Semaphore 通過計數器來控制資源的數量,并提供了 Acquire 和 Release 等方法來允許或阻塞線程訪問資源。多個線程可以共享一個 Semaphore,當 Semaphore 計數為 0 時,其他線程就需要等待。
Mutex:Mutex 是一種操作系統提供的同步機制,它可以保證在同一時刻只有一個線程訪問共享資源。Mutex 提供了 Lock 和 Unlock 等方法來保護臨界區。如果一個線程獲得 Mutex,其他線程就必須等待直到該線程釋放 Mutex。
AutoResetEvent 和 ManualResetEvent:這兩種事件用于線程間的同步,AutoResetEvent 的 WaitOne 方法會阻塞當前線程直到事件被發出,發出后事件重置為未發出狀態;ManualResetEvent 則不會自動重置,需要調用 Reset 方法手動將事件重置為未發出狀態。
CountdownEvent:CountdownEvent通常用于多個線程都需要完成某個任務后才能繼續執行的場景。當所有線程都完成任務后,調用Done方法通知CountdownEvent,等待的線程就會被喚醒。