Go 語言中 sync 包的近距離觀察
讓我們來看看負責提供同步原語的 Go 包:sync。
sync.Mutex
sync.Mutex 可能是 sync 包中被廣泛使用的原語。它允許對共享資源進行互斥操作(即不允許同時訪問):
mutex := &sync.Mutex{}
mutex.Lock()
// Update shared variable (e.g. slice, pointer on a structure, etc.)
mutex.Unlock()
必須指出的是 sync.Mutex 無法被復制(就像 sync 包中的所有其他原語一樣)。如果一個結構體有一個 sync 字段,必須通過指針進行傳遞。
sync.RWMutex
sync.RWMutex 是一個讀寫鎖。它提供了與我們剛剛看到的 Lock() 和 Unlock() 相同的方法(因為這兩個結構都實現了 sync.Locker 接口)。然而,它還允許使用 RLock() 和 RUnlock() 方法進行并發讀?。?/p>
mutex := &sync.RWMutex{}
mutex.Lock()
// Update shared variable
mutex.Unlock()
mutex.RLock()
// Read shared variable
mutex.RUnlock()
一個 sync.RWMutex 允許至少一個讀取者或正好一個寫入者,而一個 sync.Mutex 則允許正好一個讀取者或寫入者。
讓我們運行一個快速的基準測試來比較這些方法:
func BenchmarkMutexLock(b *testing.B) {
m := sync.Mutex{}
for i := 0; i < b.N; i++ {
m.Lock()
m.Unlock()
}
}
func BenchmarkRWMutexLock(b *testing.B) {
m := sync.RWMutex{}
for i := 0; i < b.N; i++ {
m.Lock()
m.Unlock()
}
}
func BenchmarkRWMutexRLock(b *testing.B) {
m := sync.RWMutex{}
for i := 0; i < b.N; i++ {
m.RLock()
m.RUnlock()
}
}
BenchmarkMutexLock-4 83497579 17.7 ns/op
BenchmarkRWMutexLock-4 35286374 44.3 ns/op
BenchmarkRWMutexRLock-4 89403342 15.3 ns/op
正如我們注意到的那樣,讀取鎖定/解鎖 sync.RWMutex 比鎖定/解鎖 sync.Mutex 更快。另一方面,調用 Lock()/Unlock() 在 sync.RWMutex 上是最慢的操作。
總的來說,當我們有頻繁的讀取和不經常的寫入時,應該使用 sync.RWMutex。
sync.WaitGroup
sync.WaitGroup 也經常被使用。它是一個 goroutine 等待一組 goroutine 完成的慣用方式。
sync.WaitGroup 擁有一個內部計數器。如果這個計數器等于 0,Wait() 方法會立即返回。否則,它會被阻塞,直到計數器變為 0。
要增加計數器,我們可以使用 Add(int) 方法。要減少計數器,可以使用 Done()(將計數器減 1)或者使用帶有負值的相同的 Add(int) 方法。
在以下示例中,我們將啟動八個 goroutine 并等待它們完成:
wg := &sync.WaitGroup{}
for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
// Do something
wg.Done()
}()
}
wg.Wait()
// Continue execution
每次我們創建一個 goroutine 時,都會使用 wg.Add(1) 來增加 wg 的內部計數器。我們也可以在 for 循環外部調用 wg.Add(8)。
與此同時,每當一個 goroutine 完成時,它會使用 wg.Done() 來減少 wg 的內部計數器。
一旦執行了八個 wg.Done() 語句,主 goroutine 就會繼續執行。
sync.Map
sync.Map 是 Go 中的一個并發版本的 map,我們可以:
- 使用 Store(interface{}, interface{}) 添加元素
- 使用 Load(interface) interface{} 檢索元素
- 使用 Delete(interface{}) 刪除元素
- 使用 LoadOrStore(interface{}, interface{}) (interface, bool) 檢索或添加元素(如果之前不存在)。返回的 bool 值為 true 表示在操作前鍵存在于 map 中。
- 使用 Range 在元素上進行迭代
m := &sync.Map{}
// Put elements
m.Store(1, "one")
m.Store(2, "two")
// Get element 1
value, contains := m.Load(1)
if contains {
fmt.Printf("%s\n", value.(string))
}
// Returns the existing value if present, otherwise stores it
value, loaded := m.LoadOrStore(3, "three")
if !loaded {
fmt.Printf("%s\n", value.(string))
}
// Delete element 3
m.Delete(3)
// Iterate over all the elements
m.Range(func(key, value interface{}) bool {
fmt.Printf("%d: %s\n", key.(int), value.(string))
return true
})
Go 在線測試: https://play.golang.org/p/BO8IDVIDwsr
one
three
1: one
2: two
正如你所看到的,Range 方法接受一個 func(key, value interface{}) bool 函數作為參數。如果我們返回 false,則迭代會停止。有趣的是,即使我們在恒定時間之后返回 false(更多信息),最壞情況下的時間復雜度仍然保持為 O(n)。
何時應該使用 sync.Map 而不是在經典的 map 上加 sync.Mutex 呢?
- 當我們有頻繁讀取和不經常寫入時(與 sync.RWMutex 類似)
- 當多個 goroutine 為不相交的鍵集合讀取、寫入和覆蓋條目。這具體意味著什么?例如,如果我們有一個分片實現,有 4 個 goroutine 每個負責 25% 的鍵(沒有沖突)。在這種情況下,sync.Map 也是首選。
sync.Pool
sync.Pool 是一個并發池,負責安全地保存一組對象。
其公共方法包括:
- Get() interface{} 用于檢索一個元素
- Put(interface{}) 用于添加一個元素
pool := &sync.Pool{}
pool.Put(NewConnection(1))
pool.Put(NewConnection(2))
pool.Put(NewConnection(3))
connection := pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
1
3
2
值得注意的是,就順序而言是沒有保證的。Get 方法指定它從池中獲取一個任意的項目。
也可以指定一個創建方法:
pool := &sync.Pool{
New: func() interface{} {
return NewConnection()
},
}
connection := pool.Get().(*Connection)
每次調用 Get() 時,它將返回由傳遞給 pool.New 的函數創建的對象(在本例中是一個指針)。
何時應該使用 sync.Pool 呢?有兩種情況:
- 第一種情況是當我們需要重用共享且長期存在的對象時,比如一個數據庫連接。
- 第二種情況是優化內存分配。
讓我們考慮一個函數的示例,該函數將數據寫入緩沖區并將結果持久化到文件中。使用 sync.Pool,我們可以重復使用分配給緩沖區的空間,跨不同的函數調用重復使用同一個對象。
第一步是檢索先前分配的緩沖區(或者如果是第一次調用,則創建一個,但這已經被抽象化了)。然后,延遲操作是將緩沖區放回池中。
func writeFile(pool *sync.Pool, filename string) error {
// Gets a buffer object
buf := pool.Get().(*bytes.Buffer)
// Returns the buffer into the pool
defer pool.Put(buf)
// Reset buffer otherwise it will contain "foo" during the first call
// Then "foofoo" etc.
buf.Reset()
buf.WriteString("foo")
return ioutil.WriteFile(filename, buf.Bytes(), 0644)
}
sync.Pool 還有一個要提到的重要點。由于指針可以被放入 Get() 返回的接口值中,無需進行任何分配,因此最好將指針放入池中而不是結構體。
這樣,我們既可以有效地重用已分配的內存,又可以減輕垃圾收集器的壓力,因為如果變量逃逸到堆上,它就不需要再次分配內存。
sync.Once
sync.Once 是一個簡單而強大的原語,用于確保一個函數只被執行一次。
在這個例子中,將只有一個 goroutine 顯示輸出消息:
once := &sync.Once{}
for i := 0; i < 4; i++ {
i := i
go func() {
once.Do(func() {
fmt.Printf("first %d\n", i)
})
}()
}
我們使用了 Do(func()) 方法來指定只有這部分代碼必須被執行一次。
sync.Cond
讓我們以最可能最少使用的原語 sync.Cond 結束。
它用于向 goroutine 發出信號(一對一)或向 goroutine(s) 廣播信號(一對多)。
假設我們有一個場景,需要通知一個 goroutine 共享切片的第一個元素已被更新。
創建一個 sync.Cond 需要一個 sync.Locker 對象(可以是 sync.Mutex 或 sync.RWMutex):
cond := sync.NewCond(&sync.RWMutex{})
接下來,讓我們編寫一個函數來顯示切片的第一個元素:
func printFirstElement(s []int, cond *sync.Cond) {
cond.L.Lock()
cond.Wait()
fmt.Printf("%d\n", s[0])
cond.L.Unlock()
}
正如你所看到的,我們可以使用 cond.L 來訪問內部互斥鎖。一旦鎖被獲取,我們調用 cond.Wait(),它會阻塞直到收到信號。
現在回到主 goroutine。我們將通過傳遞一個共享切片和之前創建的 sync.Cond 來創建一個 printFirstElement 池。然后,我們調用一個 get() 函數,將結果存儲在 s[0] 中并發出一個信號:
s := make([]int, 1)
for i := 0; i < runtime.NumCPU(); i++ {
go printFirstElement(s, cond)
}
i := get()
cond.L.Lock()
s[0] = i
cond.Signal()
cond.L.Unlock()
這個信號將解除一個創建的 goroutine 的阻塞狀態,它將顯示 s[0]。
然而,如果我們退一步來看,我們可能會認為我們的代碼可能違反了 Go 最基本的原則之一:
不要通過共享內存來通信;相反,通過通信來共享內存。
事實上,在這個例子中,最好使用一個通道來傳遞 get() 返回的值。
然而,我們也提到了 sync.Cond 還可以用于廣播信號。
讓我們修改上一個示例的結尾,將 Signal() 改為 Broadcast():
i := get()
cond.L.Lock()
s[0] = i
cond.Broadcast()
cond.L.Unlock()
在這種情況下,所有的 goroutine 都會被觸發。
眾所周知,通道元素只會被一個 goroutine 捕獲。唯一模擬廣播的方式是關閉一個通道,但這不能重復使用。因此,盡管 頗具爭議,這無疑是一個有趣的特性。
還有一個值得提及的 sync.Cond 使用場景,也許是最重要的一個:
示例的 Go Playground 地址:https://play.golang.org/p/ap5qXF5DAg5