探索 Go 并發編程的背后
在操作系統中,進程是資源分配的單位,線程是調度的單位,Go 在線程的基礎上提供了更細粒度的調度單位-協程。協程跑在多個線程中,由 Go 運行時調度,可以原生利用系統的多核,實現強大的并發能力,但是同時也帶來了復雜性。并發編程需要謹慎和更多的思考,一不小心可能就會導致 panic、死鎖或并發引起的邏輯問題。本文嘗試探索 Go 并發編程中的一些問題,以及看 Go 是如何解決的。
并發編程的問題
在并發編程中,多個協程往往需要對共享數據進行操作,這時候情況就會變得復雜,下面看幾個例子。
原子性
原子性問題是指執行一個操作無法一步完成,而是需要執行多個步驟,但是多個實體同時執行時會存在問題,比如下面的例子。
package main
import (
"sync"
)
func main() {
for {
var count int
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
count++
}()
go func() {
defer wg.Done()
count++
}()
wg.Wait()
if count != 2 {
panic("count should be 2")
}
}
}
上面代碼中,因為 count++ 不是原子操作,需要分解成多個步驟,所以 count 可能會出現不等于 2 的情況。在單核中,協程 1 可能執行到獲取 count = 0 時發生了調度,然后協程 2 執行了 count++ 并回寫,最后協程 1 也執行了 count++ 并回寫,最終結果是 1。在多核中,協程 1 和 2 同時獲取了 count 的值是 0,然后加一后寫入,最終結果是 1。類似的問題還有在 32 位系統寫入 64 位數據,或者內存不對齊時,寫入了一部分數據被另一個協程讀取了,導致讀取了錯誤的數據或非法的指針地址。
指令重排
編譯器和 CPU 為了性能優化可能會對指令進行重排,指令重排會保證單個線程內的邏輯符合預期,在多個線程邏輯解耦的情況是非常有意義的,但是如果多線程共享了內存則可能會出現問題,因為代碼的執行順序和我們寫的代碼不一致,可能會導致邏輯出錯。首先看一個 C++ 的例子。
#include <iostream>
#include <thread>
using namespace std;
bool flag = false;
int value = 0;
void func1() {
value = 11111;
// asm volatile("" ::: "memory"); // 編譯器內存屏障
flag = true;
}
void func2() {
if (flag) {
// asm volatile("" ::: "memory");
if (value == 0) {
cout << "value == 0" << endl;
exit(0);
}
}
}
int main() {
while(1) {
flag = false;
value = 0;
thread t1 = thread(func1);
thread t2 = thread (func2);
t1.join();
t2.join();
}
return 0;
}
通過 g++ -O2 main.cpp 編譯上面的代碼然后執行,最終會輸入 value = 0。這個問題看起來非常詭異,和我們的代碼邏輯不太符合預期。可能的原因有:
- 編譯器或 CPU 發生了指令重排,線程 1 執行了 flag = true,后執行 value = 11111。
- 內存可見性問題,即線程 1 中執行變量 1 和 2 的寫操作,但是變量 2 先回刷內存,線程 2 會讀取最新的變量 2,但是讀取到舊的變量 1。
大多數弱內存順序的 CPU 都會保證線程 2 看到 flag 為 true 時,value = 11111 是成立的,所以不是內存可見性問題。接著在代碼中加入 asm volatile("" ::: "memory") 編譯器屏障發現依然會輸出 value = 0,說明不是因為編譯器指令重排引起的,通過編譯后的代碼也可以看到編譯器的確沒有重排指令。
__Z5func1v: ## @_Z5func1v
.cfi_startproc
## %bb.0:
pushq %rbp
.cfi_def_cfa_offset 16
.cfi_offset %rbp, -16
movq %rsp, %rbp
.cfi_def_cfa_register %rbp
movl $11111, _value(%rip) ## 先賦值 value,再賦值 flag
movb $1, _flag(%rip)
popq %rbp
retq
.cfi_endproc
## -- End function
.globl __Z5func2v ## -- Begin function _Z5func2v
.p2align 4, 0x90
__Z5func2v: ## @_Z5func2v
.cfi_startproc
## %bb.0:
pushq %rbp
.cfi_def_cfa_offset 16
.cfi_offset %rbp, -16
movq %rsp, %rbp
.cfi_def_cfa_register %rbp
cmpb $0, _flag(%rip) ## 先判斷 flag
je LBB1_2
## %bb.1:
cmpl $0, _value(%rip) ## 再判斷 value
je LBB1_3
所以只可能是 CPU 指令重排,我們再設置一下 CPU 內存屏障禁止指令重排看一下。
#include <iostream>
#include <thread>
using namespace std;
bool flag = false;
int value = 0;
void func1() {
value = 11111;
// 禁止寫寫重排
asm volatile("dmb ishst" ::: "memory");
flag = true;
}
void func2() {
if (flag) {
// 禁止讀讀重排
asm volatile("dmb ishl" ::: "memory");
if (value == 0) {
cout << "value == 0" << endl;
exit(0);
}
}
}
int main() {
while(1) {
flag = false;
value = 0;
thread t1 = thread(func1);
thread t2 = thread (func2);
t1.join();
t2.join();
}
return 0;
}
這樣就不會出現 value = 0 了。
在 Go 中也會存在一樣的問題。
package main
import (
"sync"
)
func main() {
for {
var flag bool
var count int
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
count = 1
flag = true
}()
go func() {
defer wg.Done()
if flag && count != 1 {
panic("count should be 1")
}
}()
wg.Wait()
}
}
上面的代碼會發生 panic,這里可以通過設置內存屏障解決這個問題,但是實踐中我們一般使用 Go 提供的 API。
package main
/*
void Store() {
asm volatile("dmb ishst" ::: "memory");
}
void Load() {
asm volatile("dmb ishld" ::: "memory");
}
*/
import "C"
import (
"sync"
)
func main() {
for {
var flag bool
var count int
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
count = 1
C.Store()
flag = true
}()
go func() {
defer wg.Done()
if flag {
C.Load()
if count != 1 {
panic("count should be 1")
}
}
}()
wg.Wait()
}
}
內存可見性
因為每個 CPU 都有自己的 L1/2 緩存,所以 CPU 0 寫入的數據,CPU 1 不一定讀到最新的。比如下面的例子。
package main
func main() {
var count int
// 先執行
go func() {
count = 1
}()
// 后執行
go func() {
if count != 1 {
panic("count should be 1")
}
}()
}
上面的代碼假設協程 1 先執行完畢后協程 2 才執行,協程 2 可能會看不到協程 1 的寫入,也就是 count 不等于 1。
從之前的例子中可以看到,在并發編程中存在幾個問題。
- 原子性:一個操作需要多個步驟完成,多個線程同時執行這個操作,導致數據錯亂。
- 指令重排:編譯器和 CPU 為了性能優化會對指令進行重排,出現代碼的執行順序和我們寫的代碼不一致,導致邏輯出錯。
- 內存可見性:因為 CPU 有自己的獨立的緩存,所以 CPU 可能不會立刻看到另一個 CPU 寫入的值,但是 CPU 會提供對應的指令保證讓開發者可以保證內存的可見性。
并發編程的方案
以上的這些問題非常繁瑣且復雜,不同的編譯器、不同的編譯器版本和不同的 CPU 架構處理方式都是不一樣的,但是幸好 Go 通過 sync 包為我們解決了這些問題。
原子操作
package main
import (
"sync"
"sync/atomic"
)
func main() {
for {
var count atomic.Int32
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
count.Add(1)
}()
go func() {
defer wg.Done()
count.Add(1)
}()
wg.Wait()
if count.Load() != 2 {
panic("count should be 2")
}
}
}
通過 atomic.Int32 我們就可以保證對 count 加一的計算是原子的,并且每次操作完對另一個協程來說都是可見的,從而保證 count 的值最終是 2,接下來看一下 atomic.Int32 Add 的實現。
TEXT ·Xadd(SB), NOSPLIT, $0-12
MOVL ptr+0(FP), BX
MOVL delta+4(FP), AX
MOVL AX, CX
LOCK
XADDL AX, 0(BX)
ADDL CX, AX
MOVL AX, ret+8(FP)
RET
從 Xadd 的實現可以看到最終是通過 LOCK 和 XADDL 指令實現了原子操作,LOCK 指令不僅保證了 XADDL 指令的執行是原子的,同時實現了類似內存屏障的功能,禁止了指令重排和保證 LOCK 之前的指令讀取的數據是最新的以及 LOCK 指令后的寫操作會對其他協程可見。
互斥鎖
package main
import (
"sync"
)
func main() {
for {
var mutex sync.Mutex
var flag bool
var count int
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
mutex.Lock()
defer mutex.Unlock()
count = 1
flag = true
}()
go func() {
defer wg.Done()
mutex.Lock()
defer mutex.Unlock()
if flag && count != 1 {
panic("count should be 1")
}
}()
wg.Wait()
}
}
通過互斥鎖,我們可以保證最終讀到 flag 為 true 時 count 肯定是 1。那么這個是如何保證的呢?接下來看一下鎖的實現,我們假設協程 1 執行完后協程 2 才執行。Lock 的實現如下。
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
}
Lock 是通過 CAS 實現加鎖的,CAS 的實現如下。
TEXT ·Cas(SB), NOSPLIT, $0-13
MOVL ptr+0(FP), BX
MOVL old+4(FP), AX
MOVL new+8(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+12(FP)
RET
CAS 是通過 LOCK 和 CMPXCHGL 實現的。
- 保證內存可見性,也就是說如果其他代碼獲取了鎖,這里可以感知到,否則就大家都拿到了鎖就有問題了。
- 實現了原子性保證自己拿到鎖時別人肯定拿不到鎖,
- 拿到鎖后也會被其他代碼感知到。
成功拿到鎖后設置了 flag 和 value,然后調 Unlock,Unlock 實現如下。
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
AddInt32 底層是 LOCK 和 XADDL 指令實現的,所以可以保證 Unlock 之前的寫入對其他協程可見。通過互斥鎖我們就保證了操作的互斥性和內存可見性。
WaitGroup
package main
import "sync"
func main() {
for {
var count int
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
count = 1
}()
wg.Wait()
if count != 1 {
panic("count should be 1")
}
}
}
通過 wg.Done() 可以保證 count = 1 的寫入對另一個協程可見。
結論
在并發編程中,涉及到指令重排、內存可見性和內存屏障等一系列復雜的概念,并且不同的編譯器、版本、CPU 行為都不一致。所以在 Go 并發編程中,如果需要在協程間會操作共享數據一定要使用 sync 包提供的能力或 channel 來保證操作的原子性和數據的可見性,否則可能會出現一些隱藏且晦澀的問題。最后以一個例子結尾,下面的例子在 Intel CPU 下是沒問題的,但是在 M4 下就會 panic。
package main
import "sync"
type Client struct {
Count int
}
type Singleton struct {
client *Client
metux sync.Mutex
}
func (s *Singleton) Get() (client *Client, err error) {
if s.client != nil {
return s.client, nil
}
s.metux.Lock()
defer s.metux.Unlock()
// double check
if s.client == nil {
s.client, err = &Client{Count: 1}, nil
if err != nil {
return nil, err
}
}
return s.client, nil
}
func main() {
for {
singleton := &Singleton{}
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
singleton.Get()
}()
go func() {
defer wg.Done()
client, _ := singleton.Get()
if client != nil && client.Count != 1 {
panic("count should be 1")
}
}()
wg.Wait()
}
}