Go語(yǔ)言下的并發(fā)編程:Goroutine,Channel 和 Sync
優(yōu)雅的并發(fā)編程范式,完善的并發(fā)支持,出色的并發(fā)性能是 Go 語(yǔ)言區(qū)別于其他語(yǔ)言的一大特色。
在當(dāng)今這個(gè)多核時(shí)代,并發(fā)編程的意義不言而喻。使用 Go 開(kāi)發(fā)并發(fā)程序,操作起來(lái)非常簡(jiǎn)單,語(yǔ)言級(jí)別提供關(guān)鍵字 go 用于啟動(dòng)協(xié)程,并且在同一臺(tái)機(jī)器上可以啟動(dòng)成千上萬(wàn)個(gè)協(xié)程。
下面就來(lái)詳細(xì)介紹。
goroutine
Go 語(yǔ)言的并發(fā)執(zhí)行體稱(chēng)為 goroutine,使用關(guān)鍵詞 go 來(lái)啟動(dòng)一個(gè) goroutine。
go 關(guān)鍵詞后面必須跟一個(gè)函數(shù),可以是有名函數(shù),也可以是無(wú)名函數(shù),函數(shù)的返回值會(huì)被忽略。
go 的執(zhí)行是非阻塞的。
先來(lái)看一個(gè)例子:
- package main
- import (
- "fmt"
- "time"
- )
- func main() {
- go spinner(100 * time.Millisecond)
- const n = 45
- fibN := fib(n)
- fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN) // Fibonacci(45) = 1134903170
- }
- func spinner(delay time.Duration) {
- for {
- for _, r := range `-\|/` {
- fmt.Printf("\r%c", r)
- time.Sleep(delay)
- }
- }
- }
- func fib(x int) int {
- if x < 2 {
- return x
- }
- return fib(x-1) + fib(x-2)
- }
從執(zhí)行結(jié)果來(lái)看,成功計(jì)算出了斐波那契數(shù)列的值,說(shuō)明程序在 spinner 處并沒(méi)有阻塞,而且 spinner 函數(shù)還一直在屏幕上打印提示字符,說(shuō)明程序正在執(zhí)行。
當(dāng)計(jì)算完斐波那契數(shù)列的值,main 函數(shù)打印結(jié)果并退出,spinner 也跟著退出。
再來(lái)看一個(gè)例子,循環(huán)執(zhí)行 10 次,打印兩個(gè)數(shù)的和:
- package main
- import "fmt"
- func Add(x, y int) {
- z := x + y
- fmt.Println(z)
- }
- func main() {
- for i := 0; i < 10; i++ {
- go Add(i, i)
- }
- }
有問(wèn)題了,屏幕上什么都沒(méi)有,為什么呢?
這就要看 Go 程序的執(zhí)行機(jī)制了。當(dāng)一個(gè)程序啟動(dòng)時(shí),只有一個(gè) goroutine 來(lái)調(diào)用 main 函數(shù),稱(chēng)為主 goroutine。新的 goroutine 通過(guò) go 關(guān)鍵詞創(chuàng)建,然后并發(fā)執(zhí)行。當(dāng) main 函數(shù)返回時(shí),不會(huì)等待其他 goroutine 執(zhí)行完,而是直接暴力結(jié)束所有 goroutine。
那有沒(méi)有辦法解決呢?當(dāng)然是有的,請(qǐng)往下看。
channel
一般寫(xiě)多進(jìn)程程序時(shí),都會(huì)遇到一個(gè)問(wèn)題:進(jìn)程間通信。常見(jiàn)的通信方式有信號(hào),共享內(nèi)存等。goroutine 之間的通信機(jī)制是通道 channel。
使用 make 創(chuàng)建通道:
- ch := make(chan int) // ch 的類(lèi)型是 chan int
通道支持三個(gè)主要操作:send,receive 和 close。
- ch <- x // 發(fā)送
- x = <-ch // 接收
- <-ch // 接收,丟棄結(jié)果
- close(ch) // 關(guān)閉
無(wú)緩沖 channel
make 函數(shù)接受兩個(gè)參數(shù),第二個(gè)參數(shù)是可選參數(shù),表示通道容量。不傳或者傳 0 表示創(chuàng)建了一個(gè)無(wú)緩沖通道。
無(wú)緩沖通道上的發(fā)送操作將會(huì)阻塞,直到另一個(gè) goroutine 在對(duì)應(yīng)的通道上執(zhí)行接收操作。相反,如果接收先執(zhí)行,那么接收 goroutine 將會(huì)阻塞,直到另一個(gè) goroutine 在對(duì)應(yīng)通道上執(zhí)行發(fā)送。
所以,無(wú)緩沖通道是一種同步通道。
下面我們使用無(wú)緩沖通道把上面例子中出現(xiàn)的問(wèn)題解決一下。
- package main
- import "fmt"
- func Add(x, y int, ch chan int) {
- z := x + y
- ch <- z
- }
- func main() {
- ch := make(chan int)
- for i := 0; i < 10; i++ {
- go Add(i, i, ch)
- }
- for i := 0; i < 10; i++ {
- fmt.Println(<-ch)
- }
- }
可以正常輸出結(jié)果。
主 goroutine 會(huì)阻塞,直到讀取到通道中的值,程序繼續(xù)執(zhí)行,最后退出。
緩沖 channel
創(chuàng)建一個(gè)容量是 5 的緩沖通道:
- ch := make(chan int, 5)
緩沖通道的發(fā)送操作在通道尾部插入一個(gè)元素,接收操作從通道的頭部移除一個(gè)元素。如果通道滿(mǎn)了,發(fā)送會(huì)阻塞,直到另一個(gè) goroutine 執(zhí)行接收。相反,如果通道是空的,接收會(huì)阻塞,直到另一個(gè) goroutine 執(zhí)行發(fā)送。
有沒(méi)有感覺(jué),其實(shí)緩沖通道和隊(duì)列一樣,把操作都解耦了。
單向 channel
類(lèi)型 chan<- int 是一個(gè)只能發(fā)送的通道,類(lèi)型 <-chan int 是一個(gè)只能接收的通道。
任何雙向通道都可以用作單向通道,但反過(guò)來(lái)不行。
還有一點(diǎn)需要注意,close 只能用在發(fā)送通道上,如果用在接收通道會(huì)報(bào)錯(cuò)。
看一個(gè)單向通道的例子:
- package main
- import "fmt"
- func counter(out chan<- int) {
- for x := 0; x < 10; x++ {
- out <- x
- }
- close(out)
- }
- func squarer(out chan<- int, in <-chan int) {
- for v := range in {
- out <- v * v
- }
- close(out)
- }
- func printer(in <-chan int) {
- for v := range in {
- fmt.Println(v)
- }
- }
- func main() {
- n := make(chan int)
- s := make(chan int)
- go counter(n)
- go squarer(s, n)
- printer(s)
- }
sync
sync 包提供了兩種鎖類(lèi)型:sync.Mutex 和 sync.RWMutex,前者是互斥鎖,后者是讀寫(xiě)鎖。
當(dāng)一個(gè) goroutine 獲取了 Mutex 后,其他 goroutine 不管讀寫(xiě),只能等待,直到鎖被釋放。
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- func main() {
- var mutex sync.Mutex
- wg := sync.WaitGroup{}
- // 主 goroutine 先獲取鎖
- fmt.Println("Locking (G0)")
- mutex.Lock()
- fmt.Println("locked (G0)")
- wg.Add(3)
- for i := 1; i < 4; i++ {
- go func(i int) {
- // 由于主 goroutine 先獲取鎖,程序開(kāi)始 5 秒會(huì)阻塞在這里
- fmt.Printf("Locking (G%d)\n", i)
- mutex.Lock()
- fmt.Printf("locked (G%d)\n", i)
- time.Sleep(time.Second * 2)
- mutex.Unlock()
- fmt.Printf("unlocked (G%d)\n", i)
- wg.Done()
- }(i)
- }
- // 主 goroutine 5 秒后釋放鎖
- time.Sleep(time.Second * 5)
- fmt.Println("ready unlock (G0)")
- mutex.Unlock()
- fmt.Println("unlocked (G0)")
- wg.Wait()
- }
RWMutex 屬于經(jīng)典的單寫(xiě)多讀模型,當(dāng)讀鎖被占用時(shí),會(huì)阻止寫(xiě),但不阻止讀。而寫(xiě)鎖會(huì)阻止寫(xiě)和讀。
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- func main() {
- var rwMutex sync.RWMutex
- wg := sync.WaitGroup{}
- Data := 0
- wg.Add(20)
- for i := 0; i < 10; i++ {
- go func(t int) {
- // 第一次運(yùn)行后,寫(xiě)解鎖。
- // 循環(huán)到第二次時(shí),讀鎖定后,goroutine 沒(méi)有阻塞,同時(shí)讀成功。
- fmt.Println("Locking")
- rwMutex.RLock()
- defer rwMutex.RUnlock()
- fmt.Printf("Read data: %v\n", Data)
- wg.Done()
- time.Sleep(2 * time.Second)
- }(i)
- go func(t int) {
- // 寫(xiě)鎖定下是需要解鎖后才能寫(xiě)的
- rwMutex.Lock()
- defer rwMutex.Unlock()
- Data += t
- fmt.Printf("Write Data: %v %d \n", Data, t)
- wg.Done()
- time.Sleep(2 * time.Second)
- }(i)
- }
- wg.Wait()
- }
總結(jié)
并發(fā)編程算是 Go 的特色,也是核心功能之一了,涉及的知識(shí)點(diǎn)其實(shí)是非常多的,本文也只是起到一個(gè)拋磚引玉的作用而已。
本文開(kāi)始介紹了 goroutine 的簡(jiǎn)單用法,然后引出了通道的概念。
通道有三種:
- 無(wú)緩沖通道
- 緩沖通道
- 單向通道
最后介紹了 Go 中的鎖機(jī)制,分別是 sync 包提供的 sync.Mutex(互斥鎖) 和 sync.RWMutex(讀寫(xiě)鎖)。
goroutine 博大精深,后面的坑還是要慢慢踩的。