通俗易懂剖析Go Channel:理解并發通信的核心機制
我們在學習與使用Go語言的過程中,對channel并不陌生,channel是Go語言與眾不同的特性之一,也是非常重要的一環,深入理解Channel,相信能夠在使用的時候更加的得心應手。
一、Channel基本用法
1、channel類別
channel在類型上,可以分為兩種:
- 雙向channel:既能接收又能發送的channel
- 單向channel:只能發送或只能接收的channel,即單向channel可以為分為:
只寫channel
只讀channel
聲明并初始化如下如下:
func main() {
// 聲明并初始化
var ch chan string = make(chan string) // 雙向channel
var readCh <-chan string = make(<-chan string) // 只讀channel
var writeCh chan<- string = make(chan<- string) // 只寫channel
}
上述定義中,<-表示單向的channel。如果箭頭指向chan,就表示只寫channel,可以往chan里邊寫入數據;如果箭頭遠離chan,則表示為只讀channel,可以從chan讀數據。
在定義channel時,可以定義任意類型的channel,因此也同樣可以定義chan類型的channel。例如:
a := make(chan<- chan int) // 定義類型為 chan int 的寫channel
b := make(chan<- <-chan int) // 定義類型為 <-chan int 的寫channel
c := make(<-chan <-chan int) // 定義類型為 <-chan int 的讀channel
d := make(chan (<-chan int)) // 定義類型為 (<-chan int) 的讀channel
當channel未初始化時,其零值為nil。nil 是 chan 的零值,是一種特殊的 chan,對值是 nil 的 chan 的發送接收調用者總是會阻塞。
func main() {
var ch chan string
fmt.Println(ch) // <nil>
}
通過make我們可以初始化一個channel,并且可以設置其容量的大小,如下初始化了一個類型為string,其容量大小為512的channel:
var ch chan string = make(chan string, 512)
當初始化定義了channel的容量,則這樣的channel叫做buffered chan,即**有緩沖channel。如果沒有設置容量,channel的容量為0,這樣的channel叫做unbuffered chan,即無緩沖channel**。
有緩沖channel中,如果channel中還有數據,則從這個channel接收數據時不會被阻塞。如果channel的容量還未滿,那么向這個channel發送數據也不會被阻塞,反之則會被阻塞。
無緩沖channel則只有當讀寫操作都準備好后,才不會阻塞,這也是unbuffered chan在使用過程中非常需要注意的一點,否則可能會出現常見的bug。
channel的常見操作:
發送數據
往channel發送一個數據使用ch <-
func main() {
var ch chan int = make(chan int, 512)
ch <- 2000
}
上述的ch可以是chan int類型,也可以是單向chan <-int。
接收數據
從channel接收一條數據可以使用<-ch
func main() {
var ch chan int = make(chan int, 512)
ch <- 2000 // 發送數據
data := <-ch // 接收數據
fmt.Println(data) // 2000
}
ch 類型是 chan T,也可以是單向<-chan T
在接收數據時,可以返回兩個返回值。第一個返回值返回channel中的元素,第二個返回值為bool類型,表示是否成功地從channel中讀取到一個值。
如果第二個參數是false,則表示channel已經被close而且channel中沒有緩存的數據,這個時候第一個值返回的是零值。
func main() {
var ch chan int = make(chan int, 512)
ch <- 2000 // 發送數據
data1, ok1 := <-ch // 接收數據
fmt.Printf("data1 = %d, ok1 = %t\n", data1, ok1) // data1 = 2000, ok1 = true
close(ch) // 關閉channel
data2, ok2 := <-ch // 接收數據
fmt.Printf("data2 = %d, ok2 = %t", data2, ok2) // data2 = 0, ok2 = false
}
所以,如果從channel讀取到一個零值,可能是發送操作真正發送的零值,也可能是closed關閉channel并且channel沒有緩存元素產生的零值,這是需要注意判別的一個點。
其他操作
Go內建的函數close、cap、len都可以對chan類型進行操作。
- close:關閉channel。
- cap:返回channel的容量。
- len:返回channel緩存中還未被取走的元素數量。
func main() {
var ch chan int = make(chan int, 512)
ch <- 100
ch <- 200
fmt.Println("ch len:", len(ch)) // ch len: 2
fmt.Println("ch cap:", cap(ch)) // ch cap: 512
}
發送操作與接收操作可以作為select語句中的case clause,例如:
func main() {
var ch = make(chan int, 512)
for i := 0; i < 10; i++ {
select {
case ch <- i:
case v := <-ch:
fmt.Println(v)
}
}
}
for-range語句同樣可以在chan中使用,例如:
func main() {
var ch = make(chan int, 512)
ch <- 100
ch <- 200
ch <- 300
for v := range ch {
fmt.Println(v)
}
}
// 執行結果
100
200
300
2、select介紹
在Go語言中,select語句用于監控一組case語句,根據特定的條件執行相對應的case語句或default語句,與switch類似,但不同之處在于select語句中所有case中的表達式都必須是channel的發送或接收操作。select使用示例代碼如下:
select {
case <-ch1:
fmt.Println("ch1")
case ch2 <- 1:
fmt.Println("ch2")
}
上述代碼中,select關鍵字讓當前goroutine同時等待ch1 的可讀和ch2的可寫,在滿足任意一個case分支之前,select 會一直阻塞下去,直到其中的一個 channel 轉為就緒狀態時執行對應case分支的代碼。如果多個channel同時就緒的話則隨機選擇一個case執行。
當使用空select時,空的 select 語句會直接阻塞當前的goroutine,使得該goroutine進入無法被喚醒的永久休眠狀態。空select,即select內不包含任何case。
select{
}
另外當select語句內只有一個case分支時,如果該case分支不滿足,那么當前select就變成了一個阻塞的channel讀/寫操作。
select {
case <-ch1:
fmt.Println("ch1")
}
上述select中,當ch1可讀時,會執行打印操作,反之則阻塞當前goroutine。
當select語句內包含default分支時,如果select內的所有case都不滿足,則會執行default分支的邏輯,用于當其他case都不滿足時執行一些默認操作。
select {
case <-ch1:
fmt.Println("ch1")
case ch2 <- 1:
fmt.Println("ch2")
default:
fmt.Println("default")
}
上述代碼中,當ch1可讀或ch2可寫時,會執行相應的打印操作,否則就執行default語句中的代碼,相當于一個非阻塞的channel讀取操作。
select的使用可以總結為:
- select不存在任何的case且沒有default分支:永久阻塞當前 goroutine;
- select只存在一個case且沒有default分支:阻塞的發送/接收;
- select存在多個case:隨機選擇一個滿足條件的case執行;
- select存在default,其他case都不滿足時:執行default語句中的代碼;
二、Channel實現原理
從代碼的角度剖析channel的實現,能夠讓我們更好的去使用channel。
我們可以從chan類型的數據結構、初始化以及三個操作發送、接收和關閉這幾個方面來了解channel。
1、chan數據結構
chan類型的數據結構定義位于runtime.hchan[1],其結構體定義如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
解釋一下上述各個字段的意義:
- qcount:表示chan中已經接收到的數據且還未被取走的元素個數。內建函數len可以返回這個字段的值。
- datasiz:循環隊列的大小。chan在實現上使用一個循環隊列來存放元素的個數,循環隊列適用于生產者-消費者的場景。
- buf:存放元素的循環隊列buffer,buf 字段是一個指向隊列緩沖區的指針,即指向一個dataqsiz元素的數組。buf 字段是使用 unsafe.Pointer 類型來表示隊列緩沖區的起始地址。unsafe.Pointer是一種特殊的指針類型,它可以用于指向任何類型的數據。由于隊列緩沖區的類型是動態分配的,所以不能直接使用某個具體類型的指針來表示。
- elemtype、elemsize:elemtype表示chan中元素的數據類型,elemsize表示其大小。當chan定義后,它的元素類型是固定的,即普通類型或者指針類型,因此元素大小也是固定的。
- sendx:處理發送數據操作的指針在buf隊列中的位置。當channel接收到了新的數據時,該指針就會加上elemsize,移動到下一個位置。buf 的總大小是elemsize的整數倍且buf是一個循環列表。
- recvx:處理接收數據操作的指針在buf隊列中的位置。當從buf中取出數據,此指針會移動到下一個位置。
- recvq:當接收操作發現channel中沒有數據可讀時,會被則色,此時會被加入到recvq隊列中。
- sendq:當發送操作發現buf隊列已滿時,會被進行阻塞,此時會被加入到sendq隊列中。
圖片
2、chan初始化
channel在進行初始化時,Go編譯器會根據是否傳入容量的大小,來選擇調用makechan64,還是makechan。makechan64在實現上底層還是調用makechan來進行初始化,makechan64只是對size做了檢查。
makechan函數根據chan的容量的大小和元素的類型不同,初始化不同的存儲空間。省略一些檢查代碼,makechan函數的主要邏輯如下:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
...
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
...
var c *hchan
switch {
case mem == 0:
// 隊列或元素大小為零,不必創建buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不包含指針,分配一塊連續的內存給hchan數據結構和buf
// hchan數據結構后面緊接著就是buf,在一次調用中分配hchan和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指針,單獨分配buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 記錄元素大小、類型、容量
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
...
return c
}
3、send發送操作
Go在編譯發送數據給channel時,會把發送操作send轉換成chansend1函數,而chansend1函數會調用chansend函數。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
我們可以來分段分析chansend函數的實現邏輯。
第一部分:
主要是對chan進行判斷,判斷chan是否為nil,若為nil,則判斷是否需要將當前goroutine進行阻塞,阻塞通過gopark來對調用者goroutine park(阻塞休眠)。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 第一部分
if c == nil { // 判斷chan是否為nil
if !block { // 判斷是否需要阻塞當前goroutine
return false
}
// 調用這goroutine park,進行阻塞休眠
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
第二部分
第二部分的邏輯判斷是當你往一個容量已滿的chan實例發送數據,且不想當前調用的goroutine被阻塞時(chan未被關閉),那么處理的邏輯是直接返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第二部分
if !block && c.closed == 0 && full(c) {
return false
}
...
}
第三部分
第三部分的邏輯判斷是首先進行互斥鎖加鎖,然后判斷當前chan是否關閉,如果chan已經被close了,則釋放互斥鎖并panic,即對已關閉的chan發送數據會panic。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第三部分
lock(&c.lock) // 開始加鎖
if c.closed != 0 { // 判斷channel是否關閉
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}
第四部分
第四部分的邏輯主要是判斷接收隊列中是否有正在等待的接收方receiver。如果存在正在等待的receiver(說明此時buf中沒有緩存的數據),則將他從接收隊列中彈出,直接將需要發送到channel的數據交給這個receiver,而無需放入到buf中,讓發送操作速度更快一些。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第四部分
if sg := c.recvq.dequeue(); sg != nil {
// 找到了一個正在等待的接收者。我們傳遞我們想要發送的值
// 直接傳遞給receiver接收者,繞過channel buf緩存區(如果receiver有的話)
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}
第五部分
當等待隊列中并沒有正在等待的receiver,則說明當前buf還沒有滿,此時將發送的數據放入到buf中。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第五部分
if c.qcount < c.dataqsiz { // 判斷buf是否滿了
// channel buf還有可用的空間. 將發送數據入buf循環隊列.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
第六部分
當邏輯走到第六部分,說明正在處理buf已滿的情況。如果buf已滿,則發送操作的goroutine就會加入到發送者的等待隊列,直到被喚醒。當goroutine被喚醒時,數據或者被取走了,或者chan已經被關閉了。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 第六部分
// chansend1函數調用不會進入if塊里,因為chansend1的block=true
if !block {
unlock(&c.lock)
return false
}
...
c.sendq.enqueue(mysg) // 加入發送隊列
...
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 阻塞
...
}
4、recv接收操作
從channel中接收數據時,Go會將代碼轉換成chanrecv1函數。如果需要返回兩個返回值,則會轉換成chanrecv2,chanrecv1函數和chanrecv2都會調用chanrecv函數。chanrecv1和chanrecv2傳入的 block參數的值是true,兩種調用都是阻塞方式,因此在分析chanrecv函數的實現時,可以不考慮 block=false的情況。
// 從已編譯代碼中進入 <-c 的入口點
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
同樣,省略一些檢查類的代碼,我們也可以分段分析chanrecv函數的邏輯。
第一部分
第一部分主要判斷當前進行接收操作的chan實例是否為nil,若為nil,則從nil chan中接收數據的調用這goroutine會被阻塞。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 第一部分
if c == nil { // 判斷chan是否為nil
if !block { // 是否阻塞,默認為block=true
return
}
// 進行阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
第二部分
這一部分只要是考慮block=false且c為空的情況,block=false的情況我們可以不做考慮。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 檢查未獲得鎖的失敗非阻塞操作。
if !block && empty(c) {
...
}
...
}
第三部分
第三部分的邏輯為判斷當前chan是否被關閉,若當前chan已經被close了,并且緩存隊列中沒有緩沖的元素時,返回true、false。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
lock(&c.lock) // 加鎖,返回時釋放鎖
// 第三部分
if c.closed != 0 { // 當chan已被關閉時
if c.qcount == 0 { // 且 buf區 沒有緩存的數據了
...
unlock(&c.lock) // 解鎖
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}
第四部分
第四部分是處理通道未關閉且buf緩存隊列已滿的情況。只有當緩存隊列已滿時,才能夠從發送等待隊列獲取到sender。若當前的chan為unbuffer的chan,即無緩沖區channel時,則直接將sender的發送數據傳遞給receiver。否則就從緩存隊列的頭部讀取一個元素值,并將獲取的sender攜帶的值加入到buf循環隊列的尾部。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 { // 當chan已被關閉時
} else { // 第四部分,通道未關閉
// 如果sendq隊列中有等待發送的sender
if sg := c.sendq.dequeue(); sg != nil {
// 存在正在等待的sender,如果緩存區的容量為0則直接將發送方的值傳遞給接收方
// 反之,則從緩存隊列的頭部獲取數據,并將獲取的sender的發送值加入到緩存隊列尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}
第五部分
第五部分的主要邏輯是處理發送隊列中沒有等待的sender且buf中有緩存的數據。該段邏輯與外出的互斥鎖共用一把鎖,因此不存在并發問題。當buf緩存區有緩存元素時,則取出該元素傳遞給receiver,同時移動接收指針。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 第五部分
if c.qcount > 0 { // 發送隊列中沒有等待的sender,且buf中有緩存數據
// 直接從緩存隊列中獲取數據
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++ // 移動接收指針
if c.recvx == c.dataqsiz { // 指針若已到末尾則進行重置(循環隊列)
c.recvx = 0
}
c.qcount-- // 獲取數據后,buf緩存區元素個數減一
unlock(&c.lock) // 解鎖
return true, true
}
if !block { // block=true
unlock(&c.lock)
return false, false
}
...
}
第六部分
第六部分的邏輯主要是處理buf緩存區中沒有緩存數據的情況。當buf緩存區沒有緩存數據時,那么當前的receiver就會被阻塞,直到它從sender中接收了數據,或者是chan被close,才會返回。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
c.recvq.enqueue(mysg) // 將當前接收操作入接收隊列
...
// 進行阻塞,等待喚醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
5、close關閉
close函數主要用于channel的關閉,Go編譯器會替換成closechan函數的調用。省略一些檢查下的代碼后,closechan函數的主要邏輯如下:
- 如果當前chan為nil,則直接panic
- 如果當前chan已關閉,再次close則直接panic
- 如果chan不為nil,chan也沒有closed,就把等待隊列中的 sender(writer)和 receiver(reader)從隊列中全部移除并喚醒。
func closechan(c *hchan) {
if c == nil { // 若當前chan未nil,則直接panic
panic(plainError("close of nil channel"))
}
lock(&c.lock) // 加鎖
if c.closed != 0 { // 若當前chan已經關閉,則直接panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
c.closed = 1 // 設置當前channel的狀態為已關閉
var glist gList
// 釋放接收隊列中所有的reader
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放發送隊列中所有的writer (它們會panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
三、總結
通過學習channel的基本使用,了解其操作背后的實現原理,可以幫助我們更好的使用channel,避免一些操作不當而導致的panic或者說是bug,讓我們在使用channel時能夠更加的得心應手。
channel的值和狀態有多種情況,而不同的操作(send、recv、close)又可能得到不同的結果,這是使用 channel 類型時需要經常注意的點,我們可以將不同channel值下的不同操作進行一個總結,特別注意操作channel時會產生panic的情況,已經可能會導致線程阻塞的情況,都是有可能導致死鎖與goroutine泄漏的罪魁禍首。
channel執行操作\channel狀態 | channel為nil | channel buf為空 | channel buf已滿 | channel buf未滿且不為空 | channel已關閉 |
| 阻塞 | 阻塞 | 讀取數據 | 讀取數據 | 返回buf中緩存的數據 |
| 阻塞 | 寫入數據 | 阻塞 | 寫入數據 | panic |
| panic | 關閉channel,buf中沒有緩存數據 | 關閉channel,保留已緩存的數據 | 關閉channel,保留已緩存的數據 | panic |
原文鏈接
小韜同學 的掘金賬號:Serena[2]
深入解析Go Channel的神秘原理[3]
本文轉載自微信公眾號「 程序員升級打怪之旅」,作者「小韜&王中陽」,可以通過以下二維碼關注。
轉載本文請聯系「 程序員升級打怪之旅」公眾號。