給你簡單介紹一下協程和管道的使用,「點到為止」,要么就上來給你寫GPM模型,看的人「一臉懵逼」,所以我以「實際使用過程中遇到的問題」這個角度出發,可能會分多篇總結一下golang的協程相關的知識點,希望對你有用。

golang的協程相信大家都不陌生,在golang中的使用也很簡單,只要加上一個關鍵字「go」即可,雖然說大家都知道,但是真的在實際使用中又遇到這樣那樣的問題,坑其實還是挺多的。而網上很多文章和教程,要么就是講的太簡單,給你簡單介紹一下協程和管道的使用,「點到為止」,要么就上來給你寫GPM模型,看的人「一臉懵逼」,所以我以「實際使用過程中遇到的問題」這個角度出發,可能會分多篇總結一下golang的協程相關的知識點,希望對你有用。
ps:如果你從來沒有了解過golang的協程,建議先自己搜一些資料簡單的了解一下,還有并發并行那些基礎概念之類的,本文都不會提及。
協程非常容易引發并發問題?
我們先看下列程序:
func main() {
res := make(map[int]int)
for i := 0; i < 100; i++ {
go handleMap(res)
}
time.Sleep(time.Second * 1)
}
func handleMap(res map[int]int) {
for i := 0; i < 200; i++ {
res[i] = i * i
}
}
- 因為map類型作為參數是直接以引用的方式傳遞的,所以handleMap函數不需要返回值,直接操作參數res即可。
- handleMap的作用就是不斷的給map賦值。
- 因為執行handleMap的時候是開啟協程的,所以是多個程序并發的去對res(map類型寫入),所以上述程序是會報錯的,輸出結果如下。
- 程序下方加上time.Sleep(time.Second * 1)的原因是因為主程序(main)執行完畢退出,但是協程還沒執行完畢會被直接關閉。
fatal error: concurrent map writes
goroutine 48 [running]:
runtime.throw(0x100f814d1, 0x15)
/opt/homebrew/Cellar/go@1.16/1.16.13/libexec/src/runtime/panic.go:1117 +0x54 fp=0x14000145f50 sp=0x14000145f20 pc=0x100f16f34
runtime.mapassign_fast64(0x100faeae0, 0x14000106180, 0x1f, 0x14000072200)
/opt/homebrew/Cellar/go@1.16/1.16.13/libexec/src/runtime/map_fast64.go:176 +0x2f8 fp=0x14000145f90 sp=0x14000145f50 pc=0x100ef7188
main.handleMap(0x14000106180)
/Users/test/Sites/beikego/test/rountine.go:22 +0x44 fp=0x14000145fd0 sp=0x14000145f90 pc=0x100f7e644
runtime.goexit()
解決方式(1) 加鎖?
如果有并發問題,我們最容易想到的一個辦法就是加鎖。
func main() {
res := make(map[int]int)
for i := 0; i < 100000; i++ {
go handleMap(res)
}
time.Sleep(time.Second * 1)
lock.Lock() //因為對map的讀取的時候有可能還在寫入,所以這里也需要加鎖
for _, item := range res {
fmt.Println(item)
}
lock.Unlock()
}
func handleMap(res map[int]int) {
lock.Lock() //每一個協程過來請求都先加鎖
for i := 0; i < 2000; i++ {
res[i] = i * i
}
lock.Unlock() //處理完map之后釋放鎖
}
上面過程我畫了一張圖,具體哪里為什么加鎖都有說明。

上述程序執行過程圖示
- 上述例子雖然開啟了100000個協程,但是在每個協程處理map的時候加上了一個lock,處理完畢才釋放,所以「各個協程對map的操作是隔離開的」。
- 在讀取map的時候加鎖的原因,是因為sleep 1s之后,有可能map還在寫入,邊讀邊寫當然會有并發問題 上述方式雖然解決了并發問題,但是也存在一定的問題。主要是「需要sleep,而且sleep多長時間沒法確定」所以這里引入咱們的解決方式2,管道。
解決方式(2)管道channel?
channel本質就是一個「數據結構,隊列」。既然是隊列,當然有著「先進先出」的原則,而且是能保證「線程安全」的,多個gorountine訪問不需要加鎖。
當然如果你還沒有接觸過管道,可以提前找些資料了解一下,下面是一個管道的簡單示意圖。

