成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go 協(xié)程鎖機(jī)制的實(shí)現(xiàn)

開發(fā) 前端
鎖的實(shí)現(xiàn)方式通常是首先通過 CAS 嘗試獲得鎖,如果成功則直接返回,如果獲取失敗可以選擇自旋或者把當(dāng)前的實(shí)體加入鎖的等待隊(duì)列并把當(dāng)前實(shí)體改成阻塞狀態(tài),然后觸發(fā)重新調(diào)度,等待鎖的持有者解鎖然后喚醒等待者。

在日常的開發(fā)中,我們經(jīng)常會通過高并發(fā)來提高系統(tǒng)的處理能力,高并發(fā)帶來的一個問題就是對公共資源的訪問,這時(shí)候必須使用一些互斥機(jī)制保證數(shù)據(jù)的正確性。比如數(shù)據(jù)庫的鎖機(jī)制,或者我們基于 Redis、Zookeeper 等中間件實(shí)現(xiàn)的分布式鎖機(jī)制,線程/進(jìn)程間的鎖機(jī)制等,而在 Go 中,我們還會看到有協(xié)程的鎖機(jī)制。本文主要分析 Go 中協(xié)程鎖機(jī)制的實(shí)現(xiàn)。

鎖的實(shí)現(xiàn)方式通常是首先通過 CAS 嘗試獲得鎖,如果成功則直接返回,如果獲取失敗可以選擇自旋或者把當(dāng)前的實(shí)體加入鎖的等待隊(duì)列并把當(dāng)前實(shí)體改成阻塞狀態(tài),然后觸發(fā)重新調(diào)度,等待鎖的持有者解鎖然后喚醒等待者。但是比較有意思的是"把當(dāng)前的實(shí)體加入鎖的等待隊(duì)列"這個問題,比如有多個線程需要操作公共資源 A,當(dāng) A 被某個線程加鎖成功后,其余的加鎖失敗的多個線程都需要加入鎖 A 的等待隊(duì)列 Q1,這里也涉及到公共資源的訪問,所以對 Q1 的訪問也會執(zhí)行 CAS 加鎖,失敗后加入等待隊(duì)列 Q2,形成了套娃。后面我們會看到這個套娃是如何解決的。

協(xié)程間的鎖機(jī)制

在 Go 中,我們可以通過 Mutex 來實(shí)現(xiàn)多個協(xié)程對公共資源的訪問,它的使用方式很簡單,但是實(shí)現(xiàn)相對來說復(fù)雜很多,因?yàn)?Go 做了很多性能優(yōu)化,比如自旋,解鎖時(shí)喚醒的公平性。下面看一個例子。

package main

import (
 "sync"
)

func main() {
 var m sync.Mutex
 m.Lock()
  // 訪問公共資源
  m.Unlock()
}

鎖機(jī)制的 API 通常不會很復(fù)雜,一般就是 lock/unlock,多一點(diǎn)的就是 trylock 或帶 timeout 的 lock。接著看一下這簡單使用方式的背后 Mutex 是如何實(shí)現(xiàn)的。Mutex 的定義如下。

type Mutex struct {
 mu isync.Mutex
}

func (m *Mutex) Lock() {
 m.mu.Lock()
}

可以看到,Mutex 是對 isync.Mutex 的封裝,接著看 isync.Mutex Lock 的實(shí)現(xiàn)。

type Mutex struct {
 state int32
 sema  uint32
}

func (m *Mutex) Lock() {
  // 獲取鎖,成功就直接返回
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  return
 }
 m.lockSlow()
}

Lock 首先用 CAS 獲得鎖,成功則直接返回,失敗時(shí),Go 在獲得鎖失敗時(shí)會做一系列的優(yōu)化處理,我們只關(guān)注阻塞協(xié)程的部分。

func (m *Mutex) lockSlow() {
 iter := 0
 old := m.state
for {
    // 鎖被其他協(xié)程持有 & 不是饑餓模式 & 可以自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   // 自旋
   runtime_doSpin()
   iter++
   old = m.state
   continue
  }
    // 嘗試獲得鎖
if atomic.CompareAndSwapInt32(&m.state, old, new) {
      // 成功
   if old&(mutexLocked|mutexStarving) == 0 {
    break// locked the mutex with CAS
   }
      // 獲取失敗則阻塞
   runtime_SemacquireMutex(&m.sema, queueLifo, 2)
      iter = 0
  }
 }
}

Go 在獲得鎖失敗時(shí)會先嘗試自旋,如果還是失敗則調(diào)用 runtime_SemacquireMutex(&m.sema, queueLifo, 2) 把協(xié)程加入等待隊(duì)列并阻塞協(xié)程。runtime_SemacquireMutex 對應(yīng)函數(shù)如下。

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
 gp := getg()

