聊聊 Go 流水線編程模式
文末本文轉(zhuǎn)載自微信公眾號「Golang技術分享」,作者機器鈴砍菜刀 。轉(zhuǎn)載本文請聯(lián)系Golang技術分享公眾號。
流水線工作模型在工業(yè)領域內(nèi)十分常見,它將工作流程分為多個環(huán)節(jié),每個環(huán)節(jié)根據(jù)工作強度安排合適的人員數(shù)量。良好的流水線設計盡量讓各環(huán)節(jié)的流通率平衡,最大化提高產(chǎn)能效率。
Go 是一門實用性語言,流水線工作模型與 Go 融合地非常融洽,只不過我們一般使用另一個名詞來表示流水線:pipeline。
pipeline
pipeline 由多個環(huán)節(jié)組成,具體在 Go 中,環(huán)節(jié)之間通過 channel 通信,同一個環(huán)節(jié)任務可以由多個 goroutine 來同時處理。
pipeline
pipeline 的核心是數(shù)據(jù),通過 channel 來保證數(shù)據(jù)流動,每個環(huán)節(jié)的數(shù)據(jù)處理由 goroutine 完成。
除了開始環(huán)節(jié)和結束環(huán)節(jié),每個環(huán)節(jié)都有任意數(shù)量的輸入 channel 和輸出 channel。開始環(huán)節(jié)被稱為發(fā)送者或生產(chǎn)者,結束環(huán)節(jié)被稱為接收者或消費者。
下面我們來看一個簡單的 pipeline 例子,分為三個環(huán)節(jié)。
第一個環(huán)節(jié),generate 函數(shù):它充當生產(chǎn)者角色,將數(shù)據(jù)寫入 channel,并把該 channel 返回。當所有數(shù)據(jù)寫入完畢,關閉 channel。
- func generate(nums ...int) <-chan int {
- out := make(chan int)
- go func() {
- for _, n := range nums {
- out <- n
- }
- close(out)
- }()
- return out
- }
第二個環(huán)節(jié),square 函數(shù):它是數(shù)據(jù)處理的角色,從開始環(huán)節(jié)中的 channel 取出數(shù)據(jù),計算平方,將結果寫入新的 channel ,并把該新的 channel 返回。當所有數(shù)據(jù)計算完畢,關閉該新 channel。
- func square(in <-chan int) <-chan int {
- out := make(chan int)
- go func() {
- for n := range in {
- out <- n * n
- }
- close(out)
- }()
- return out
- }
main 函數(shù)負責編排整個 pipeline ,并充當消費者角色:讀取第二個環(huán)節(jié)的 channel 數(shù)據(jù),打印出來。
- func main() {
- // Set up the pipeline.
- c := generate(2, 3)
- out := square(c)
- // Consume the output.
- for n := range out {
- fmt.Println(n)
- }
- }
Fan-out,fan-in
在上述例子中,環(huán)節(jié)之間通過非緩沖的 channel 傳遞數(shù)據(jù),節(jié)點中的數(shù)據(jù)都是單個 goroutine 處理與消費。
這種工作模式并不高效,會讓整個流水線的效率取決于最慢的環(huán)節(jié)。因為每個環(huán)節(jié)中的任務量是不同的,這意味著我們需要的機器資源是存在差異的。任務量小的環(huán)節(jié),盡量占有少量的機器資源,任務量重的環(huán)節(jié),需要更多線程并行處理。
以汽車組裝為例,我們可以將組裝輪胎的工作分發(fā)給 4 個人一起干,當輪胎組裝完畢之后,再交由剩下的環(huán)節(jié)。
多個 goroutine 可以從同一個 channel 讀取數(shù)據(jù),直到該通道關閉,這稱為 fan-out(扇出)。
這個稱呼比較形象,它將數(shù)據(jù)進行分散,所以被稱為扇出。扇出是一種分發(fā)任務的模式。
fan-out
單個 goroutine 可以從多個輸入 channel 中讀取數(shù)據(jù),直到所有輸入都關閉。具體做法是將輸入 channel 多路復用到同一個 channel 上,當所有輸入 channel 都關閉時,該 channel 也關閉,這稱為 fan-in(扇入)。
它將數(shù)據(jù)進行聚合,所以被稱為扇入。扇入是一種整合任務結果的模式。
fan-in
在汽車組裝的例子中,分發(fā)輪胎任務給每個人是 Fan-out,合并輪胎組裝結果就是 Fan-in。
channel 的多路復用
扇出的編碼模型比較簡單,本文不多研究,我們提供一個扇入編程示例。
創(chuàng)建一個生成器函數(shù) generate,通過 interval 參數(shù)控制消息生成頻率。生成器返回消息 channel mc與停止 channel sc,停止 channel 用于停止生成器任務。
- func generate(message string, interval time.Duration) (chan string, chan struct{}) {
- mc := make(chan string)
- sc := make(chan struct{})
- go func() {
- defer func() {
- close(sc)
- }()
- for {
- select {
- case <-sc:
- return
- default:
- time.Sleep(interval)
- mc <- message
- }
- }
- }()
- return mc, sc
- }
stopGenerating 函數(shù)通過通過向 sc 中傳入空結構體,通知 generate退出,調(diào)用 close(mc) 關閉消息 channel
- func stopGenerating(mc chan string, sc chan struct{}) {
- sc <- struct{}{}
- close(mc)
- }
多路復用函數(shù) multiplex 創(chuàng)建并返回整合消息 channel 和控制并發(fā)的 wg。
- func multiplex(mcs ...chan string) (chan string, *sync.WaitGroup) {
- mmc := make(chan string)
- wg := &sync.WaitGroup{}
- for _, mc := range mcs {
- wg.Add(1)
- go func(mc chan string, wg *sync.WaitGroup) {
- defer wg.Done()
- for m := range mc {
- mmc <- m
- }
- }(mc, wg)
- }
- return mmc, wg
- }
在 main 函數(shù)中,創(chuàng)建兩個消息 channel 并復用它們生成 mmc ,打印來自 mmc 的每條消息。另外,我們還實現(xiàn)了接收系統(tǒng)斷信號(終端上執(zhí)行 CTRL+C 即可發(fā)送中斷信號)的優(yōu)雅的關閉機制。
- func main() {
- // create two sample message and stop channels
- mc1, sc1 := generate("message from generator 1", 200*time.Millisecond)
- mc2, sc2 := generate("message from generator 2", 300*time.Millisecond)
- // multiplex message channels
- mmc, wg1 := multiplex(mc1, mc2)
- // create errs channel for graceful shutdown
- errs := make(chan error)
- // wait for interrupt or terminate signal
- go func() {
- sc := make(chan os.Signal, 1)
- signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM)
- errs <- fmt.Errorf("%s signal received", <-sc)
- }()
- // wait for multiplexed messages
- wg2 := &sync.WaitGroup{}
- wg2.Add(1)
- go func() {
- defer wg2.Done()
- for m := range mmc {
- fmt.Println(m)
- }
- }()
- // wait for errors
- if err := <-errs; err != nil {
- fmt.Println(err.Error())
- }
- // stop generators
- stopGenerating(mc1, sc1)
- stopGenerating(mc2, sc2)
- wg1.Wait()
- // close multiplexed messages channel
- close(mmc)
- wg2.Wait()
- }
總結
本文簡單介紹了流水線編程模式,它和我們熟悉的生產(chǎn)者-消費者模式非常相似。
具體到 Go 編程實踐中,pipeline 將數(shù)據(jù)流分為多個環(huán)節(jié),channel 用于數(shù)據(jù)流動,goroutine 用于處理數(shù)據(jù)。fan-out 用于分發(fā)任務,fan-in 用于數(shù)據(jù)整合,通過 FAN 模式可以讓流水線更好地并發(fā)。
當然,還有些細節(jié)需要注意,例如停止通知機制,可參照本文 channel 的多路復用章節(jié)示例中的 stopGenerating 函數(shù);如何通過 sync.WaitGroup 做好并發(fā)控制,這些都是需要讀者在實際編碼中去體會掌握的。
參考
Go Concurrency Patterns: Pipelines and cancellation:https://go.dev/blog/pipelines
Multiplexing Channels In Go:https://medium.com/@ermanimer/multiplexing-channels-in-go-a7dccdcc4134