管道在使用的過程中需要注意的問題
管道(channel)在使用的過程中有很多需要注意的點,我在這里列一下。
使用管道之前必須make一下,而且指定長度
var intChan chan int
intChan <- 1
fmt.Println(<-intChan)
//返回信息
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send (nil chan)]:
為什么需要make,前面文章已經講過,可以看看,??聊聊golang的make和new函數??指定長度也很好理解,「管道的本質是隊列」,隊列當然是需要指定長度的。
管道寫入的數據數如果超過管道長度,會報錯
intChan := make(chan int, 1) //長度為1
intChan <- 1
intChan <- 2 //這里會報錯
fmt.Println(<-intChan)
//返回結果
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
讀取空管道,會報錯
intChan := make(chan int, 1)
fmt.Println(<-intChan) //此時管道里面還沒有任何內容
//返回結果
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
管道也支持interface,但是拿到結構體具體的屬性的時候,需要斷言
type Person struct {
Name string
}
func main(){
personChan := make(chan interface{}, 10)
personChan <- Person{Name: "小飯"} //寫入結構體類型
personChan <- 1 //寫入int類型
personChan <- "test_string" //寫入string類型
fmt.Println(<-personChan, <-personChan, <-personChan)
}
//返回結果
{小飯} 1 test_string
上面例子我們可以看到,如果管道定義為interface類型,任何類型的數據都是可以寫入并且正常取出的,但是我們寫入「結構體類型」之后,如果想取出結構體的「具體屬性」,則需要斷言。
type Person struct {
Name string
}
func main() {
personChan := make(chan interface{}, 10)
personChan <- Person{Name: "小飯"}
person := <-personChan //取出結構體之后,此時還不知道是什么類型,所以沒法直接取屬性,因為定義的是interface
per := person.(Person) //對取出結果進行斷言
fmt.Println(per.Name)
}
//返回結果
小飯
管道是可以循環的,但是循環之前必須關閉,關閉之后不可寫入任何數據
personChan := make(chan int, 10)
personChan <- 1
personChan <- 2
personChan <- 3
close(personChan) //關閉之后管道不能寫入任何數據,否則就會報 panic: send on closed channel
for item := range personChan { //在for range循環管道之前必須關閉管道,否則會報 fatal error: all goroutines are asleep - deadlock!
fmt.Println(item)
}
- 其實為什么循環之前需要關閉管道,很好理解,因為for rang循環可以簡單理解為一個死循環,當管道數據讀取完了之后會繼續讀取,類似于讀取一個空管道,當然會報錯。
- 管道關閉之后不能寫入更好理解,一個對象銷毀了還能去賦值么?一樣的道理。
切忌不要嘗試用for(i:=0;i<len(chan):i++)的方式去循環
這個很好理解,我就不用代碼演示了,因為每次從管道中取一個數據,len(chan)是變化的,所以這么取數據肯定是有問題的。換句話說也就是「不要隨便用len(chan),坑很多」。
協程和管道的綜合使用
我們前面拋出的問題是,「開啟協程操作map會引發并發問題」,現在我們看看怎么用管道解決他。

