成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go官方設計了一個信號量庫

開發 后端
信號量(Semaphore),有時被稱為信號燈,是[多線程環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被并發調用。在進入一個關鍵代碼段之前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那么該線程必須釋放信號量。

[[420244]]

前言

哈嘍,大家好,我是asong。在寫上一篇文章請勿濫用goroutine時,發現Go語言擴展包提供了一個帶權重的信號量庫Semaphore,使用信號量我們可以實現一個"工作池"控制一定數量的goroutine并發工作。因為對源碼抱有好奇的態度,所以在周末仔細看了一下這個庫并進行了解析,在這里記錄一下。

何為信號量

要想知道一個東西是什么,我都愛去百度百科上搜一搜,輸入"信號量",這答案不就來了。

百度百科解釋:

信號量(Semaphore),有時被稱為信號燈,是[多線程環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被并發調用。在進入一個關鍵代碼段之前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那么該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。為了完成這個過程,需要創建一個信號量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個關鍵代碼段的首末端。確認這些信號量VI引用的是初始創建的信號量。

通過這段解釋我們可以得知什么是信號量,其實信號量就是一種變量或者抽象數據類型,用于控制并發系統中多個進程對公共資源的訪問,訪問具有原子性。信號量主要分為兩類:

  • 二值信號量:顧名思義,其值只有兩種0或者1,相當于互斥量,當值為1時資源可用,當值為0時,資源被鎖住,進程阻塞無法繼續執行。
  • 計數信號量:信號量是一個任意的整數,起始時,如果計數器的計數值為0,那么創建出來的信號量就是不可獲得的狀態,如果計數器的計數值大于0,那么創建出來的信號量就是可獲得的狀態,并且總共獲取的次數等于計數器的值。

信號量工作原理

信號量是由操作系統來維護的,信號量只能進行兩種操作等待和發送信號,操作總結來說,核心就是PV操作:

  • P原語:P是荷蘭語Proberen(測試)的首字母。為阻塞原語,負責把當前進程由運行狀態轉換為阻塞狀態,直到另外一個進程喚醒它。操作為:申請一個空閑資源(把信號量減1),若成功,則退出;若失敗,則該進程被阻塞;
  • V原語:V是荷蘭語Verhogen(增加)的首字母。為喚醒原語,負責把一個被阻塞的進程喚醒,它有一個參數表,存放著等待被喚醒的進程信息。操作為:釋放一個被占用的資源(把信號量加1),如果發現有被阻塞的進程,則選擇一個喚醒之。

在信號量進行PV操作時都為原子操作,并且在PV原語執行期間不允許有中斷的發生。

PV原語對信號量的操作可以分為三種情況:

  • 把信號量視為某種類型的共享資源的剩余個數,實現對一類共享資源的訪問
  • 把信號量用作進程間的同步
  • 視信號量為一個加鎖標志,實現對一個共享變量的訪問

具體在什么場景使用本文就不在繼續分析,接下來我們重點來看一下Go語言提供的擴展包Semaphore,看看它是怎樣實現的。

官方擴展包Semaphore

我們之前在分析Go語言源碼時總會看到這幾個函數:

  1. func runtime_Semacquire(s *uint32) 
  2. func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int
  3. func runtime_Semrelease(s *uint32, handoff bool, skipframes int

這幾個函數就是信號量的PV操作,不過他們都是給Go內部使用的,如果想使用信號量,那就可以使用官方的擴展包:Semaphore,這是一個帶權重的信號量,接下來我們就重點分析一下這個庫。

安裝方法:go get -u golang.org/x/sync

數據結構

  1. type Weighted struct { 
  2.  size    int64 // 設置一個最大權值 
  3.  cur     int64 // 標識當前已被使用的資源數 
  4.  mu      sync.Mutex // 提供臨界區保護 
  5.  waiters list.List // 阻塞等待的調用者列表 

semaphore庫核心結構就是Weighted,主要有4個字段:

  • size:這個代表的是最大權值,在創建Weighted對象指定
  • cur:相當于一個游標,來記錄當前已使用的權值
  • mu:互斥鎖,并發情況下做臨界區保護
  • waiters:阻塞等待的調用者列表,使用鏈表數據結構保證先進先出的順序,存儲的數據是waiter對象,waiter數據結構如下:
  1. type waiter struct { 
  2.  n     int64 // 等待調用者權重值 
  3.  ready chan<- struct{} // close channel就是喚醒 

這里只有兩個字段:

  • n:這個就是等待調用者的權重值
  • ready:這就是一個channel,利用channel的close機制實現喚醒

semaphore還提供了一個創建Weighted對象的方法,在初始化時需要給定最大權值:

  1. // NewWeighted為并發訪問創建一個新的加權信號量,該信號量具有給定的最大權值。 
  2. func NewWeighted(n int64) *Weighted { 
  3.  w := &Weighted{size: n} 
  4.  return w 

阻塞獲取權值的方法 - Acquire

先直接看代碼吧:

  1. func (s *Weighted) Acquire(ctx context.Context, n int64) error { 
  2.  s.mu.Lock() // 加鎖保護臨界區 
  3.  // 有資源可用并且沒有等待獲取權值的goroutine 
  4.  if s.size-s.cur >= n && s.waiters.Len() == 0 { 
  5.   s.cur += n // 加權 
  6.   s.mu.Unlock() // 釋放鎖 
  7.   return nil 
  8.  } 
  9.  // 要獲取的權值n大于最大的權值了 
  10.  if n > s.size { 
  11.   // 先釋放鎖,確保其他goroutine調用Acquire的地方不被阻塞 
  12.   s.mu.Unlock() 
  13.   // 阻塞等待context的返回 
  14.   <-ctx.Done() 
  15.   return ctx.Err() 
  16.  } 
  17.  // 走到這里就說明現在沒有資源可用了 
  18.  // 創建一個channel用來做通知喚醒 
  19.  ready := make(chan struct{}) 
  20.  // 創建waiter對象 
  21.  w := waiter{n: n, ready: ready} 
  22.  // waiter按順序入隊 
  23.  elem := s.waiters.PushBack(w) 
  24.  // 釋放鎖,等待喚醒,別阻塞其他goroutine 
  25.  s.mu.Unlock() 
  26.  
  27.  // 阻塞等待喚醒 
  28.  select { 
  29.  // context關閉 
  30.  case <-ctx.Done(): 
  31.   err := ctx.Err() // 先獲取context的錯誤信息 
  32.   s.mu.Lock() 
  33.   select { 
  34.   case <-ready: 
  35.    // 在context被關閉后被喚醒了,那么試圖修復隊列,假裝我們沒有取消 
  36.    err = nil 
  37.   default
  38.    // 判斷是否是第一個元素 
  39.    isFront := s.waiters.Front() == elem 
  40.    // 移除第一個元素 
  41.    s.waiters.Remove(elem) 
  42.    // 如果是第一個元素且有資源可用通知其他waiter 
  43.    if isFront && s.size > s.cur { 
  44.     s.notifyWaiters() 
  45.    } 
  46.   } 
  47.   s.mu.Unlock() 
  48.   return err 
  49.  // 被喚醒了 
  50.  case <-ready: 
  51.   return nil 
  52.  } 

注釋已經加到代碼中了,總結一下這個方法主要有三個流程:

  • 流程一:有資源可用時并且沒有等待權值的goroutine,走正常加權流程;
  • 流程二:想要獲取的權值n大于初始化時設置最大的權值了,這個goroutine永遠不會獲取到信號量,所以阻塞等待context的關閉;
  • 流程三:前兩步都沒問題的話,就說明現在系統沒有資源可用了,這時就需要阻塞等待喚醒,在阻塞等待喚醒這里有特殊邏輯;
    • 特殊邏輯二:context關閉后,則根據是否有可用資源決定通知后面等待喚醒的調用者,這樣做的目的其實是為了避免當不同的context控制不同的goroutine時,未關閉的goroutine不會被阻塞住,依然執行,來看這樣一個例子(因為goroutine的搶占式調度,所以這個例子也會具有偶然性):
    • 特殊邏輯一:如果在context被關閉后被喚醒了,那么就先忽略掉這個cancel,試圖修復隊列。
  1. func main()  { 
  2.  s := semaphore.NewWeighted(3) 
  3.  ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2) 
  4.  defer cancel() 
  5.  
  6.  for i :=0; i < 3; i++{ 
  7.    if i != 0{ 
  8.     go func(num int) { 
  9.      if err := s.Acquire(ctx,3); err != nil{ 
  10.       fmt.Printf("goroutine: %d, err is %s\n", num, err.Error()) 
  11.       return 
  12.      } 
  13.      time.Sleep(2 * time.Second
  14.      fmt.Printf("goroutine: %d run over\n",num) 
  15.      s.Release(3) 
  16.  
  17.     }(i) 
  18.    }else { 
  19.     go func(num int) { 
  20.      ct,cancel := context.WithTimeout(context.Background(), time.Second * 3) 
  21.      defer cancel() 
  22.      if err := s.Acquire(ct,3); err != nil{ 
  23.       fmt.Printf("goroutine: %d, err is %s\n", num, err.Error()) 
  24.       return 
  25.      } 
  26.      time.Sleep(3 * time.Second
  27.      fmt.Printf("goroutine: %d run over\n",num) 
  28.      s.Release(3) 
  29.     }(i) 
  30.    } 
  31.  
  32.  } 
  33.  time.Sleep(10 * time.Second

上面的例子中goroutine:0 使用ct對象來做控制,超時時間為3s,goroutine:1和goroutine:2對象使用ctx對象來做控制,超時時間為2s,這三個goroutine占用的資源都等于最大資源數,也就是說只能有一個goruotine運行成功,另外兩個goroutine都會被阻塞,因為goroutine是搶占式調度,所以我們不能確定哪個gouroutine會第一個被執行,這里我們假設第一個獲取到信號量的是gouroutine:2,阻塞等待的調用者列表順序是:goroutine:1 -> goroutine:0,因為在goroutine:2中有一個2s的延時,所以會觸發ctx的超時,ctx會下發Done信號,因為goroutine:2和goroutine:1都是被ctx控制的,所以就會把goroutine:1從等待者隊列中取消,但是因為goroutine:1屬于隊列的第一個隊員,并且因為goroutine:2已經釋放資源,那么就會喚醒goroutine:0繼續執行,畫個圖表示一下:

使用這種方式可以避免goroutine永久失眠。

不阻塞獲取權值的方法 - TryAcquire

  1. func (s *Weighted) TryAcquire(n int64) bool { 
  2.  s.mu.Lock() // 加鎖 
  3.  // 有資源可用并且沒有等待獲取資源的goroutine 
  4.  success := s.size-s.cur >= n && s.waiters.Len() == 0 
  5.  if success { 
  6.   s.cur += n 
  7.  } 
  8.  s.mu.Unlock() 
  9.  return success 

這個方法就簡單很多了,不阻塞地獲取權重為n的信號量,成功時返回true,失敗時返回false并保持信號量不變。

釋放權重

  1. func (s *Weighted) Release(n int64) { 
  2.  s.mu.Lock() 
  3.  // 釋放資源 
  4.  s.cur -= n 
  5.  // 釋放資源大于持有的資源,則會發生panic 
  6.  if s.cur < 0 { 
  7.   s.mu.Unlock() 
  8.   panic("semaphore: released more than held"
  9.  } 
  10.  // 通知其他等待的調用者 
  11.  s.notifyWaiters() 
  12.  s.mu.Unlock() 

這里就是很常規的操作,主要就是資源釋放,同時進行安全性判斷,如果釋放資源大于持有的資源,則會發生panic。

喚醒waiter

在Acquire和Release方法中都調用了notifyWaiters,我們來分析一下這個方法:

  1. func (s *Weighted) notifyWaiters() { 
  2.  for { 
  3.   // 獲取等待調用者隊列中的隊員 
  4.   next := s.waiters.Front() 
  5.   // 沒有要通知的調用者了 
  6.   if next == nil { 
  7.    break // No more waiters blocked. 
  8.   } 
  9.  
  10.   // 斷言出waiter信息 
  11.   w := next.Value.(waiter) 
  12.   if s.size-s.cur < w.n { 
  13.    // 沒有足夠資源為下一個調用者使用時,繼續阻塞該調用者,遵循先進先出的原則, 
  14.    // 避免需要資源數比較大的waiter被餓死 
  15.    // 
  16.    // 考慮一個場景,使用信號量作為讀寫鎖,現有N個令牌,N個reader和一個writer 
  17.    // 每個reader都可以通過Acquire(1)獲取讀鎖,writer寫入可以通過Acquire(N)獲得寫鎖定 
  18.    // 但不包括所有的reader,如果我們允許reader在隊列中前進,writer將會餓死-總是有一個令牌可供每個reader 
  19.    break 
  20.   } 
  21.  
  22.   // 獲取資源 
  23.   s.cur += w.n 
  24.   // 從waiter列表中移除 
  25.   s.waiters.Remove(next
  26.   // 使用channel的close機制喚醒waiter 
  27.   close(w.ready) 
  28.  } 

這里只需要注意一個點:喚醒waiter采用先進先出的原則,避免需要資源數比較大的waiter被餓死。

何時使用Semaphore

到這里我們就把Semaphore的源代碼看了一篇,代碼行數不多,封裝的也很巧妙,那么我們該什么時候選擇使用它呢?

目前能想到一個場景就是Semaphore配合上errgroup實現一個"工作池",使用Semaphore限制goroutine的數量,配合上errgroup做并發控制,示例如下:

  1. const ( 
  2.  limit = 2 
  3. )  
  4.  
  5. func main()  { 
  6.  serviceName := []string{ 
  7.   "cart"
  8.   "order"
  9.   "account"
  10.   "item"
  11.   "menu"
  12.  } 
  13.  eg,ctx := errgroup.WithContext(context.Background()) 
  14.  s := semaphore.NewWeighted(limit) 
  15.  for index := range serviceName{ 
  16.   name := serviceName[index
  17.   if err := s.Acquire(ctx,1); err != nil{ 
  18.    fmt.Printf("Acquire failed and err is %s\n", err.Error()) 
  19.    break 
  20.   } 
  21.   eg.Go(func() error { 
  22.    defer s.Release(1) 
  23.    return callService(name
  24.   }) 
  25.  } 
  26.  
  27.  if err := eg.Wait(); err != nil{ 
  28.   fmt.Printf("err is %s\n", err.Error()) 
  29.   return 
  30.  } 
  31.  fmt.Printf("run success\n"
  32.  
  33. func callService(name string) error { 
  34.  fmt.Println("call ",name
  35.  time.Sleep(1 * time.Second
  36.  return nil 

結果如下:

  1. call  order 
  2. call  cart 
  3. call  account 
  4. call  item 
  5. call  menu 
  6. run success 

總結

本文我們主要賞析了Go官方擴展庫Semaphore的實現,他的設計思路簡單,僅僅用幾十行就完成了完美的封裝,值得我們借鑒學習。不過在實際業務場景中,我們使用信號量的場景并不多,大多數場景我們都可以使用channel來替代,但是有些場景使用Semaphore來實現會更好,比如上篇文章【[警惕] 請勿濫用goroutine】我們使用channel+sync來控制goroutine數量,這種實現方式并不好,因為實際已經起來了多個goroutine,只不過控制了工作的goroutine數量,如果改用semaphore實現才是真正的控制了goroutine數量。

 

文中代碼已上傳github:https://github.com/asong2020/Golang_Dream/blob/master/code_demo/semaphore_demo/semaphore.go,歡迎star。

 

責任編輯:武曉燕 來源: Golang夢工廠
相關推薦

2023-08-28 07:04:17

2021-04-13 09:20:15

鴻蒙HarmonyOS應用開發

2010-04-21 16:50:31

Unix信號量

2020-11-05 09:59:24

Linux內核信號量

2010-04-21 16:25:13

Unix信號量

2021-09-07 07:53:42

Semaphore 信號量源碼

2010-04-21 16:42:48

Unix信號量

2010-04-21 15:37:38

Unix信號量

2020-09-25 07:34:40

Linux系統編程信號量

2021-04-30 00:00:50

Semaphore信號量面試官

2024-10-29 15:23:45

Python線程安全

2019-11-19 09:00:38

JavaAND信號量

2009-12-08 12:14:43

2010-03-17 16:36:10

Java信號量模型

2010-07-15 15:32:10

Perl線程

2010-04-21 17:10:25

Unix信號量

2016-11-23 16:08:24

Python處理器分布式系統

2017-05-11 14:05:25

Consul分布式信號量

2021-02-03 20:10:29

Linux信號量shell

2010-03-16 17:52:27

Java多線程信號量
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精彩视频在线观看 | 国内自拍偷拍一区 | 欧美aⅴ | 久久一级| 日韩一级免费大片 | 2019天天干天天操 | 亚洲综合在线一区 | 国产男女猛烈无遮掩视频免费网站 | 在线视频国产一区 | 日韩成人免费在线视频 | 久久69精品久久久久久久电影好 | 久久久国产精品入口麻豆 | www.久久| 成人av色 | av在线影院 | 天天干天天干 | 97色在线观看免费视频 | 黄色大片免费网站 | 亚洲欧美日韩精品久久亚洲区 | 日韩精品一区二区三区中文字幕 | 国产高清视频一区二区 | 久久久av| 亚洲人成人一区二区在线观看 | 亚洲国产aⅴ成人精品无吗 国产精品永久在线观看 | 玖玖爱365| 日本三级全黄三级a | 国产亚洲网站 | 日韩福利片 | 亚洲 欧美 精品 | 久久亚洲春色中文字幕久久久 | 日韩欧美一区二区三区 | 在线一区观看 | 国产不卡视频 | 1区2区3区视频 | 亚洲精品视频一区 | 羞羞视频在线观看免费观看 | 嫩草91在线| 一区在线视频 | 国产精品一区二区在线观看 | 亚洲一区| 免费成人高清在线视频 |