并發編程包之 Errgroup
本文轉載自微信公眾號「Golang來啦」,作者Seekload 。轉載本文請聯系Golang來啦公眾號。
四哥水平有限,如果有翻譯或理解錯誤的點,煩請幫忙指出,感謝!
這是系列文章的第二篇,第一篇文章點擊這里查看。
原文如下:
基于 goroutine 和 channel 的并發特性,使得 Go 成為了強大的并發語言。上一篇文章,我們討論了如何構建 workerPool 來提高程序的并發性能,換句話說,避免耗盡系統資源。但那只是一個簡單的示例,演示我們應該如何實現。
基于對上一篇文章的學習,在這篇文章里面,我們將構建一個健壯的解決方案,以便在任何其他應用程序里面可以使用該方案。網絡上有其他復雜架構的解決方案,比如使用調度器等等。實際上,我們并不需要這些復雜的設計,僅僅使用一個共享 channel 就可以解決問題。我們一起來看下,該如何構建呢?
代碼結構
我們創建了一個通用的 workerPool 包,根據業務所需的并發性使用 worker 來處理任務。一起來看下目錄結構:
- workerpool
- ├── pool.go
- ├── task.go
- └── worker.go
workerpool 目錄在項目的根目錄下。Task 是需要處理單個工作單元;Worker 是一個簡單的 worker 函數,用于執行任務;而 Pool 用于創建、管理 workers。
實現
先看下 Task 代碼:
- // workerpool/task.go
- package workerpool
- import (
- "fmt"
- )
- type Task struct {
- Err error
- Data interface{}
- f func(interface{}) error
- }
- func NewTask(f func(interface{}) error, data interface{}) *Task {
- return &Task{f: f, Data: data}
- }
- func process(workerID int, task *Task) {
- fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
- task.Err = task.f(task.Data)
- }
Task 是一個簡單的結構體,保存處理任務所需要的一切數據。創建 task 時,傳遞了 Data 和待執行函數 f,process() 函數會處理任務。處理任務時,將 Data 作為參數傳遞給函數 f,并將執行結果保存在 Task.Err 里。
我們來看下 Worker 是如何處理任務的:
- // workerpool/worker.go
- package workerpool
- import (
- "fmt"
- "sync"
- )
- // Worker handles all the work
- type Worker struct {
- ID int
- taskChan chan *Task
- }
- // NewWorker returns new instance of worker
- func NewWorker(channel chan *Task, ID int) *Worker {
- return &Worker{
- ID: ID,
- taskChan: channel,
- }
- }
- // Start starts the worker
- func (wr *Worker) Start(wg *sync.WaitGroup) {
- fmt.Printf("Starting worker %d\n", wr.ID)
- wg.Add(1)
- go func() {
- defer wg.Done()
- for task := range wr.taskChan {
- process(wr.ID, task)
- }
- }()
- }
我們創建了一個小巧的 Worker 結構體,包含 worker ID 和 一個保存待處理任務的 channel。在 Start() 方法里,使用 for range 從 taskChan 讀取任務并處理。可以想象的到,多個 worker 可以并發地執行任務。
workerPool
我們通過實現 Task 和 Worker 來處理任務,但是好像還缺點什么東西,誰負責生成這些 worker 并將任務發送給它們?答案是:Worker Pool。
- // workerpoo/pool.go
- package workerpool
- import (
- "fmt"
- "sync"
- "time"
- )
- // Pool is the worker pool
- type Pool struct {
- Tasks []*Task
- concurrency int
- collector chan *Task
- wg sync.WaitGroup
- }
- // NewPool initializes a new pool with the given tasks and
- // at the given concurrency.
- func NewPool(tasks []*Task, concurrency int) *Pool {
- return &Pool{
- Tasks: tasks,
- concurrency: concurrency,
- collector: make(chan *Task, 1000),
- }
- }
- // Run runs all work within the pool and blocks until it's
- // finished.
- func (p *Pool) Run() {
- for i := 1; i <= p.concurrency; i++ {
- worker := NewWorker(p.collector, i)
- worker.Start(&p.wg)
- }
- for i := range p.Tasks {
- p.collector <- p.Tasks[i]
- }
- close(p.collector)
- p.wg.Wait()
- }
上面的代碼,pool 保存了所有待處理的任務,并且生成與 concurrency 數量一致的 goroutine,用于并發地處理任務。workers 之間共享緩存 channel -- collector。
所以,當我們把這個工作池跑起來時,可以生成滿足所需數量的 worker,workers 之間共享 collector channel。接著,使用 for range 讀取 tasks,并將讀取到的 task 寫入 collector 里。我們使用 sync.WaitGroup 實現協程之間的同步。現在我們有了一個很好的解決方案,一起來測試下。
- // main.go
- package main
- import (
- "fmt"
- "time"
- "github.com/Joker666/goworkerpool/workerpool"
- )
- func main() {
- var allTask []*workerpool.Task
- for i := 1; i <= 100; i++ {
- task := workerpool.NewTask(func(data interface{}) error {
- taskID := data.(int)
- time.Sleep(100 * time.Millisecond)
- fmt.Printf("Task %d processed\n", taskID)
- return nil
- }, i)
- allTask = append(allTask, task)
- }
- pool := workerpool.NewPool(allTask, 5)
- pool.Run()
- }
上面的代碼,創建了 100 個任務并且使用 5 個并發處理這些任務。
輸出如下:
- Worker 3 processes task 98
- Task 92 processed
- Worker 2 processes task 99
- Task 98 processed
- Worker 5 processes task 100
- Task 99 processed
- Task 100 processed
- Took ===============> 2.0056295s
處理 100 個任務花費了 2s,如何我們將并發數提高到 10,我們會看到處理完所有任務只需要大約 1s。
我們通過實現 workerPool 構建了一個健壯的解決方案,具有并發性、錯誤處理、數據處理等功能。這是個通用的包,不耦合具體的實現。我們可以使用它來解決一些大問題。
進一步擴展:后臺處理任務
實際上,我們還可以進一步擴展上面的解決方案,以便 worker 可以在后臺等待我們投遞新的任務并處理。為此,代碼需要做一些修改,Task 結構體保持不變,但是需要小改下 Worker,看下面代碼:
- // workerpool/worker.go
- // Worker handles all the work
- type Worker struct {
- ID int
- taskChan chan *Task
- quit chan bool
- }
- // NewWorker returns new instance of worker
- func NewWorker(channel chan *Task, ID int) *Worker {
- return &Worker{
- ID: ID,
- taskChan: channel,
- quit: make(chan bool),
- }
- }
- ....
- // StartBackground starts the worker in background waiting
- func (wr *Worker) StartBackground() {
- fmt.Printf("Starting worker %d\n", wr.ID)
- for {
- select {
- case task := <-wr.taskChan:
- process(wr.ID, task)
- case <-wr.quit:
- return
- }
- }
- }
- // Stop quits the worker
- func (wr *Worker) Stop() {
- fmt.Printf("Closing worker %d\n", wr.ID)
- go func() {
- wr.quit <- true
- }()
- }
Worker 結構體新加 quit channel,并且新加了兩個方法。StartBackgorund() 在 for 循環里使用 select-case 從 taskChan 隊列讀取任務并處理,如果從 quit 讀取到結束信號就立即返回。Stop() 方法負責往 quit 寫入結束信號。
添加完這兩個新的方法之后,我們來修改下 Pool:
- // workerpool/pool.go
- type Pool struct {
- Tasks []*Task
- Workers []*Worker
- concurrency int
- collector chan *Task
- runBackground chan bool
- wg sync.WaitGroup
- }
- // AddTask adds a task to the pool
- func (p *Pool) AddTask(task *Task) {
- p.collector <- task
- }
- // RunBackground runs the pool in background
- func (p *Pool) RunBackground() {
- go func() {
- for {
- fmt.Print("⌛ Waiting for tasks to come in ...\n")
- time.Sleep(10 * time.Second)
- }
- }()
- for i := 1; i <= p.concurrency; i++ {
- worker := NewWorker(p.collector, i)
- p.Workers = append(p.Workers, worker)
- go worker.StartBackground()
- }
- for i := range p.Tasks {
- p.collector <- p.Tasks[i]
- }
- p.runBackground = make(chan bool)
- <-p.runBackground
- }
- // Stop stops background workers
- func (p *Pool) Stop() {
- for i := range p.Workers {
- p.Workers[i].Stop()
- }
- p.runBackground <- true
- }
Pool 結構體添加了兩個成員:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于維持 pool 存活狀態。
添加了三個新的方法,AddTask() 方法用于往 collector 添加任務;RunBackground() 方法衍生出一個無限運行的 goroutine,以便 pool 維持存活狀態,因為 runBackground 信道是空,讀取空的 channel 會阻塞,所以 pool 能維持運行狀態。接著,在協程里面啟動 worker;Stop() 方法用于停止 worker,并且給 runBackground 發送停止信號以便結束 RunBackground() 方法。
我們來看下具體是如何工作的。
如果是在現實的業務場景中,pool 將會與 HTTP 服務器一塊運行并消耗任務。我們通過 for 無限循環模擬這種這種場景,如果滿足某一條件,pool 將會停止。
- // main.go
- ...
- pool := workerpool.NewPool(allTask, 5)
- go func() {
- for {
- taskID := rand.Intn(100) + 20
- if taskID%7 == 0 {
- pool.Stop()
- }
- time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
- task := workerpool.NewTask(func(data interface{}) error {
- taskID := data.(int)
- time.Sleep(100 * time.Millisecond)
- fmt.Printf("Task %d processed\n", taskID)
- return nil
- }, taskID)
- pool.AddTask(task)
- }
- }()
- pool.RunBackground()
當執行上面的代碼時,我們就會看到有隨機的 task 被投遞到后臺運行的 workers,其中某一個 worker 會讀取到任務并完成處理。當滿足某一條件時,程序便會停止退出。
總結
基于上一篇文章的初步解決方案,這篇文章討論了通過 workPool 構建一個強大的解決方案。同時,我們進一步擴展了該方案,實現后臺運行 pool 并處理投遞的任務。
點擊【閱讀原文】直達代碼倉庫[1]。
參考資料
[1]代碼倉庫: https://github.com/Joker666/goworkerpool
via:https://hackernoon.com/concurrency-in-golang-and-workerpool-part-2-l3w31q7
作者:Hasan