Libtask源碼解析之鎖
本文轉載自微信公眾號「編程雜技」,作者theanarkh 。轉載本文請聯系編程雜技公眾號。
libtask中其實不需要鎖,因為libtask中協程是非搶占式的,不存在競態條件。但是libtask還是實現了一套鎖的機制。我們看一下這個鎖機制的實現。首先我們看一下結構體。
- struct QLock
- {
- // 鎖持有者
- Task *owner;
- // 等待該鎖的隊列
- Tasklist waiting;
- };
接著我們看一下鎖的操作。
加鎖
- static int _qlock(QLock *l, int block)
- {
- // 鎖沒有持有者,則置當前協程為持有者,直接返回,1表示加鎖成功
- if(l->owner == nil){
- l->owner = taskrunning;
- return 1;
- }
- // 非阻塞,則直接返回,0表示加鎖失敗
- if(!block)
- return 0;
- // 插入等待鎖隊列
- addtask(&l->waiting, taskrunning);
- taskstate("qlock");
- // 切換到其他協程
- taskswitch();
- // 切換回來時,如果持有鎖的協程不是當前協程,則異常退出,因為只有持有鎖才會被切換回來,見unqlock
- if(l->owner != taskrunning){
- fprint(2, "qlock: owner=%p self=%p oops\n", l->owner, taskrunning);
- abort();
- }
- return 1;
- }
如果當前鎖沒有持有者,則當前協程X就變成鎖的持有者,否則把協程X插入等待鎖隊列中,然后讓出cpu,切換到其他協程。當后續鎖被釋放并被協程X持有時,協程X就會被喚醒繼續持續。加鎖可以分為阻塞和非阻塞兩種模式。非阻塞就是加鎖失敗也不會切換協程。
- // 阻塞式加鎖
- void qlock(QLock *l)
- {
- _qlock(l, 1);
- }
- // 非阻塞式加鎖
- int
- canqlock(QLock *l)
- {
- return _qlock(l, 0);
- }
釋放鎖
接下來我們看一下釋放鎖的邏輯
- // 釋放鎖
- void qunlock(QLock *l)
- {
- Task *ready;
- // 鎖并沒有持有者,異常退出
- if(l->owner == 0){
- fprint(2, "qunlock: owner=0\n");
- abort();
- }
- // 如果還有協程在等待該鎖,則置為持有者,并且從等待隊列中刪除,然后修改狀態為就緒并加入就緒隊列
- if((l->owner = ready = l->waiting.head) != nil){
- deltask(&l->waiting, ready);
- taskready(ready);
- }
- }
當鎖被釋放時,如果還有協程在等待該鎖,則從等待隊列中摘取一個節點,然后變成鎖的持有者并從等待隊列中刪除。最后插入就緒隊列等待調度。以上是一種互斥鎖的實現。下面我們再來看一下讀寫鎖機制,讀寫鎖也是互斥的,但是在某些情況下也可以共享。我們看一下讀寫鎖的數據結構。
- struct RWLock
- {
- // 正在讀的讀者個數
- int readers;
- // 當前正在寫的寫者,只有一個
- Task *writer;
- // 等待讀和寫的隊列
- Tasklist rwaiting;
- Tasklist wwaiting;
- };
接著我看一下加鎖邏輯。
加讀鎖
- // 加讀鎖
- static int _rlock(RWLock *l, int block)
- {
- /*
- 沒有正在寫并且沒有等待寫,則加鎖成功,并且讀者數加一
- */
- if(l->writer == nil && l->wwaiting.head == nil){
- l->readers++;
- return 1;
- }
- // 非阻塞則直接返回
- if(!block)
- return 0;
- // 插入等待讀隊列
- addtask(&l->rwaiting, taskrunning);
- taskstate("rlock");
- // 切換上下文
- taskswitch();
- // 切換回來了,說明加鎖成功
- return 1;
- }
當且僅當沒有正在寫的寫者和等待寫的寫者時,才能加讀鎖成功,否則根據加鎖模式進行下一步處理,直接返回加鎖失敗或者插入等待隊列,然后切換到其他協程。我們看到當有一個等待寫的協程時(l->wwaiting.head != nil),則后續的讀者就無法加鎖成功,而是被插入等待隊列,否則可能會引起寫者饑餓。
加寫鎖
- // 加寫鎖
- static int _wlock(RWLock *l, int block)
- {
- // 沒有正在寫并且沒有正在讀,則加鎖成功,并置寫者為當前協程
- if(l->writer == nil && l->readers == 0){
- l->writer = taskrunning;
- return 1;
- }
- // 非阻塞則直接返回
- if(!block)
- return 0;
- // 加入等待寫隊列
- addtask(&l->wwaiting, taskrunning);
- taskstate("wlock");
- // 切換
- taskswitch();
- // 切換回來說明拿到鎖了
- return 1;
- }
當且僅當沒有正在寫的寫者和沒有正在讀的讀者時,才能加寫鎖成功。否則類似加讀鎖一樣處理。
釋放讀鎖
- // 釋放讀鎖
- void runlock(RWLock *l)
- {
- Task *t;
- // 讀者減一,如果等于0并且有等待寫的協程,則隊列第一個協程持有該鎖
- if(--l->readers == 0 && (t = l->wwaiting.head) != nil){
- deltask(&l->wwaiting, t);
- l->writer = t;
- taskready(t);
- }
- }
持有讀鎖,說明當前肯定沒有正在寫的寫者,但是可能有等待寫的寫者和等待讀的讀者(因為有等待寫的寫者導致無法加鎖成功)。當釋放讀鎖時,如果還有其他讀者,則其他讀者可以繼續持有鎖,因為讀者可以共享讀鎖,而寫者保持原來狀態。如果這時候沒有讀者但是有等待寫的寫者,則從隊列中選擇第一個節點成為鎖的持有者,其他的寫者則繼續等待,因為寫者不能共享寫鎖。
釋放寫鎖
- // 釋放寫鎖
- void wunlock(RWLock *l)
- {
- Task *t;
- // 沒有正在寫,異常退出
- if(l->writer == nil){
- fprint(2, "wunlock: not locked\n");
- abort();
- }
- // 置空,沒有協程正在寫
- l->writer = nil;
- // 有正在讀,異常退出,寫的時候,是無法讀的
- if(l->readers != 0){
- fprint(2, "wunlock: readers\n");
- abort();
- }
- // 釋放寫鎖時,優先讓讀者持有鎖,因為讀者可以共享持有鎖,提高并發
- // 讀可以共享,把等待讀的協程都加入就緒隊列,并持有鎖
- while((t = l->rwaiting.head) != nil){
- deltask(&l->rwaiting, t);
- l->readers++;
- taskready(t);
- }
- // 釋放寫鎖時,如果又沒有讀者,并且有等待寫的協程,則隊列的第一個等待寫的協程持有鎖
- if(l->readers == 0 && (t = l->wwaiting.head) != nil){
- deltask(&l->wwaiting, t);
- l->writer = t;
- taskready(t);
- }
- }
持有寫鎖,可能有等待寫的寫者和等待讀的讀者。這里是讀者優先持有鎖,因為讀者可以共享持有鎖,提高并發,如果沒有讀者,則再判斷寫者。
總結:單純的互斥鎖是比較簡單的,讀寫鎖就相對復雜一點,主要是要根據讀鎖和寫鎖的特性制定一些策略,比如避免饑餓問題。libtask的方式是,加寫鎖的時候,當無法持有鎖的時候,申請者就會被插入等待等待隊列。這個是沒有什么好說的,加讀者的時候,情況就復雜了點,如果這時候有讀者正在持有鎖,理論上,申請者也可以持有鎖,因為讀鎖是共享的,但是單純這樣處理的話,可能會導致等待寫的寫者一直拿不到鎖,所以這里需要判斷是否有等待寫的寫者,如果有則當前申請者則不能再持有讀鎖,而是要加入等待隊列。那么在釋放鎖的時候,當釋放讀鎖時,優先讓等待寫的寫者持有鎖,再到等待讀的讀者持有鎖。同樣,當釋放寫鎖時,優先讓讀者持有鎖,這樣就能比較好地平衡讀者和寫者持有鎖的機會。