協程和管道配合解決map寫入并發問題
- 注意這里用到了「兩個管道」,管道「chan map是用于map的讀寫用的」,「exitChan是用于告訴main函數可以退出用的」、
- 首先開啟一個writeMap的協程,把map數據都寫入到管道(chan map)中,需要注意的是數據寫完之后需要把協程關閉掉、
- 在開啟一個readMap的協程,把管道中(chan map)數據一個一個的讀出來、
- 當readMap把數據全部讀取完成中后,給main函數發送一個信號(也就是往exitChan中寫一條數據)、
- main函數監聽exitChan,收到數據直接退出即可。
var chanMap chan map[int]int
var exitChan chan int
func main() {
size := 50000
chanMap := make(chan map[int]int, size)
exitChan := make(chan int, 1)
go WriteMap(chanMap, size) //開啟寫map協程
go ReadMap(chanMap, exitChan) //開啟讀map協程
for {
exit := <-exitChan //監聽exitChan 收到信號直接return即可
if exit != 0 {
return
}
}
}
//寫map數據
func WriteMap(chanMap chan map[int]int, size int) {
for i := 1; i <= size; i++ {
temp := make(map[int]int, 1)
temp[i] = i
chanMap <- temp
fmt.Println("寫入數據:", temp)
}
close(chanMap) //注意數據寫完需要關閉管道
}
//讀map數據
func ReadMap(chanMap chan map[int]int, exitChan chan int) {
for {
val, ok := <-chanMap
if !ok {
break
}
fmt.Println("讀取到:", val)
}
exitChan <- 1 //數據讀取完畢通知main函數可退出
}
協程和管道到底能提升多高的效率?
咱們用協程的目的就是想提高程序的運行效率,管道可以簡單理解為是協助協程一起使用的,但是效率到底能提升多少呢?咱們一起來看一看。
判斷素數
大家都知道,判斷素數的復雜度是N2,比較慢,咱們先看一看傳統的一個一個的去判斷需要多長時間
判斷100000以內的數字哪些是素數
func CheckPrime(num int) bool { //判斷一個數字是否是素數
res := true
for i := 2; i < num; i++ {
if num%i == 0 {
res = false
}
}
return res
}
func main(){
t := time.Now()
size := 100000
for i := 0; i < size; i++ {
if CheckPrime(i) {
fmt.Println(i, "是素數")
}
}
elapsed := time.Since(t)
fmt.Println("app elapsed:", elapsed)
return
}
「上述程序運行了3.33秒多,看來還是比較慢的」
接下來我們用協程和管道的方式看看,還是老規矩,我們先看看流程圖。

?協程和管道配合查找素數
- 先把每個需要判斷的數字寫入initChan。
- 開啟「多個協程拉取initChan的數據一個一個的判斷」,這一步是程序速度加快的關鍵,如果不是素數,不處理即可,如果是素數,就寫入PrimeChan,判斷完之后寫入exitChan,通知主程序即可。
- 主程序監聽primeChan并輸出,同時監聽exitChan,收到信號退出即可。
//初始化,把需要被判斷的數字寫入initChan
func initChan(intChan chan int, size int) {
for i := 1; i <= size; i++ {
intChan <- i
}
close(intChan)
}
//讀取initChan中的數據,一個一個的判斷,如果是素數,就寫入PrimeChan,并且寫入exitChan
func CheckPrimeChan(intChan, primeChan chan int, exitChan chan bool) {
for {
num, ok := <-intChan
if !ok {
break
}
if CheckPrime(num) {
primeChan <- num
}
}
exitChan <- true
}
func main() {
t := time.Now()
size := 100000
intChan := make(chan int, size)
primeChan := make(chan int, size)
exitChan := make(chan bool, 1)
go initChan(intChan, size) //初始化initChan
checkChannelNum := 8
for i := 0; i < checkChannelNum; i++ { //開啟8個協程同時拉取initChan的數據并判斷是否是素數
go CheckPrimeChan(intChan, primeChan, exitChan)
}
go func() {
for i := 0; i < checkChannelNum; i++ {
<-exitChan
}
close(primeChan)
}()
for {
value, ok := <-primeChan
if !ok {
break
}
fmt.Println(value, "是素數")
}
elapsed := time.Since(t)
fmt.Println("app elapsed:", elapsed)
}
//程序執行消耗時間
848.455084m
上述程序執行時間為「848.455084ms」,是傳統的方式的時間的「四分之一」,可見協程在提高運行效率這塊的作用還是顯而易見的。