走進Windows線程同步API
WIN32 API雖然提供了CreateThead和ExitThread方法,但是在C++中,永遠不應該使用這兩個方法創建或結束線程。而應該使用VC++提供的_beginthread、_beginthreadex方法,相應的結束線程方法_endthread、_endthreadex。后者除了在內部調用CreateThread或ExitThread方法外,還負責CRT的初始化或銷毀。雖然有直接結束線程的方法,但在C++最好通過線程方法正常返回來結束線程。直接結束線程時C++對象的析構函數不會被調用。
#include "stdafx.h"
using namespace std;
class Obj
{
public:
Obj() { cout <<"Obj() called" <<endl; }
~Obj() { cout <<"~Obj() called" <<endl; }
};
unsigned int WINAPI ThreadFunc(void* pvParam){
cout <<static_cast<char*>(pvParam) <<endl;
Obj obj;
_endthreadex(2);
return 1;
}
int _tmain(int argc, _TCHAR* argv[])
{
unsigned int threadId;
char *param = "param";
HANDLE thread = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, param, 0, &threadId);
Sleep(100);
DWORD exitCode;
GetExitCodeThread(thread, &exitCode);
cout <<"ExitCode:" <<exitCode <<endl;
system("pause");
return 0;
}
這段代碼的輸出為:
param
Obj() called
ExitCode:2
請按任意鍵繼續. . .
_beginthreadex的第一個參數為SECURITY_ATTRIBUTES結構指針,可以指定NULL使用默認的安全配置。第二參數為cbStackSize,線程棧大小;設置為0使用默認值,可以通過鏈接參數/STACK:[reserve][,commit]控制。第三個參數為線程入口方法地址,方法簽名如ThreadFunc所示。第四個三處為傳遞給入口方法的參數(值傳遞),具體意義由程序自己解釋。最后一個參數是返回的線程ID。返回值為新創建線程的句柄。__endthreadex方法唯一的參數指定線程的ExitCode。可以通過GetExitCodeThread方法獲得線程退出碼。
InterLocked系列原子方法
InterLocked系列方法可視為原子的。完成其功能時,保證其他線程不會訪問同一個資源。例如最簡單的InterLockedIncrement方法原子自增一個共享變量。
long g_sum(0);
unsigned int WINAPI ThreadFunc(void* pvParam){
for(int i = 0; i <100000; ++i)
{
InterlockedIncrement(&g_sum);
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
unsigned int threadId;
HANDLE thread1 = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, &threadId);
HANDLE thread2 = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, &threadId);
Sleep(1000);
cout <<"Sum:" <<g_sum <<endl;
system("pause");
return 0;
}
其他方法包括:
InterlockedIncrement64:自增一個64位整數。
InterlockedExchangeAdd、InterlockedExchangeAdd64:加和兩個數并賦值給第一個值。
InterlockedCompareExchange:比較并交換一個數。
還有很多InterLocked方法,具體參考MSDN文檔。
CriticalSection
通過EnterCriticalSection和LeaveCriticalSection方法,可以控制同步一段代碼的訪問。使用前需要使用InitializeCriticalSection初始化CRITICAL_SECTION。使用方法也很簡單。
CRITICAL_SECTION g_cs;
long g_sum(0);
unsigned int WINAPI ThreadFunc(void* pvParam){
for(int i = 0; i <100000; ++i)
{
EnterCriticalSection(&g_cs);
g_sum += 2;
LeaveCriticalSection(&g_cs);
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
InitializeCriticalSection(&g_cs);
unsigned int threadId;
HANDLE thread1 = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, &threadId);
HANDLE thread2 = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, &threadId);
Sleep(1000);
cout <<"Sum:" <<g_sum <<endl;
DeleteCriticalSection(&g_cs);
system("pause");
return 0;
}
這里有一個問題是,如果同步的代碼塊不是簡單g_sum += 2,而是可能拋出異常的復雜代碼。就需要確保LeaveCriticalSection一定被調用。不再使用后使用DeleteCriticalSection方法刪除之。
class CSManager
{
public:
CSManager(CRITICAL_SECTION *cs) : m_cs(cs)
{
EnterCriticalSection(m_cs);
}
~CSManager()
{
LeaveCriticalSection(m_cs);
}
private:
CRITICAL_SECTION *m_cs;
};
//...
for(int i = 0; i <100000; ++i)
{
CSManager CSMgr(&g_cs);
g_sum += 2;
}
//...
CSManager在構造函數中調用EnterCriticalSection,析構函數中調用LeaveCriticalSection。保證在代碼塊結束時調用Leave方法。
另外除了使用阻塞的Enter方法,還有一個TryEnterCriticalSection,該方法嘗試進去CriticalSetion,如果失敗,不會阻塞,而是立即返回FALSE。
SRWLOCK
SRWLOCK具有和CriticalSection類似的功能。另外還具有讀寫鎖分離的分離的功能。可以使用AcquireSRWLockShared獲取共享的讀鎖。使用AcquireSRWLockExclusive獲取獨占的寫鎖。使用對應的ReleaseSRWLockShared/Exclusive方法施放鎖。同樣地,使用前需要使用InitializeSRWLock初始化。
SRWLOCK g_lock;
long g_sum(0);
unsigned int WINAPI ReadThreadFunc(void* pvParam){
for(int i = 0; i <10; ++i)
{
AcquireSRWLockShared(&g_lock);
cout <<g_sum <<endl;
ReleaseSRWLockShared(&g_lock);
Sleep(1);
}
return 0;
}
unsigned int WINAPI WriteThreadFunc(void* pvParam){
for(int i = 0; i <100000; ++i)
{
AcquireSRWLockExclusive(&g_lock);
g_sum += 2;
ReleaseSRWLockExclusive(&g_lock);
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
InitializeSRWLock(&g_lock);
unsigned int threadId;
HANDLE thread1 = (HANDLE)_beginthreadex(NULL, 0, ReadThreadFunc, NULL, 0, &threadId);
HANDLE thread2 = (HANDLE)_beginthreadex(NULL, 0, ReadThreadFunc, NULL, 0, &threadId);
HANDLE thread3 = (HANDLE)_beginthreadex(NULL, 0, WriteThreadFunc, NULL, 0, &threadId);
Sleep(1000);
cout <<"Sum:" <<g_sum <<endl;
system("pause");
return 0;
}
SRWLOCK不具備類似于TryEnterCriticalSection的非阻塞方法。大多數情況下,SRWLOCK比CRITICAL_SECTION有更好的性能。
Condition Variable
為實現近點的生產者消費者問題。我們可以使用兩個CONDITION_VARIABLE:g_full,g_empty來實現。在緩沖區滿的時候,生產者線程調用SleepConditionVariableSRW(&g_full, &g_lock, INFINITE, 0)施放獲得的鎖并等待g_full。緩沖區空的時候,消費者可以調用leepConditionVariableSRW(&g_empty, &g_lock, INFINITE, 0)施放獲得的鎖并等待g_empty。掉進滿足后,可是使用WakeAllConditionVariable喚醒所有等待的線程或者使用WakeConditionVariable喚醒一個等待的線程。
和Condition Variable配置使用的可以使CrticalSection也可以使SRWLock。
BOOL SleepConditionVariableCS(
PCONDITION_VARIABLE pConditionVariable,
PCRITICAL_SECTION pCriticalSection,
DWORD dwMilliseconds);
BOOL SleepConditionVariableSRW(
PCONDITION_VARIABLE pConditionVariable,
PSRWLOCK pSRWLock,
DWORD dwMilliseconds,
ULONG Flags);
參數dwMilliseconds指定等待超時的時間,如果超時方法返回FASLE;INFINITE指定等待不超時。參數Flags指定被喚醒時嘗試獲得的鎖的類型。CONDITION_VARIABLE_LOCKMODE_ SHARED指定獲得共享鎖或者0指定獲得獨占鎖。
const int MAX_SIZE = 10;
CONDITION_VARIABLE g_full;
CONDITION_VARIABLE g_empty;
SRWLOCK g_lock;
list<Product> products;
unsigned int WINAPI ProduceThreadFunc(void* pvParam)
{
int i(0);
while(true)
{
Sleep(rand() % 100);
AcquireSRWLockExclusive(&g_lock);
if (products.size() >= MAX_SIZE)
{
SleepConditionVariableSRW(&g_full, &g_lock, INFINITE, 0);
}
else
{
cout <<"Produce Product:" <<i <<" by thread " <<GetThreadId(GetCurrentThread()) <<endl;
products.push_back(Product(i++));
}
WakeAllConditionVariable(&g_empty);
ReleaseSRWLockExclusive(&g_lock);
}
return 0;
}
unsigned int WINAPI ConsumeThreadFunc(void* pvParam)
{
while(true)
{
Sleep(rand() % 100);
AcquireSRWLockExclusive(&g_lock);
if(products.size() == 0)
{
SleepConditionVariableSRW(&g_empty, &g_lock, INFINITE, 0);
}
else
{
Product p = products.front();
products.pop_front();
cout <<"Consume Product:" <<p.m_no <<" by thread " <<GetThreadId(GetCurrentThread()) <<endl;
}
WakeAllConditionVariable(&g_full);
ReleaseSRWLockExclusive(&g_lock);
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
srand((unsigned)time(NULL));
InitializeSRWLock(&g_lock);
unsigned int threadId;
HANDLE thread1 = (HANDLE)_beginthreadex(NULL, 0, ProduceThreadFunc, NULL, 0, &threadId);
HANDLE thread2 = (HANDLE)_beginthreadex(NULL, 0, ConsumeThreadFunc, NULL, 0, &threadId);
HANDLE thread3 = (HANDLE)_beginthreadex(NULL, 0, ConsumeThreadFunc, NULL, 0, &threadId);
WaitForSingleObject(thread1, INFINITE);
WaitForSingleObject(thread2, INFINITE);
WaitForSingleObject(thread3, INFINITE);
system("pause");
return 0;
}
內核態線程同步方法
除了上面介紹的用戶態的線程同步方法。本文繼續通過幾個簡單例子演示內核態的線程同步方法的使用。內核態線程同步方法在性能上肯定比用戶態同步方法要差很多。但可以在多個進程間共享。
創建所有的內核態同步對象都范圍一個內核對象句柄HANDLE。通過WaitForSingleObject或者WaitForMultipleObjects等待內核同步對象轉換為已傳信狀態(signaled)。如果等待的是線程或者進程對象,那么對應線程或進程結束后即轉換為已傳信狀態。同時還可以指定一個超時時間。WaitForSingleObject包括WAIT_OBJECT_0,WAIT_TIMEOUT和WAIT_FAILED。不再使用后調用CloseHandle釋放引用。
DWORD dw = WaitForSingleObject(hProcess, 5000);
switch (dw) {
case WAIT_OBJECT_0:
// The process terminated.
break;
case WAIT_TIMEOUT:
// The process did not terminate within 5000 milliseconds.
break;
case WAIT_FAILED:
// Bad call to function (invalid handle?)
break;
}
WaitForMultipleObjects如果指定參數bWaitAll為TRUE,則等待所有對象都轉換為已傳信狀態后才返回,如果為指定bWaitAll為FALSE,則任意對象轉換為已傳信狀態即返回。可以通過以下方法來判斷是那個內核同步對象。
h[0] = hProcess1;
h[1] = hProcess2;
h[2] = hProcess3;
DWORD dw = WaitForMultipleObjects(3, h, FALSE, 5000);
switch (dw) {
case WAIT_FAILED:
// Bad call to function (invalid handle?)
break;
case WAIT_TIMEOUT:
// None of the objects became signaled within 5000 milliseconds.
break;
case WAIT_OBJECT_0 + 0:
// The process identified by h[0] (hProcess1) terminated.
break;
case WAIT_OBJECT_0 + 1:
// The process identified by h[1] (hProcess2) terminated.
break;
case WAIT_OBJECT_0 + 2:
// The process identified by h[2] (hProcess3) terminated.
break;
}
Event
Event語義上可以理解為一個事件是否發生。SetEvent方法設置Event為Signaled狀態。Event有兩種類型。第一種是自動重置的事件,調用SetEvent方法后,喚醒一個等待的線程后即自動轉換為未傳信狀態。第二種是手動重置事件,調用SetEvent方法后,需要調用ResetEvent方法設置事件為未傳信狀態。PulseEvent相當于調用SetEvent后立即調用ResetEvent。對于手動重置時間,PulseEvent會喚醒所有等待的線程。而對于自動重置的事件PulseEvent只會喚醒一個等待的線程。
HANDLE g_taskEvent;
unsigned int WINAPI ComputationTask(void* pvParam)
{
WaitForSingleObject(g_taskEvent, INFINITE);
for(int i = 0; i <10; ++i)
{
cout <<"comput " <<i <<endl;
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
g_taskEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
unsigned int threadId;
HANDLE thread1 = (HANDLE)_beginthreadex(NULL, 0, ComputationTask, NULL, 0, &threadId);
system("pause");
SetEvent(g_taskEvent);
ResetEvent(g_taskEvent);
WaitForSingleObject(thread1, INFINITE);
system("pause");
return 0;
}
上面是一個簡單的例子,ComputationTask線程等待用戶輸入后才開始計算任務。
Semaphore
Semaphore維護一個資源計數count和一個最大計數maxCount。
當count大于0時,semaphore處于已傳信狀態。
當count等于0是,semaphore處于未傳信狀態。
通過ReleaseSemaphore增加count計數,WaitForSingleObject減少cout計數。count不會小于0,也不能大于maxCount。
例如,可以使用semaphore控制能夠同時處理的最大任務線程數。當有超過最大數的更多任務線程開啟時只能等待其他任務完成并調用ReleaseSemaphore方法施放資源引用計數。
HANDLE g_semaphore;
unsigned int WINAPI RequstProcesor(void* pvParam)
{
WaitForSingleObject(g_semaphore, INFINITE);
cout <<"Start process request " <<GetThreadId(GetCurrentThread()) <<endl;
Sleep(1000);
ReleaseSemaphore(g_semaphore, 1, NULL);
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
g_semaphore = CreateSemaphore(NULL, 2, 2, NULL);
HANDLE threads[10];
for(int i = 0; i <10; i++)
{
threads[i] = (HANDLE)_beginthreadex(NULL, 0, RequstProcesor, NULL, 0, NULL);
}
WaitForMultipleObjects(10, threads, TRUE, INFINITE);
system("pause");
return 0;
}
上面的代碼,啟動了10個線程,但只能有2個現場可以同時執行,更多的線程只能等待。
Mutex
mutex的功能和CriticalSection功能很相似。都是控制一段臨界代碼的互斥訪問。通過WaitForSingleObject等待mutex。ReleaseMutex釋放mutex。
mutex維護一個threadId和一個使用計數count。如果CreateMutex的參數bInitialOwner為TRUE,這threadId為調用線程,cout為1。否則都初始為0。
如果threadId為0,mutex沒有被任何線程使用,處于已傳信狀態。如果threadId不為0,mutex處于未傳信狀態。mutex和其他內核同步對象一個不同的特殊地方在于。即時mutex處于未傳信狀態。如果調用WaitForSingleObject的線程是mutex的threadId對應的線程,WaitForSingleObject不會阻塞相當于處于已傳信狀態。下面的例子演示了mutex的使用。
HANDLE g_mutex;
void ProcessA()
{
WaitForSingleObject(g_mutex, INFINITE);
cout <<"ProcessA" <<" by thread " <<GetThreadId(GetCurrentThread()) <<endl;
ReleaseMutex(g_mutex);
}
void ProcessB()
{
WaitForSingleObject(g_mutex, INFINITE);
ProcessA();
cout <<"ProcessB" <<" by thread " <<GetThreadId(GetCurrentThread()) <<endl;
ReleaseMutex(g_mutex);
}
unsigned int WINAPI ThreadFunc(void* pvParam)
{
ProcessB();
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
g_mutex = CreateMutex(NULL, FALSE, NULL);
HANDLE threads[10];
for(int i = 0; i <10; i++)
{
threads[i] = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, NULL);
}
WaitForMultipleObjects(10, threads, TRUE, INFINITE);
system("pause");
return 0;
}