// 獲取一個 sudog
 s := acquireSudog()
// semtable 是一個數(shù)組,每個元素對應(yīng)一棵樹,根據(jù) addr 計(jì)算哈希出所屬的數(shù)組索引,并拿到該索引對應(yīng)的樹的根節(jié)點(diǎn)
 root := semtable.rootFor(addr)

// 對樹進(jìn)行加鎖,因?yàn)樾枰?sodog 插入樹中
  lockWithRank(&root.lock, lockRankRoot)
// 把 sudog 插入樹中,sudog 記錄了 addr 和當(dāng)前的 g
  root.queue(addr, s, lifo)
// 把協(xié)程改成阻塞狀態(tài),調(diào)度其他協(xié)程執(zhí)行
  goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
}

semacquire1 的邏輯是首先獲得一個樹的鎖,然后把當(dāng)前 addr 和 g 封裝成 sudog 插入樹中,最終把協(xié)程改成阻塞狀態(tài),重新觸發(fā)調(diào)度。鎖的持有者解鎖后會喚醒鎖的等待者。

func semrelease1(addr *uint32, handoff bool, skipframes int) {
 // 根據(jù)地址找到樹的根節(jié)點(diǎn)
  root := semtable.rootFor(addr)
  // 加鎖操作樹
 lockWithRank(&root.lock, lockRankRoot)
  // 從樹中獲取 addr 對應(yīng)的 sudog
 s, t0, tailtime := root.dequeue(addr)
 unlock(&root.lock)
  // 修改協(xié)程為 ready 狀態(tài),等待調(diào)度執(zhí)行
  readyWithTime(s, 5+skipframes)
}

通過前面的分析可以看到 Go Mutex 的實(shí)現(xiàn)并不是直接使用多線程的鎖機(jī)制,而是 Go 自己實(shí)現(xiàn)的,因?yàn)橹苯邮褂枚嗑€程的鎖會導(dǎo)致一個協(xié)程阻塞引起整個線程阻塞。Go Mutex 的實(shí)現(xiàn)原理大致是首先通過 CAS 獲取鎖,成功則返回,失敗則獲取一個全局?jǐn)?shù)據(jù)結(jié)構(gòu)(樹)的鎖,把當(dāng)前 g 插入樹中等待喚醒。那么這個全局?jǐn)?shù)據(jù)結(jié)構(gòu)的鎖是如何獲取的呢?這就涉及到線程間的鎖機(jī)制了。

線程間的鎖機(jī)制

剛才加鎖全局?jǐn)?shù)據(jù)結(jié)構(gòu)的函數(shù)是 lockWithRank。

func lockWithRank(l *mutex, rank lockRank) {
 lock2(l)
}

func lock2(l *mutex) {
 gp := getg()

 k8 := key8(&l.key)
// 直接加鎖成功
 v8 := atomic.Xchg8(k8, mutexLocked)
if v8&mutexLocked == 0 {
if v8&mutexSleeping != 0 {
   atomic.Or8(k8, mutexSleeping)
  }
return
 }
// 創(chuàng)建一個線程級的加鎖數(shù)據(jù)結(jié)構(gòu),比如線程互斥結(jié)構(gòu)體
 semacreate(gp.m)

 v := atomic.Loaduintptr(&l.key)
tryAcquire:
for i := 0; ; i++ {
    // 判斷加鎖可以加鎖并且加鎖成功
if v&mutexLocked == 0 {
   prev8 := atomic.Xchg8(k8, mutexLocked|mutexSleeping)
      if prev8&mutexLocked == 0 {
        timer.end()
        return
      }
   v = atomic.Loaduintptr(&l.key)
   continue tryAcquire
  }
    // 阻塞
  semasleep(-1)
  v = atomic.Loaduintptr(&l.key)
 }
}

可以看到 lockWithRank 最終類似協(xié)程加鎖的流程,先嘗試加鎖,成功則返回,失敗則進(jìn)入阻塞流程,但是這個阻塞流程和協(xié)程的阻塞流程是不一樣的??匆幌?semasleep 的實(shí)現(xiàn),semasleep 在不同系統(tǒng)下實(shí)現(xiàn)不一樣,比如 MacOS 下是:

func semasleep(ns int64) int32 {
 g := getg()
 mp := g.m
 pthread_mutex_lock(&mp.mutex)
for {
    // 已經(jīng)解鎖了,則返回
if mp.count > 0 {
   mp.count--
   pthread_mutex_unlock(&mp.mutex)
   return0
  }
    // 阻塞等待條件變量的喚醒
  pthread_cond_wait(&mp.cond, &mp.mutex)
 }
}

