Go并發機制解密:Goroutine調度
Goroutine 是 Go 編程語言中一個極具特色的設計,也是其并發能力的核心亮點之一。Goroutine 本質上是一種協程(Coroutine),是實現并行計算的關鍵。使用 Goroutine 非常簡單,只需通過 go 關鍵字即可啟動一個協程,協程會以異步方式運行。程序無需等待 Goroutine 完成即可繼續執行后續代碼。
go func() // 使用 go 關鍵字啟動一個協程
II. Goroutine 的內部原理
概念介紹
并發(Concurrency)
在單個 CPU 上,可以同時執行多個任務。在極短的時間內,CPU 會在任務之間快速切換(例如,先執行一小段程序 A,然后迅速切換到程序 B)。從宏觀上看,這種任務的時間上有重疊,似乎是同時執行的,但從微觀上看,實際上是順序執行的。這種現象稱為并發。
并行(Parallelism)
當系統擁有多個 CPU 時,每個 CPU 可以同時運行任務,且各自不需要爭奪資源。多個任務真正同時運行,這種現象稱為并行。
進程(Process)
當 CPU 在多個程序之間切換時,如果不保存之前程序的狀態(即上下文),直接切換到下一個程序,那么之前程序的一系列狀態會丟失。為了解決這個問題,引入了進程的概念。進程為程序執行分配所需的資源,因此進程是程序運行的基本資源單位(也可以看作程序執行的實體)。例如,運行一個文本編輯器時,該進程會管理所有資源,如文本緩沖區的內存空間、文件操作資源等。
線程(Thread)
CPU 在多個進程之間切換時,由于需要進入內核模式并讀取用戶模式數據,切換開銷較大。隨著進程數量增加,CPU 調度會消耗大量資源。為了解決這一問題,引入了線程的概念。線程本身消耗的資源很少,它們共享進程內的資源。線程的調度開銷比進程小得多。例如,在一個 Web 服務器應用中,可以使用多個線程同時處理不同的客戶端請求,這些線程共享服務器進程的資源(如網絡連接和內存緩存)。
協程(Coroutine)
協程擁有自己的寄存器上下文和棧。當協程被調度切換時,會保存當前的寄存器上下文和棧;當切換回來時,則恢復之前保存的上下文和棧。因此,協程可以保留上一次調用的狀態(即所有局部狀態的特定組合)。每次重新進入協程時,相當于返回到上次調用時的狀態,即邏輯流程中上次退出的位置。
線程和進程的操作由系統接口觸發,最終由系統執行;而協程的操作由用戶程序自身執行。Goroutine 就是一種協程。
調度模型簡介
Goroutine 的強大并發能力通過 GPM 調度模型實現。以下是 Goroutine 調度模型的核心結構:
調度器中的四個重要結構
- M(Machine)表示內核級線程。每個 M 對應一個線程,Goroutine 運行在 M 上。例如,當一個 Goroutine 被啟動以執行復雜計算時,該 Goroutine 會被分配到一個 M 上執行。M 是一個較大的結構,包含小對象內存緩存(mcache)、當前正在執行的 Goroutine、隨機數生成器等信息。
- G(Goroutine)表示 Goroutine。它有自己的棧,用于存儲函數調用信息,還有一個指令指針,用于指定執行位置。此外,G 還包含其他信息(如等待的通道信息),這些信息用于調度。例如,當一個 Goroutine 等待從通道接收數據時,該信息會存儲在 G 結構中。
- P(Processor)全稱為 Processor,主要用于執行 Goroutine。可以將其視為任務分發器。P 維護一個 Goroutine 隊列,存儲需要由其執行的所有 Goroutine。例如,當創建多個 Goroutine 時,這些 Goroutine 會被添加到 P 的隊列中等待調度。
- Sched(Scheduler)表示調度器。可以看作是中央調度中心,維護 M 和 G 的隊列,以及調度器的一些狀態信息,確保整個系統的高效調度。
調度的實現
調度模型圖
如圖所示,有兩個物理線程 M,每個 M 綁定一個處理器 P,并運行一個 Goroutine。
- P 的數量可以通過 GOMAXPROCS() 設置。它實際上表示真正的并發級別,即可以同時運行的 Goroutine 數量。
- 圖中灰色的 Goroutine 尚未運行,處于就緒狀態,等待被調度。P 維護了這些 Goroutine 的隊列(稱為運行隊列 runqueue)。
- 在 Go 語言中,啟動一個 Goroutine 非常簡單:只需使用 go function。每次執行 go 語句時,都會將一個 Goroutine 添加到運行隊列末尾。在下一個調度點,會從運行隊列中取出一個 Goroutine 執行。
當某個操作系統線程(如 M0)被阻塞時(如下圖所示),P 會切換到另一個線程(如 M1)。M1 可能是新創建的,也可能是從線程緩存中取出的。
線程阻塞切換圖
當 M0 返回時,它需要嘗試獲取一個 P 來運行 Goroutine。如果無法獲取 P,它會將 Goroutine 放入全局運行隊列,并進入休眠狀態(進入線程緩存)。所有 P 會定期檢查全局運行隊列,并運行其中的 Goroutine;否則,全局運行隊列中的 Goroutine 將永遠無法執行。
III. Goroutine 的使用
基本用法
設置 Goroutine 的運行 CPU 數量。Go 的最新版本默認會自動設置。
num := runtime.NumCPU() // 獲取主機的邏輯 CPU 數量
runtime.GOMAXPROCS(num) // 根據主機 CPU 數量設置 Goroutine 的最大并發級別
使用示例
示例 1:簡單的 Goroutine 計算
package main
import (
"fmt"
"time"
)
func cal(a int, b int) {
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
}
func main() {
for i := 0; i < 10; i++ {
go cal(i, i+1) // 啟動 10 個 Goroutine 進行計算
}
time.Sleep(time.Second * 2) // 等待所有任務完成
}
運行結果:
8 + 9 = 17
9 + 10 = 19
4 + 5 = 9
...
Goroutine 異常捕獲
當啟動多個 Goroutine 時,如果其中一個發生異常且未處理,整個程序會終止。因此,建議在每個 Goroutine 的函數中添加異常處理。可以使用 recover 函數捕獲異常。
package main
import (
"fmt"
"time"
)
func addele(a []int, i int) {
deferfunc() {
if err := recover(); err != nil {
fmt.Println("add ele fail")
}
}()
a[i] = i
fmt.Println(a)
}
func main() {
Arry := make([]int, 4)
for i := 0; i < 10; i++ {
go addele(Arry, i)
}
time.Sleep(time.Second * 2)
}
運行結果:
add ele fail
[0 0 0 0]
[0 1 0 0]
...
Goroutine 的同步
由于 Goroutine 是異步執行的,主程序可能在 Goroutine 完成前退出。為確保所有 Goroutine 完成后再退出,Go 提供了 sync 包和 channel 來解決同步問題。
示例 1:使用 sync.WaitGroup 同步 Goroutine
package main
import (
"fmt"
"sync"
)
func cal(a int, b int, n *sync.WaitGroup) {
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
defer n.Done()
}
func main() {
var go_sync sync.WaitGroup
for i := 0; i < 10; i++ {
go_sync.Add(1)
go cal(i, i+1, &go_sync)
}
go_sync.Wait()
}
運行結果:
9 + 10 = 19
2 + 3 = 5
...
Goroutine 間的通信
Goroutine 本質上是協程,可以通過 channel 實現通信或數據共享。
示例:使用 channel 模擬生產者-消費者模式
package main
import (
"fmt"
"sync"
)
func Productor(mychan chan int, data int, wait *sync.WaitGroup) {
mychan <- data
fmt.Println("product data:", data)
wait.Done()
}
func Consumer(mychan chan int, wait *sync.WaitGroup) {
a := <-mychan
fmt.Println("consumer data:", a)
wait.Done()
}
func main() {
datachan := make(chanint, 100)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go Productor(datachan, i, &wg)
}
for j := 0; j < 10; j++ {
wg.Add(1)
go Consumer(datachan, &wg)
}
wg.Wait()
}
運行結果:
product data: 0
consumer data: 0
...