Linux內核并發同步機制:自旋鎖、信號量、互斥體
在Linux系統中有大量的臨界資源需要保護,如何讓各個任務有條不紊的訪問這些資源,這涉及到Linux中并發訪問的保護機制設計相關知識。后面會詳細介紹這幾個機制。
(據可靠消息,鎖的實現經常出現在筆試環節。既可以考察面試者對鎖的原理的理解,又可以考察面試者編程技能)。
注:部分代碼都是根據ARM64架構匯編代碼翻譯成C語言并經過精簡(例如:spin lock、read-write lock)。
也有部分代碼實現是為了呈現背后設計的原理自己編寫的,而不是精簡Linux中實現的代碼(例如mutex)。
自旋鎖(spin lock)
自旋鎖是Linux中使用非常頻繁的鎖,原理簡單。
內核當發生訪問資源沖突的時候,可以有兩種鎖的解決方案選擇:
- 一個是原地等待
- 一個是掛起當前進程,調度其他進程執行(睡眠)
Spinlock 是內核中提供的一種比較常見的鎖機制,自旋鎖是“原地等待”的方式解決資源沖突的,即,一個線程獲取了一個自旋鎖后,另外一個線程期望獲取該自旋鎖,獲取不到,只能夠原地“打轉”(忙等待)。
由于自旋鎖的這個忙等待的特性,注定了它使用場景上的限制 —— 自旋鎖不應該被長時間的持有(消耗 CPU 資源),一般應用在中斷上下文。
原理
下面以去銀行辦業務為例來講解。
- 銀行的辦事大廳一般會有幾個窗口同步進行。今天很不巧,只有一個窗口提供服務。現在的銀行服務都是采用取號排隊,叫號服務的方式。
- 當你去銀行辦理業務的時候,首先會去取號機器領取小票,上面寫著你排多少號。然后你就可以排隊等待了。一般還會有個顯示屏,上面會顯示一個數字(例如:"請xxx號到1號窗口辦理"),代表當前可以被服務顧客的排隊號碼。每辦理完一個顧客的業務,顯示屏上面的數字都會增加1。等待的顧客都會對比自己手上寫的編號和顯示屏上面是否一致,如果一致的話,就可以去前臺辦理業務了。
- 現在早上剛開業,顧客A是今天的第一個顧客,去取號機器領取0號(next計數)小票,然后看到顯示屏上顯示0(owner計數),顧客A就知道現在輪到自己辦理業務了。
- 顧客A到前臺辦理業務(持有鎖)中,顧客B來了。同樣,顧客B去取號機器拿到1號(next計數)小票。然后乖乖的坐在旁邊等候。顧客A依然在辦理業務中,此時顧客C也來了。顧客C去取號機器拿到2號(next計數)小票。顧客C也乖乖的找個座位繼續等待。
- 終于,顧客A的業務辦完了(釋放鎖)。然后,顯示屏上面顯示1(owner計數)。顧客B和C都對比顯示屏上面的數字和自己手中小票的數字是否相等。顧客B終于可以辦理業務了(持有鎖)。顧客C依然等待中。
- 顧客B的業務辦完了(釋放鎖)。然后,顯示屏上面顯示2(owner計數)。顧客C終于開始辦理業務(持有鎖)。顧客C的業務辦完了(釋放鎖)。
- 3個顧客都辦完了業務離開了。只留下一個銀行柜臺服務員。
- 最終,顯示屏上面顯示3(owner計數)。取號機器的下一個排隊號也是3號(next計數)。無人辦理業務(鎖是釋放狀態)。
Linux中針對每一個spin lock會有兩個計數。
分別是next和owner(初始值為0)。進程A申請鎖時,會判斷next和owner的值是否相等。
如果相等就代表鎖可以申請成功,否則原地自旋。直到owner和next的值相等才會退出自旋。
假設進程A申請鎖成功,然后會next加1。
此時owner值為0,next值為1。
進程B也申請鎖,保存next得值到局部變量temp(temp = 1)中。
由于next和owner值不相等,因此原地自旋讀取owner的值,判斷owner和temp是否相等,直到相等退出自旋狀態。
當然next的值還是加1,變成2。
進程A釋放鎖,此時會將owner的值加1,那么此時B進程的owner和temp的值都是1,因此B進程獲得鎖。當B進程釋放鎖后,同樣會將owner的值加1。
最后owner和next都等于2,代表沒有進程持有鎖。next就是一個記錄申請鎖的次數,而owner是持有鎖進程的計數值。
實際場景
一、考慮下面的場景(內核搶占場景):
(1)進程A在某個系統調用過程中訪問了共享資源 R
(2)進程B在某個系統調用過程中也訪問了共享資源 R 會不會造成沖突呢?
假設在A訪問共享資源R的過程中發生了中斷,中斷喚醒了沉睡中的,優先級更高的B,在中斷返回現場的時候,發生進程切換,B啟動執行,并通過系統調用訪問了R,如果沒有鎖保護,則會出現兩個thread進入臨界區,導致程序執行不正確。
OK,我們加上spin lock看看如何:
A在進入臨界區之前獲取了spin lock,同樣的,在A訪問共享資源R的過程中發生了中斷,中斷喚醒了沉睡中的,優先級更高的B,B在訪問臨界區之前仍然會試圖獲取spin lock,這時候由于A進程持有spin lock而導致B進程進入了永久的spin……怎么破?
linux的kernel很簡單,在A進程獲取spin lock的時候,禁止本CPU上的搶占(上面的永久spin的場合僅僅在本CPU的進程搶占本CPU的當前進程這樣的場景中發生)。
如果A和B運行在不同的CPU上,那么情況會簡單一些:A進程雖然持有spin lock而導致B進程進入spin狀態,不過由于運行在不同的CPU上,A進程會持續執行并會很快釋放spin lock,解除B進程的spin狀態
二、再考慮下面的場景(中斷上下文場景):
(1)運行在CPU0上的進程A在某個系統調用過程中訪問了共享資源 R
(2)運行在CPU1上的進程B在某個系統調用過程中也訪問了共享資源 R
(3)外設P的中斷handler中也會訪問共享資源 R 在這樣的場景下,使用spin lock可以保護訪問共享資源R的臨界區嗎?
我們假設CPU0上的進程A持有spin lock進入臨界區,這時候,外設P發生了中斷事件,并且調度到了CPU1上執行,看起來沒有什么問題,執行在CPU1上的handler會稍微等待一會CPU0上的進程A,
等它立刻臨界區就會釋放spin lock的,但是,如果外設P的中斷事件被調度到了CPU0上執行會怎么樣?
CPU0上的進程A在持有spin lock的狀態下被中斷上下文搶占,而搶占它的CPU0上的handler在進入臨界區之前仍然會試圖獲取spin lock,
悲劇發生了,CPU0上的P外設的中斷handler永遠的進入spin狀態,這時候,CPU1上的進程B也不可避免在試圖持有spin lock的時候失敗而導致進入spin狀態。
為了解決這樣的問題,linux kernel采用了這樣的辦法:如果涉及到中斷上下文的訪問,spin lock需要和禁止本 CPU 上的中斷聯合使用。
三、再考慮下面的場景(底半部場景)
linux kernel中提供了豐富的bottom half的機制,雖然同屬中斷上下文,不過還是稍有不同。
我們可以把上面的場景簡單修改一下:
外設P不是中斷handler中訪問共享資源R,而是在的bottom half中訪問。
使用spin lock+禁止本地中斷當然是可以達到保護共享資源的效果,但是使用牛刀來殺雞似乎有點小題大做,這時候disable bottom half就OK了
四、中斷上下文之間的競爭
同一種中斷handler之間在uni core和multi core上都不會并行執行,這是linux kernel的特性。
如果不同中斷handler需要使用spin lock保護共享資源,對于新的內核(不區分fast handler和slow handler),所有handler都是關閉中斷的,因此使用spin lock不需要關閉中斷的配合。
bottom half又分成softirq和tasklet,同一種softirq會在不同的CPU上并發執行,因此如果某個驅動中的softirq的handler中會訪問某個全局變量,
對該全局變量是需要使用spin lock保護的,不用配合disable CPU中斷或者bottom half。tasklet更簡單,因為同一種tasklet不會多個CPU上并發。
實現
我們首先定義描述自旋鎖的結構體arch_spinlock_t。
typedef struct {
union {
unsigned int slock;
struct __raw_tickets {
unsigned short owner;
unsigned short next;
} tickets;
};
} arch_spinlock_t;
如上面的原理描述,我們需要兩個計數,分別是owner和next。
slock所占內存區域覆蓋owner和next(據說C語言學好的都能看得懂)。
下面實現申請鎖操作 arch_spin_lock。
static inline void arch_spin_lock(arch_spinlock_t *lock) {
arch_spinlock_t old_lock;
old_lock.slock = lock->slock; /* 1 */
lock->tickets.next++; /* 2 */
while (old_lock.tickets.next != old_lock.tickets.owner) { /* 3 */
old_lock.tickets.owner = lock->tickets.owner; /* 4 */
}
}
- 繼續上面的舉例。顧客從取號機器得到排隊號。
- 取號機器更新下個顧客將要拿到的排隊號。
- 看一下顯示屏,判斷是否輪到自己了。
- 一直盯著顯示屏對比是不是輪到自己了。
釋放鎖的操作就非常簡單了。還記得上面銀行辦理業務的例子嗎?
釋放鎖的操作僅僅是顯示屏上面的排隊號加1。我們僅僅需要將owner計數加1即可。arch_spin_unlock實現如下。
static inline void arch_spin_unlock(arch_spinlock_t *lock) {
lock->tickets.owner++;
}
信號量(semaphore)
信號量又稱為信號燈,它是用來協調不同進程間的數據對象的,而最主要的應用是共享內存方式的進程間通信。本質上,信號量是一個計數器,它用來記錄對某個資源(如共享內存)的存取狀況。
它負責協調各個進程,以保證他們能夠正確、合理的使用公共資源。它和spin lock最大的不同之處就是:無法獲取信號量的進程可以睡眠,因此會導致系統調度。一般說來,為了獲得共享資源,進程需要執行下列操作:
(1) 測試控制該資源的信號量。
(2) 若此信號量的值為正,則允許進行使用該資源。進程將信號量減1。
(3) 若此信號量為0,則該資源目前不可用,進程進入睡眠狀態,直至信號量值大于0,進程被喚醒,轉入步驟(1)。
(4) 當進程不再使用一個信號量控制的資源時,信號量值加1。如果此時有進程正在睡眠等待此信號量,則喚醒此進程。
維護信號量狀態的是Linux內核操作系統而不是用戶進程。
我們可以從頭文件/usr/src/linux/include/linux/sem.h 中看到內核用來維護信號量狀態的各個結構的定義。信號量是一個數據集合,用戶可以單獨使用這一集合的每個元素。
信號量(semaphore)是進程間通信處理同步互斥的機制。是在多線程環境下使用的一種措施,它負責協調各個進程,以保證他們能夠正確、合理的使用公共資源。它和spin lock最大的不同之處就是:無法獲取信號量的進程可以睡眠,因此會導致系統調度。
原理
信號量一般可以用來標記可用資源的個數。
舉2個生活中的例子:
- 我們要坐火車從南京到新疆,這個'任務'特別的耗時,只能在車上等著車到站,但是我們沒有必要一直睜著眼睛等著車到站,最好的情況就是我們上車就直接睡覺,醒來就到站,這樣從人(用戶)的角度來說,體驗是最好的,對比于進程,程序在等待一個耗時的任務的時候,沒有必須要占用CPU,可以暫停當前任務使其進入休眠狀態,當等待的事件發生之后再由其他任務喚醒,這種場景采用信號量比較合適。
- 我們在等待電梯、等待洗手間,這種場景需要等待的事件并不是很多,如果我們還要找個地方睡一覺,然后等電梯到了或者洗手間可以用了再醒來,那很顯然這也沒有必要,我們只需要排好隊,刷一刷抖音就可以了,對比于計算機程序,比如驅動在進入中斷例程,在等待某個寄存器被置位,這種場景需要等待的時間很短暫,系統開銷遠小于進入休眠的開銷,所以這種場景采用自旋鎖比較合適。
實現
為了記錄可用資源的數量,我們肯定需要一個count計數,標記當前可用資源數量。當然還要一個可以像圖書管理員一樣的筆記本功能。
用來記錄等待借書的同學。所以,一個雙向鏈表即可。因此只需要一個count計數和等待進程的鏈表頭即可。描述信號量的結構體如下。
struct semaphore {
unsigned int count;
struct list_head wait_list;
};
在Linux中,每個進程就相當于是每個借書的同學。
通知一個同學,就相當于喚醒這個進程。因此,我們還需要一個結構體記錄當前的進程信息(task_struct)。
struct semaphore_waiter {
struct list_head list;
struct task_struct *task;
};
struct semaphore_waiter的list成員是當進程無法獲取信號量的時候掛入semaphore的wait_list成員。task成員就是記錄后續被喚醒的進程信息。
將當前進程賦給task,并利用其list成員將該變量的節點加入到以sem中的wait_list為頭部的一個列表中,假設有多個進程在sem上調用down_interruptible,則sem的wait_list上形成的隊列如下圖:
(注:將一個進程阻塞,一般的經過是先把進程放到等待隊列中,接著改變進程的狀態,比如設為TASK_INTERRUPTIBLE,然后調用調度函數schedule(),后者將會把當前進程從cpu的運行隊列中摘下)
一切準備就緒,現在就可以實現信號量的申請函數。
void down(struct semaphore *sem) {
struct semaphore_waiter waiter;
if (sem->count > 0) {
sem->count--; /* 1 */
return;
}
waiter.task = current; /* 2 */
list_add_tail(&waiter.list, &sem->wait_list); /* 2 */
schedule(); /* 3 */
}
- 如果信號量標記的資源還有剩余,自然可以成功獲取信號量。只需要遞減可用資源計數。
- 既然無法獲取信號量,就需要將當前進程掛入信號量的等待隊列鏈表上。
- schedule()主要是觸發任務調度的示意函數,主動讓出CPU使用權。在讓出之前,需要將當前進程從運行隊列上移除。
釋放信號的實現也是比較簡單。實現如下。
void up(struct semaphore *sem) {
struct semaphore_waiter waiter;
if (list_empty(&sem->wait_list)) {
sem->count++; /* 1 */
return;
}
waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list);
list_del(&waiter->list); /* 2 */
wake_up_process(waiter->task); /* 2 */
}
- 如果等待鏈表沒有進程,那么自然只需要增加資源計數。
- 從等待進程鏈表頭取出第一個進程,并從鏈表上移除。然后就是喚醒該進程。
讀寫鎖(read-write lock)
不管是自旋鎖還是信號量在同一時間只能有一個進程進入臨界區。對于有些情況,我們是可以區分讀寫操作的。因此,我們希望對于讀操作的進程可以并發進行。對于寫操作只限于一個進程進入臨界區。而這種同步機制就是讀寫鎖。讀寫鎖一般具有以下幾種性質。
- 同一時間有且僅有一個寫進程進入臨界區。
- 在沒有寫進程進入臨界區的時候,同時可以有多個讀進程進入臨界區。
- 讀進程和寫進程不可以同時進入臨界區。
讀寫鎖有兩種,一種是信號量類型,另一種是spin lock類型。下面以spin lock類型講解。
原理
下面我們用廁所模型來理解讀寫鎖。
- 我發現公司一般都會有保潔阿姨打掃廁所。如果以男廁所為例的話,我覺得男士進入廁所就相當于讀者進入臨界區。因為可以有多個男士進廁所。而保潔阿姨進入男士廁所就相當于寫者進入臨界區。
- 假設A男士發現保潔阿姨不在打掃廁所,就進入廁所。隨后B和C同時也進入廁所。
- 然后保潔阿姨準備打掃廁所,發現有男士在廁所里面,因此只能在門口等待。
- ABC都離開了廁所。保潔阿姨迅速進入廁所打掃。然后D男士去上廁所,發現保潔阿姨在里面。灰溜溜的出來了在門口等著。現在體會到了寫者(保潔阿姨)具有排他性,讀者(男士)可以并發進入臨界區了吧。
既然我們允許多個讀者進入臨界區,因此我們需要一個計數統計讀者的個數。同時,由于寫者永遠只存在一個進入臨界區,因此只需要一個bit標記是否有寫進程進入臨界區。
所以,我們可以將兩個計數合二為一。只需要1個unsigned int類型即可。最高位(bit31)代表是否有寫者進入臨界區,低31位(0~30bit)統計讀者個數。
在這里插入圖片描述
實現
描述讀寫鎖只需要1個變量即可,因此我們可以定義讀寫鎖的結構體如下。
typedef struct {
volatile unsigned int lock;
} arch_rwlock_t;
既然區分讀寫操作,因此肯定會有兩個申請鎖函數,分別是讀和寫。首先,我們看一下read_lock操作的實現。
static inline void arch_read_lock(arch_rwlock_t *rw)
{
unsigned int tmp;
do {
tmp = rw->lock;
tmp++; /* 1 */
} while(tmp & (1 << 31)); /* 2 */
rw->lock = tmp;
}
- 增加讀者計數,最后會更新到rw->lock中。
- 更新rw->lock前提是沒有寫者,因此這里會判斷是否有寫者已經進入臨界區(判斷方法是rw->lock變量bit 31的值)。
如果,有寫者已經進入臨界區,就在這里循環。一直等到寫者離開。
當讀進程離開臨界區的時候會調用read_unlock釋放鎖。read_unlock實現如下。
static inline void arch_read_unlock(arch_rwlock_t *rw)
{
rw->lock--;
}
遞減讀者計數即可。
讀操作看完了,我們看看寫操作是如何實現的。arch_write_lock實現如下。
static inline void arch_write_lock(arch_rwlock_t *rw)
{
unsigned int tmp;
do {
tmp = rw->lock;
} while(tmp); /* 1 */
rw->lock = 1 << 31; /* 2 */
}
- 由于寫者是排他的(讀者和寫者都不能有),因此這里只有rw->lock的值為0,當前的寫者才可以進入臨界區。
- 置位rw->lock的bit31,代表有寫者進入臨界區。
當寫進程離開臨界區的時候會調用write_unlock釋放鎖。write_unlock實現如下。
static inline void arch_write_unlock(arch_rwlock_t *rw)
{
rw->lock = 0;
}
同樣由于寫者是排他的,因此只需要將rw->lock置0即可。代表沒有任何進程進入臨界區。
畢竟是因為同一時間只能有一個寫者進入臨界區,當這個寫者離開臨界區的時候,肯定是意味著現在沒有任何進程進入臨界區。
以上的代碼實現其實會導致寫進程餓死現象。
例如,A、B、C三個進程進入讀臨界區,D進程嘗試獲得寫鎖,此時只能等待A、B、C三個進程退出臨界區。如果在推出之前又有F、G進程進入讀臨界區,那么將出現D進程餓死現象。
互斥體(mutex)
互斥體實現了“互相排斥”(mutual exclusion)同步的簡單形式(所以名為互斥體(mutex))。
互斥體禁止多個線程同時進入受保護的代碼“臨界區”(critical section)。因此,在任意時刻,只有一個線程被允許進入這樣的代碼保護區。
任何線程在進入臨界區之前,必須獲取(acquire)與此區域相關聯的互斥體的所有權。
如果已有另一線程擁有了臨界區的互斥體,其他線程就不能再進入其中。這些線程必須等待,直到當前的屬主線程釋放(release)該互斥體。
什么時候需要使用互斥體呢?
互斥體用于保護共享的易變代碼,也就是,全局或靜態數據。這樣的數據必須通過互斥體進行保護,以防止它們在多個線程同時訪問時損壞。
前文提到的semaphore在初始化count計數的時候,可以分為計數信號量和互斥信號量(二值信號量)。
mutex和初始化計數為1的二值信號量有很大的相似之處。他們都可以用做資源互斥。但是mutex卻有一個特殊的地方:只有持鎖者才能解鎖。
但是,二值信號量卻可以在一個進程中獲取信號量,在另一個進程中釋放信號量。如果是應用在嵌入式應用的RTOS,針對mutex的實現還會考慮優先級反轉問題。
原理
既然mutex是一種二值信號量,因此就不需要像semaphore那樣需要一個count計數。
由于mutex具有“持鎖者才能解鎖”的特點,所以我們需要一個變量owner記錄持鎖進程。釋放鎖的時候必須是同一個進程才能釋放。
當然也需要一個鏈表頭,主要用來便利睡眠等待的進程。原理和semaphore及其相似,因此在代碼上也有體現。
實現
mutex的實現代碼和Linux中實現會有差異,但是依然可以為你呈現設計的原理。
下面的設計代碼更像是部分RTOS中的代碼。mutex和semaphore一樣,我們需要兩個類似的結構體分別描述mutex。
struct mutex_waiter {
struct list_head list;
struct task_struct *task;
};
struct mutex {
long owner;
struct list_head wait_list;
};
struct mutex_waiter的list成員是當進程無法獲取互斥量的時候掛入mutex的wait_list鏈表。
首先實現申請互斥量的函數。
void mutex_take(struct mutex *mutex)
{
struct mutex_waiter waiter;
if (!mutex->owner) {
mutex->owner = (long)current; /* 1 */
return;
}
waiter.task = current;
list_add_tail(&waiter.list, &mutex->wait_list); /* 2 */
schedule(); /* 2 */
}
- 當mutex->owner的值為0的時候,代表沒有任何進程持有鎖。因此可以直接申請成功。然后,記錄當前申請鎖進程的task_struct。
- 既然不能獲取互斥量,自然就需要睡眠等待,掛入等待鏈表。
互斥量的釋放代碼實現也同樣和semaphore有很多相似之處。
int mutex_release(struct mutex *mutex)
{
struct mutex_waiter *waiter;
if (mutex->owner != (long)current) /* 1 */
return -1;
if (list_empty(&mutex->wait_list)) {
mutex->owner = 0; /* 2 */
return 0;
}
waiter = list_first_entry(&mutex->wait_list, struct mutex_waiter, list);
list_del(&waiter->list);
mutex->owner = (long)waiter->task; /* 3 */
wake_up_process(waiter->task); /* 4 */
return 0;
}
- mutex具有“持鎖者才能解鎖”的特點就是在這行代碼體現。
- 如果等待鏈表沒有進程,那么自然只需要將mutex->owner置0,代表沒有鎖是釋放狀態。
- mutex->owner的值改成當前可以持鎖進程的task_struct。
- 從等待進程鏈表取出第一個進程,并從鏈表上移除。然后就是喚醒該進程。