Linux 下是:

func semasleep(ns int64) int32 {
 mp := getg().m

for v := atomic.Xadd(&mp.waitsema, -1); ; v = atomic.Load(&mp.waitsema) {
ifint32(v) >= 0 {
   return0
  }
  futexsleep(&mp.waitsema, v, ns)
if ns >= 0 {
   ifint32(v) >= 0 {
    return0
   } else {
    return-1
   }
  }
 }
}

func futexsleep(addr *uint32, val uint32, ns int64) {
if ns < 0 {
  futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
return
 }

var ts timespec
 ts.setNsec(ns)
 futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

MacOS 是通過 C 庫函數(shù)實(shí)現(xiàn),Linux 下則是通過系統(tǒng)調(diào)用實(shí)現(xiàn)的。那么 C 庫和 Futex 是如何實(shí)現(xiàn)線程的鎖機(jī)制的呢?它們的實(shí)現(xiàn)有什么區(qū)別?

暫時(shí)沒有找到 MacOS 下的 C 庫實(shí)現(xiàn),下面是早期 linuxthreads 庫實(shí)現(xiàn)的線程鎖機(jī)制。

int __pthread_mutex_lock(pthread_mutex_t * mutex)
{
pthread_t self;

while(1) {
    // 加鎖, acquire 實(shí)現(xiàn)是 while (testandset(spinlock)) sched_yield();
    acquire(&mutex->m_spinlock);
    switch(mutex->m_kind) {
    case PTHREAD_MUTEX_FAST_NP:
      // 如果還沒有加鎖則加鎖直接返回
      if (mutex->m_count == 0) {
        mutex->m_count = 1;
        release(&mutex->m_spinlock);
        return0;
      }
      self = thread_self();
      break;
    default:
      return EINVAL;
    }
    // 獲取失敗,需要阻塞,把當(dāng)前線程插入該互斥鎖的等待隊(duì)列
    enqueue(&mutex->m_waiting, self);
    release(&mutex->m_spinlock);
    // 掛起等待喚醒
    suspend(self);
  }
}

linuxthreads 的實(shí)現(xiàn)是首先通過自旋鎖獲取一個鎖,這個鎖是用于互斥訪問 mutex 數(shù)據(jù)結(jié)構(gòu),對 mutex 加鎖成功則返回,失敗則把當(dāng)前線程加入 mutex 的等待隊(duì)列,然后再通過 suspend 阻塞在 SIGUSR1 信號,借助操作系統(tǒng)掛起當(dāng)前線程,然后解鎖時(shí)發(fā)送 SIGUSR1 信號喚醒等待線程。

int __pthread_mutex_unlock(pthread_mutex_t * mutex)
{
pthread_t th;
// 獲取 mutex 的鎖
  acquire(&mutex->m_spinlock);
switch (mutex->m_kind) {
case PTHREAD_MUTEX_FAST_NP:
    mutex->m_count = 0;
    break;
default:
    return EINVAL;
  }
// 取出一個被阻塞的線程
  th = dequeue(&mutex->m_waiting);
  release(&mutex->m_spinlock);
// 發(fā)送信號喚醒它
if (th != NULL) restart(th);
return0;
}

可以看到 linuxthreads 庫是通過自旋鎖解決了套娃的問題。而 glibc 的線程鎖機(jī)制則是借助 futex 實(shí)現(xiàn)的。

int pthread_mutex_lock (pthread_mutex_t *mutex)
{
// 獲取鎖類型
unsignedint type = PTHREAD_MUTEX_TYPE_ELISION (mutex);
// 普通鎖
if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
  {
    LLL_MUTEX_LOCK_OPTIMIZED (mutex);
  }
// 加鎖成功,記錄鎖的持有者
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
  mutex->__data.__owner = id;
return0;
}

pthread_mutex_lock 的核心邏輯是通過 LLL_MUTEX_LOCK_OPTIMIZED 實(shí)現(xiàn)的。

# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)

static inline void lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
intprivate = PTHREAD_MUTEX_PSHARED (mutex);
  lll_lock (mutex->__data.__lock, private);
}

#define lll_lock(futex, private) \
  __lll_lock (&(futex), private)

#define __lll_lock(futex, private)                                      \
  ((void)                                                               \
   ({                                                                   \
     int *__futex = (futex);                                            \
     if (__glibc_unlikely                                               \
         (atomic_compare_and_exchange_bool_acq (__futex, 1, 0)))        \
       {                                                                \
         if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
           __lll_lock_wait_private (__futex);                           \
         else                                                           \
           __lll_lock_wait (__futex, private);                          \
       }                                                                \
   }))
   
