Go 協(xié)程鎖機(jī)制的實(shí)現(xiàn)
在日常的開發(fā)中,我們經(jīng)常會通過高并發(fā)來提高系統(tǒng)的處理能力,高并發(fā)帶來的一個問題就是對公共資源的訪問,這時(shí)候必須使用一些互斥機(jī)制保證數(shù)據(jù)的正確性。比如數(shù)據(jù)庫的鎖機(jī)制,或者我們基于 Redis、Zookeeper 等中間件實(shí)現(xiàn)的分布式鎖機(jī)制,線程/進(jìn)程間的鎖機(jī)制等,而在 Go 中,我們還會看到有協(xié)程的鎖機(jī)制。本文主要分析 Go 中協(xié)程鎖機(jī)制的實(shí)現(xiàn)。
鎖的實(shí)現(xiàn)方式通常是首先通過 CAS 嘗試獲得鎖,如果成功則直接返回,如果獲取失敗可以選擇自旋或者把當(dāng)前的實(shí)體加入鎖的等待隊(duì)列并把當(dāng)前實(shí)體改成阻塞狀態(tài),然后觸發(fā)重新調(diào)度,等待鎖的持有者解鎖然后喚醒等待者。但是比較有意思的是"把當(dāng)前的實(shí)體加入鎖的等待隊(duì)列"這個問題,比如有多個線程需要操作公共資源 A,當(dāng) A 被某個線程加鎖成功后,其余的加鎖失敗的多個線程都需要加入鎖 A 的等待隊(duì)列 Q1,這里也涉及到公共資源的訪問,所以對 Q1 的訪問也會執(zhí)行 CAS 加鎖,失敗后加入等待隊(duì)列 Q2,形成了套娃。后面我們會看到這個套娃是如何解決的。
協(xié)程間的鎖機(jī)制
在 Go 中,我們可以通過 Mutex 來實(shí)現(xiàn)多個協(xié)程對公共資源的訪問,它的使用方式很簡單,但是實(shí)現(xiàn)相對來說復(fù)雜很多,因?yàn)?Go 做了很多性能優(yōu)化,比如自旋,解鎖時(shí)喚醒的公平性。下面看一個例子。
package main
import (
"sync"
)
func main() {
var m sync.Mutex
m.Lock()
// 訪問公共資源
m.Unlock()
}
鎖機(jī)制的 API 通常不會很復(fù)雜,一般就是 lock/unlock,多一點(diǎn)的就是 trylock 或帶 timeout 的 lock。接著看一下這簡單使用方式的背后 Mutex 是如何實(shí)現(xiàn)的。Mutex 的定義如下。
type Mutex struct {
mu isync.Mutex
}
func (m *Mutex) Lock() {
m.mu.Lock()
}
可以看到,Mutex 是對 isync.Mutex 的封裝,接著看 isync.Mutex Lock 的實(shí)現(xiàn)。
type Mutex struct {
state int32
sema uint32
}
func (m *Mutex) Lock() {
// 獲取鎖,成功就直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}
Lock 首先用 CAS 獲得鎖,成功則直接返回,失敗時(shí),Go 在獲得鎖失敗時(shí)會做一系列的優(yōu)化處理,我們只關(guān)注阻塞協(xié)程的部分。
func (m *Mutex) lockSlow() {
iter := 0
old := m.state
for {
// 鎖被其他協(xié)程持有 & 不是饑餓模式 & 可以自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋
runtime_doSpin()
iter++
old = m.state
continue
}
// 嘗試獲得鎖
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 成功
if old&(mutexLocked|mutexStarving) == 0 {
break// locked the mutex with CAS
}
// 獲取失敗則阻塞
runtime_SemacquireMutex(&m.sema, queueLifo, 2)
iter = 0
}
}
}
Go 在獲得鎖失敗時(shí)會先嘗試自旋,如果還是失敗則調(diào)用 runtime_SemacquireMutex(&m.sema, queueLifo, 2) 把協(xié)程加入等待隊(duì)列并阻塞協(xié)程。runtime_SemacquireMutex 對應(yīng)函數(shù)如下。
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
gp := getg()
// 獲取一個 sudog
s := acquireSudog()
// semtable 是一個數(shù)組,每個元素對應(yīng)一棵樹,根據(jù) addr 計(jì)算哈希出所屬的數(shù)組索引,并拿到該索引對應(yīng)的樹的根節(jié)點(diǎn)
root := semtable.rootFor(addr)
// 對樹進(jìn)行加鎖,因?yàn)樾枰?sodog 插入樹中
lockWithRank(&root.lock, lockRankRoot)
// 把 sudog 插入樹中,sudog 記錄了 addr 和當(dāng)前的 g
root.queue(addr, s, lifo)
// 把協(xié)程改成阻塞狀態(tài),調(diào)度其他協(xié)程執(zhí)行
goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
}
semacquire1 的邏輯是首先獲得一個樹的鎖,然后把當(dāng)前 addr 和 g 封裝成 sudog 插入樹中,最終把協(xié)程改成阻塞狀態(tài),重新觸發(fā)調(diào)度。鎖的持有者解鎖后會喚醒鎖的等待者。
func semrelease1(addr *uint32, handoff bool, skipframes int) {
// 根據(jù)地址找到樹的根節(jié)點(diǎn)
root := semtable.rootFor(addr)
// 加鎖操作樹
lockWithRank(&root.lock, lockRankRoot)
// 從樹中獲取 addr 對應(yīng)的 sudog
s, t0, tailtime := root.dequeue(addr)
unlock(&root.lock)
// 修改協(xié)程為 ready 狀態(tài),等待調(diào)度執(zhí)行
readyWithTime(s, 5+skipframes)
}
通過前面的分析可以看到 Go Mutex 的實(shí)現(xiàn)并不是直接使用多線程的鎖機(jī)制,而是 Go 自己實(shí)現(xiàn)的,因?yàn)橹苯邮褂枚嗑€程的鎖會導(dǎo)致一個協(xié)程阻塞引起整個線程阻塞。Go Mutex 的實(shí)現(xiàn)原理大致是首先通過 CAS 獲取鎖,成功則返回,失敗則獲取一個全局?jǐn)?shù)據(jù)結(jié)構(gòu)(樹)的鎖,把當(dāng)前 g 插入樹中等待喚醒。那么這個全局?jǐn)?shù)據(jù)結(jié)構(gòu)的鎖是如何獲取的呢?這就涉及到線程間的鎖機(jī)制了。
線程間的鎖機(jī)制
剛才加鎖全局?jǐn)?shù)據(jù)結(jié)構(gòu)的函數(shù)是 lockWithRank。
func lockWithRank(l *mutex, rank lockRank) {
lock2(l)
}
func lock2(l *mutex) {
gp := getg()
k8 := key8(&l.key)
// 直接加鎖成功
v8 := atomic.Xchg8(k8, mutexLocked)
if v8&mutexLocked == 0 {
if v8&mutexSleeping != 0 {
atomic.Or8(k8, mutexSleeping)
}
return
}
// 創(chuàng)建一個線程級的加鎖數(shù)據(jù)結(jié)構(gòu),比如線程互斥結(jié)構(gòu)體
semacreate(gp.m)
v := atomic.Loaduintptr(&l.key)
tryAcquire:
for i := 0; ; i++ {
// 判斷加鎖可以加鎖并且加鎖成功
if v&mutexLocked == 0 {
prev8 := atomic.Xchg8(k8, mutexLocked|mutexSleeping)
if prev8&mutexLocked == 0 {
timer.end()
return
}
v = atomic.Loaduintptr(&l.key)
continue tryAcquire
}
// 阻塞
semasleep(-1)
v = atomic.Loaduintptr(&l.key)
}
}
可以看到 lockWithRank 最終類似協(xié)程加鎖的流程,先嘗試加鎖,成功則返回,失敗則進(jìn)入阻塞流程,但是這個阻塞流程和協(xié)程的阻塞流程是不一樣的??匆幌?semasleep 的實(shí)現(xiàn),semasleep 在不同系統(tǒng)下實(shí)現(xiàn)不一樣,比如 MacOS 下是:
func semasleep(ns int64) int32 {
g := getg()
mp := g.m
pthread_mutex_lock(&mp.mutex)
for {
// 已經(jīng)解鎖了,則返回
if mp.count > 0 {
mp.count--
pthread_mutex_unlock(&mp.mutex)
return0
}
// 阻塞等待條件變量的喚醒
pthread_cond_wait(&mp.cond, &mp.mutex)
}
}
Linux 下是:
func semasleep(ns int64) int32 {
mp := getg().m
for v := atomic.Xadd(&mp.waitsema, -1); ; v = atomic.Load(&mp.waitsema) {
ifint32(v) >= 0 {
return0
}
futexsleep(&mp.waitsema, v, ns)
if ns >= 0 {
ifint32(v) >= 0 {
return0
} else {
return-1
}
}
}
}
func futexsleep(addr *uint32, val uint32, ns int64) {
if ns < 0 {
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
return
}
var ts timespec
ts.setNsec(ns)
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
MacOS 是通過 C 庫函數(shù)實(shí)現(xiàn),Linux 下則是通過系統(tǒng)調(diào)用實(shí)現(xiàn)的。那么 C 庫和 Futex 是如何實(shí)現(xiàn)線程的鎖機(jī)制的呢?它們的實(shí)現(xiàn)有什么區(qū)別?
暫時(shí)沒有找到 MacOS 下的 C 庫實(shí)現(xiàn),下面是早期 linuxthreads 庫實(shí)現(xiàn)的線程鎖機(jī)制。
int __pthread_mutex_lock(pthread_mutex_t * mutex)
{
pthread_t self;
while(1) {
// 加鎖, acquire 實(shí)現(xiàn)是 while (testandset(spinlock)) sched_yield();
acquire(&mutex->m_spinlock);
switch(mutex->m_kind) {
case PTHREAD_MUTEX_FAST_NP:
// 如果還沒有加鎖則加鎖直接返回
if (mutex->m_count == 0) {
mutex->m_count = 1;
release(&mutex->m_spinlock);
return0;
}
self = thread_self();
break;
default:
return EINVAL;
}
// 獲取失敗,需要阻塞,把當(dāng)前線程插入該互斥鎖的等待隊(duì)列
enqueue(&mutex->m_waiting, self);
release(&mutex->m_spinlock);
// 掛起等待喚醒
suspend(self);
}
}
linuxthreads 的實(shí)現(xiàn)是首先通過自旋鎖獲取一個鎖,這個鎖是用于互斥訪問 mutex 數(shù)據(jù)結(jié)構(gòu),對 mutex 加鎖成功則返回,失敗則把當(dāng)前線程加入 mutex 的等待隊(duì)列,然后再通過 suspend 阻塞在 SIGUSR1 信號,借助操作系統(tǒng)掛起當(dāng)前線程,然后解鎖時(shí)發(fā)送 SIGUSR1 信號喚醒等待線程。
int __pthread_mutex_unlock(pthread_mutex_t * mutex)
{
pthread_t th;
// 獲取 mutex 的鎖
acquire(&mutex->m_spinlock);
switch (mutex->m_kind) {
case PTHREAD_MUTEX_FAST_NP:
mutex->m_count = 0;
break;
default:
return EINVAL;
}
// 取出一個被阻塞的線程
th = dequeue(&mutex->m_waiting);
release(&mutex->m_spinlock);
// 發(fā)送信號喚醒它
if (th != NULL) restart(th);
return0;
}
可以看到 linuxthreads 庫是通過自旋鎖解決了套娃的問題。而 glibc 的線程鎖機(jī)制則是借助 futex 實(shí)現(xiàn)的。
int pthread_mutex_lock (pthread_mutex_t *mutex)
{
// 獲取鎖類型
unsignedint type = PTHREAD_MUTEX_TYPE_ELISION (mutex);
// 普通鎖
if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
{
LLL_MUTEX_LOCK_OPTIMIZED (mutex);
}
// 加鎖成功,記錄鎖的持有者
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
mutex->__data.__owner = id;
return0;
}
pthread_mutex_lock 的核心邏輯是通過 LLL_MUTEX_LOCK_OPTIMIZED 實(shí)現(xiàn)的。
# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)
static inline void lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
intprivate = PTHREAD_MUTEX_PSHARED (mutex);
lll_lock (mutex->__data.__lock, private);
}
#define lll_lock(futex, private) \
__lll_lock (&(futex), private)
#define __lll_lock(futex, private) \
((void) \
({ \
int *__futex = (futex); \
if (__glibc_unlikely \
(atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wait_private (__futex); \
else \
__lll_lock_wait (__futex, private); \
} \
}))
void __lll_lock_wait (int *futex, intprivate)
{
if (atomic_load_relaxed (futex) == 2)
goto futex;
while (atomic_exchange_acquire (futex, 2) != 0)
{
futex:
futex_wait ((unsignedint *) futex, 2, private); /* Wait if *futex == 2. */
}
}
static inline int
futex_wait (int *futexp, int val)
{
#ifdef __NR_futex
return syscall (__NR_futex, futexp, FUTEX_WAIT, val);
#else
return syscall (__NR_futex_time64, futexp, FUTEX_WAIT, val);
#endif
}
pthread_mutex_lock 通過原子操作進(jìn)行加鎖,成功則返回,失敗則通過 futex 實(shí)現(xiàn)阻塞和喚醒。
Futex
Futex 是通過系統(tǒng)實(shí)現(xiàn)線程阻塞喚醒的一種方式,基于 futex 實(shí)現(xiàn)的鎖邏輯為首先通過 CAS 加鎖,成功后直接返回,失敗則通過 futex 陷入系統(tǒng)把當(dāng)前線程改成阻塞狀態(tài),然后鎖的持有者解鎖時(shí)再通過 futex 喚醒等待者。2.6.11 版本的 Futex 實(shí)現(xiàn)如下。
static int futex_wait(unsigned long uaddr, int val, unsigned long time)
{
// 等待隊(duì)列的節(jié)點(diǎn)
wait_queue_t wait = {
.task = current, // 當(dāng)前線程
.func = default_wake_function,
.task_list = { NULL, NULL }
}
int ret, curval;
struct futex_q q;
retry:
// 加鎖內(nèi)存映射區(qū)域
down_read(¤t->mm->mmap_sem);
// 根據(jù) uaddr 計(jì)算 key
ret = get_futex_key(uaddr, &q.key);
// 插入隊(duì)列
queue_me(&q, -1, NULL);
up_read(¤t->mm->mmap_sem);
// 把當(dāng)前線程改成阻塞狀態(tài)
__set_current_state(TASK_INTERRUPTIBLE);
// 把當(dāng)前線程插入 futex 結(jié)構(gòu)體的等待隊(duì)列
add_wait_queue(&q.waiters, &wait);
if (likely(!list_empty(&q.list)))
time = schedule_timeout(time);
__set_current_state(TASK_RUNNING);
}
futex_wait 首先把 futex 結(jié)構(gòu)體插入到隊(duì)列中,然后把當(dāng)前線程插入到 futex 結(jié)構(gòu)體的隊(duì)列中,最后把當(dāng)前線程改成阻塞狀態(tài)并重新進(jìn)行調(diào)度。那么把 futex 結(jié)構(gòu)體插入到隊(duì)列中又必然會涉及到并發(fā)訪問公共資源的問題,看看 queue_me 是如何解決的。
static void queue_me(struct futex_q *q, int fd, struct file *filp)
{
struct futex_hash_bucket *bh;
q->fd = fd;
q->filp = filp;
init_waitqueue_head(&q->waiters);
get_key_refs(&q->key);
bh = hash_futex(&q->key);
q->lock_ptr = &bh->lock;
spin_lock(&bh->lock);
bh->nqueued++;
list_add_tail(&q->list, &bh->chain);
spin_unlock(&bh->lock);
}
queue_me 先通過 key 從一個全局的結(jié)構(gòu)體數(shù)組中計(jì)算出對應(yīng)的索引,這個索引對應(yīng)的元素是一條鏈表(減少并發(fā)時(shí)的競爭),然后加鎖并把 futex 結(jié)構(gòu)體插入該鏈表中。這里使用的是自旋鎖,那么這里的鎖又是怎么加的呢?
#define spin_lock(lock) _spin_lock(lock)
#define _spin_lock(lock) \
do { \
preempt_disable(); \ // 關(guān)系統(tǒng)搶占
_raw_spin_lock(lock); \
__acquire(lock); \
} while(0)
_raw_spin_lock 在不同的架構(gòu)中實(shí)現(xiàn)不一樣。比如非 SMP 架構(gòu)下因?yàn)殛P(guān)閉了系統(tǒng)搶占,所以 _raw_spin_lock 的實(shí)現(xiàn)實(shí)際上什么都不需要做,因?yàn)椴粫l(fā)生并發(fā)問題。
#define _raw_spin_lock(lock) do { (void)(lock); } while(0)
而 SMP 架構(gòu)下,可能存在多個 CPU 訪問該數(shù)據(jù)結(jié)構(gòu),所以需要真正的加鎖。
static inline void _raw_spin_lock(spinlock_t *lock)
{
__asm__ __volatile__(
spin_lock_string
:"=m" (lock->slock) : : "memory");
}
#define spin_lock_string \
"\n1:\t" \
"lock ; decb %0\n\t" \
"jns 3f\n" \
"2:\t" \
"rep;nop\n\t" \
"cmpb $0,%0\n\t" \
"jle 2b\n\t" \
"jmp 1b\n" \
"3:\n\t"
以上代碼大概是通過原子操作實(shí)現(xiàn)加鎖。
通過分析可以看到,在 Go 中,協(xié)程鎖機(jī)制的實(shí)現(xiàn)是由 Go 自己實(shí)現(xiàn)的,但是在實(shí)現(xiàn)中需要借助線程的鎖機(jī)制來實(shí)現(xiàn)協(xié)程的阻塞等待和喚醒,而線程的鎖機(jī)制又需要通過操作系統(tǒng)的 futex 來實(shí)現(xiàn)線程的阻塞等待和喚醒,而系統(tǒng)的等待和喚醒機(jī)制同樣需要通過鎖機(jī)制來實(shí)現(xiàn),具體實(shí)現(xiàn)在不同架構(gòu)下不一樣,比如在非 SMP 架構(gòu)中,只需要關(guān)系統(tǒng)搶占即可,這樣在執(zhí)行操作系統(tǒng)代碼時(shí)就不會有并發(fā)代碼訪問該公共數(shù)據(jù)結(jié)構(gòu)(需要保證中斷中也不會訪問該數(shù)據(jù)結(jié)構(gòu),否則也需要關(guān)中斷),而在 SMP 架構(gòu)下是通過自旋鎖來實(shí)現(xiàn)的,這樣就解決了套娃的問題。