void __lll_lock_wait (int *futex, intprivate)
{
if (atomic_load_relaxed (futex) == 2)
    goto futex;

while (atomic_exchange_acquire (futex, 2) != 0)
    {
    futex:
      futex_wait ((unsignedint *) futex, 2, private); /* Wait if *futex == 2.  */
    }
}

static inline int
futex_wait (int *futexp, int val)
{
#ifdef __NR_futex
return syscall (__NR_futex, futexp, FUTEX_WAIT, val);
#else
return syscall (__NR_futex_time64, futexp, FUTEX_WAIT, val);
#endif
}

pthread_mutex_lock 通過原子操作進(jìn)行加鎖,成功則返回,失敗則通過 futex 實(shí)現(xiàn)阻塞和喚醒。

Futex

Futex 是通過系統(tǒng)實(shí)現(xiàn)線程阻塞喚醒的一種方式,基于 futex 實(shí)現(xiàn)的鎖邏輯為首先通過 CAS 加鎖,成功后直接返回,失敗則通過 futex 陷入系統(tǒng)把當(dāng)前線程改成阻塞狀態(tài),然后鎖的持有者解鎖時(shí)再通過 futex 喚醒等待者。2.6.11 版本的 Futex 實(shí)現(xiàn)如下。

static int futex_wait(unsigned long uaddr, int val, unsigned long time)
{
// 等待隊(duì)列的節(jié)點(diǎn)
wait_queue_t wait = {    
    .task  = current, // 當(dāng)前線程     
    .func  = default_wake_function,   
    .task_list = { NULL, NULL } 
  }
int ret, curval;
struct futex_q q;

 retry:
// 加鎖內(nèi)存映射區(qū)域
 down_read(¤t->mm->mmap_sem);
// 根據(jù) uaddr 計(jì)算 key
 ret = get_futex_key(uaddr, &q.key);
// 插入隊(duì)列
 queue_me(&q, -1, NULL);

 up_read(¤t->mm->mmap_sem);

// 把當(dāng)前線程改成阻塞狀態(tài)
 __set_current_state(TASK_INTERRUPTIBLE);
// 把當(dāng)前線程插入 futex 結(jié)構(gòu)體的等待隊(duì)列
 add_wait_queue(&q.waiters, &wait);

if (likely(!list_empty(&q.list)))
  time = schedule_timeout(time);
 __set_current_state(TASK_RUNNING);
}

futex_wait 首先把 futex 結(jié)構(gòu)體插入到隊(duì)列中,然后把當(dāng)前線程插入到 futex 結(jié)構(gòu)體的隊(duì)列中,最后把當(dāng)前線程改成阻塞狀態(tài)并重新進(jìn)行調(diào)度。那么把 futex 結(jié)構(gòu)體插入到隊(duì)列中又必然會涉及到并發(fā)訪問公共資源的問題,看看 queue_me 是如何解決的。

static void queue_me(struct futex_q *q, int fd, struct file *filp)
{
	struct futex_hash_bucket *bh;

	q->fd = fd;
	q->filp = filp;

	init_waitqueue_head(&q->waiters);

	get_key_refs(&q->key);
	bh = hash_futex(&q->key);
	q->lock_ptr = &bh->lock;

	spin_lock(&bh->lock);
	bh->nqueued++;
	list_add_tail(&q->list, &bh->chain);
	spin_unlock(&bh->lock);
}

queue_me 先通過 key 從一個全局的結(jié)構(gòu)體數(shù)組中計(jì)算出對應(yīng)的索引,這個索引對應(yīng)的元素是一條鏈表(減少并發(fā)時(shí)的競爭),然后加鎖并把 futex 結(jié)構(gòu)體插入該鏈表中。這里使用的是自旋鎖,那么這里的鎖又是怎么加的呢?

#define spin_lock(lock)  _spin_lock(lock)

#define _spin_lock(lock) \
do { \
 preempt_disable(); \ // 關(guān)系統(tǒng)搶占
 _raw_spin_lock(lock); \
 __acquire(lock); \
} while(0)

_raw_spin_lock 在不同的架構(gòu)中實(shí)現(xiàn)不一樣。比如非 SMP 架構(gòu)下因?yàn)殛P(guān)閉了系統(tǒng)搶占,所以 _raw_spin_lock 的實(shí)現(xiàn)實(shí)際上什么都不需要做,因?yàn)椴粫l(fā)生并發(fā)問題。

#define _raw_spin_lock(lock)	do { (void)(lock); } while(0)

而 SMP 架構(gòu)下,可能存在多個 CPU 訪問該數(shù)據(jù)結(jié)構(gòu),所以需要真正的加鎖。

static inline void _raw_spin_lock(spinlock_t *lock)
{
 __asm__ __volatile__(
  spin_lock_string
  :"=m" (lock->slock) : : "memory");
}

#define spin_lock_string \
"\n1:\t" \
"lock ; decb %0\n\t" \
"jns 3f\n" \
"2:\t" \
"rep;nop\n\t" \
"cmpb $0,%0\n\t" \
"jle 2b\n\t" \
"jmp 1b\n" \
 "3:\n\t"

以上代碼大概是通過原子操作實(shí)現(xiàn)加鎖。

通過分析可以看到,在 Go 中,協(xié)程鎖機(jī)制的實(shí)現(xiàn)是由 Go 自己實(shí)現(xiàn)的,但是在實(shí)現(xiàn)中需要借助線程的鎖機(jī)制來實(shí)現(xiàn)協(xié)程的阻塞等待和喚醒,而線程的鎖機(jī)制又需要通過操作系統(tǒng)的 futex 來實(shí)現(xiàn)線程的阻塞等待和喚醒,而系統(tǒng)的等待和喚醒機(jī)制同樣需要通過鎖機(jī)制來實(shí)現(xiàn),具體實(shí)現(xiàn)在不同架構(gòu)下不一樣,比如在非 SMP 架構(gòu)中,只需要關(guān)系統(tǒng)搶占即可,這樣在執(zhí)行操作系統(tǒng)代碼時(shí)就不會有并發(fā)代碼訪問該公共數(shù)據(jù)結(jié)構(gòu)(需要保證中斷中也不會訪問該數(shù)據(jù)結(jié)構(gòu),否則也需要關(guān)中斷),而在 SMP 架構(gòu)下是通過自旋鎖來實(shí)現(xiàn)的,這樣就解決了套娃的問題。

責(zé)任編輯:姜華 來源: 編程雜技
相關(guān)推薦

2024-12-03 15:15:22

2016-10-28 17:39:47

phpgolangcoroutine

2013-12-12 16:44:25

Lua協(xié)程

2023-11-23 08:31:51

競爭鎖共享字段

2024-06-27 07:56:49

2024-02-05 09:06:25

Python協(xié)程Asyncio庫

2018-12-04 14:00:41

協(xié)程編程模式PHP

2021-04-25 09:36:20

Go協(xié)程線程

2021-09-16 09:59:13

PythonJavaScript代碼

2023-07-27 13:46:10

go開源項(xiàng)目

2017-05-02 11:38:00

PHP協(xié)程實(shí)現(xiàn)過程

2024-08-27 09:46:39

Go協(xié)程效率

2023-11-17 11:36:59

協(xié)程纖程操作系統(tǒng)

2023-07-13 08:06:05

應(yīng)用協(xié)程阻塞

2024-05-29 08:05:15

Go協(xié)程通信

2025-02-28 09:04:08

2021-09-27 23:28:29

Go多協(xié)程并發(fā)

2023-04-19 21:20:49

Tars-Cpp協(xié)程

2022-10-28 10:45:22

Go協(xié)程GoFrame

2021-05-21 08:21:57

Go語言基礎(chǔ)技術(shù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 男人的天堂在线视频 | 97人澡人人添人人爽欧美 | 综合婷婷| 日韩成人高清 | 亚洲国产精品久久久久秋霞不卡 | 亚洲国产黄色av | 国产精品久久久久久吹潮 | 久久综合香蕉 | 成人av一区二区在线观看 | 久久五月婷 | 99视频在线免费观看 | 国产精品二区三区 | 美女毛片免费看 | 亚洲欧美bt | 精品欧美 | 国产精品久久久久国产a级 欧美日韩国产免费 | 伊色综合久久之综合久久 | 成年人免费看 | 久草.com| 久久久久无码国产精品一区 | 毛片视频观看 | 精品久久国产老人久久综合 | 亚洲欧美日韩国产综合 | 人人干在线视频 | 成年人在线观看 | 久久伊人精品一区二区三区 | 欧美久久综合 | 国产区视频在线观看 | 奇米影视在线 | 日本久久www成人免 成人久久久久 | 久久久www成人免费精品张筱雨 | 国产精品久久久久久久久久不蜜臀 | 新疆少妇videos高潮 | 久久久久久久久久影视 | 日韩成人精品一区二区三区 | 黄色永久免费 | 国产激情91久久精品导航 | 精品一区二区三区av | 高清视频一区 | 欧美激情久久久 | 午夜天堂精品久久久久 |