成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

并發編程包之 Errgroup

開發 項目管理
Task 是需要處理單個工作單元;Worker 是一個簡單的 worker 函數,用于執行任務;而 Pool 用于創建、管理 workers。

[[415494]]

本文轉載自微信公眾號「Golang來啦」,作者Seekload 。轉載本文請聯系Golang來啦公眾號。

四哥水平有限,如果有翻譯或理解錯誤的點,煩請幫忙指出,感謝!

這是系列文章的第二篇,第一篇文章點擊這里查看。

原文如下:

基于 goroutine 和 channel 的并發特性,使得 Go 成為了強大的并發語言。上一篇文章,我們討論了如何構建 workerPool 來提高程序的并發性能,換句話說,避免耗盡系統資源。但那只是一個簡單的示例,演示我們應該如何實現。

基于對上一篇文章的學習,在這篇文章里面,我們將構建一個健壯的解決方案,以便在任何其他應用程序里面可以使用該方案。網絡上有其他復雜架構的解決方案,比如使用調度器等等。實際上,我們并不需要這些復雜的設計,僅僅使用一個共享 channel 就可以解決問題。我們一起來看下,該如何構建呢?

代碼結構

我們創建了一個通用的 workerPool 包,根據業務所需的并發性使用 worker 來處理任務。一起來看下目錄結構:

  1. workerpool 
  2. ├── pool.go 
  3. ├── task.go 
  4. └── worker.go 

workerpool 目錄在項目的根目錄下。Task 是需要處理單個工作單元;Worker 是一個簡單的 worker 函數,用于執行任務;而 Pool 用于創建、管理 workers。

實現

先看下 Task 代碼:

  1. // workerpool/task.go 
  2.  
  3. package workerpool 
  4.  
  5. import ( 
  6.  "fmt" 
  7.  
  8. type Task struct { 
  9.  Err  error 
  10.  Data interface{} 
  11.  f    func(interface{}) error 
  12.  
  13. func NewTask(f func(interface{}) error, data interface{}) *Task { 
  14.  return &Task{f: f, Data: data} 
  15.  
  16. func process(workerID int, task *Task) { 
  17.  fmt.Printf("Worker %d processes task %v\n", workerID, task.Data) 
  18.  task.Err = task.f(task.Data) 

Task 是一個簡單的結構體,保存處理任務所需要的一切數據。創建 task 時,傳遞了 Data 和待執行函數 f,process() 函數會處理任務。處理任務時,將 Data 作為參數傳遞給函數 f,并將執行結果保存在 Task.Err 里。

我們來看下 Worker 是如何處理任務的:

  1. // workerpool/worker.go 
  2.  
  3. package workerpool 
  4.  
  5. import ( 
  6.  "fmt" 
  7.  "sync" 
  8.  
  9. // Worker handles all the work 
  10. type Worker struct { 
  11.  ID       int 
  12.  taskChan chan *Task 
  13.  
  14. // NewWorker returns new instance of worker 
  15. func NewWorker(channel chan *Task, ID int) *Worker { 
  16.  return &Worker{ 
  17.   ID:       ID, 
  18.   taskChan: channel, 
  19.  } 
  20.  
  21. // Start starts the worker 
  22. func (wr *Worker) Start(wg *sync.WaitGroup) { 
  23.  fmt.Printf("Starting worker %d\n", wr.ID) 
  24.  
  25.  wg.Add(1) 
  26.  go func() { 
  27.   defer wg.Done() 
  28.   for task := range wr.taskChan { 
  29.    process(wr.ID, task) 
  30.   } 
  31.  }() 

我們創建了一個小巧的 Worker 結構體,包含 worker ID 和 一個保存待處理任務的 channel。在 Start() 方法里,使用 for range 從 taskChan 讀取任務并處理。可以想象的到,多個 worker 可以并發地執行任務。

workerPool

我們通過實現 Task 和 Worker 來處理任務,但是好像還缺點什么東西,誰負責生成這些 worker 并將任務發送給它們?答案是:Worker Pool。

  1. // workerpoo/pool.go 
  2.  
  3. package workerpool 
  4.  
  5. import ( 
  6.  "fmt" 
  7.  "sync" 
  8.  "time" 
  9.  
  10. // Pool is the worker pool 
  11. type Pool struct { 
  12.  Tasks   []*Task 
  13.  
  14.  concurrency   int 
  15.  collector     chan *Task 
  16.  wg            sync.WaitGroup 
  17.  
  18. // NewPool initializes a new pool with the given tasks and 
  19. // at the given concurrency. 
  20. func NewPool(tasks []*Task, concurrency int) *Pool { 
  21.  return &Pool{ 
  22.   Tasks:       tasks, 
  23.   concurrency: concurrency, 
  24.   collector:   make(chan *Task, 1000), 
  25.  } 
  26.  
  27. // Run runs all work within the pool and blocks until it's 
  28. // finished. 
  29. func (p *Pool) Run() { 
  30.  for i := 1; i <= p.concurrency; i++ { 
  31.   worker := NewWorker(p.collector, i) 
  32.   worker.Start(&p.wg) 
  33.  } 
  34.  
  35.  for i := range p.Tasks { 
  36.   p.collector <- p.Tasks[i] 
  37.  } 
  38.  close(p.collector) 
  39.  
  40.  p.wg.Wait() 

上面的代碼,pool 保存了所有待處理的任務,并且生成與 concurrency 數量一致的 goroutine,用于并發地處理任務。workers 之間共享緩存 channel -- collector。

所以,當我們把這個工作池跑起來時,可以生成滿足所需數量的 worker,workers 之間共享 collector channel。接著,使用 for range 讀取 tasks,并將讀取到的 task 寫入 collector 里。我們使用 sync.WaitGroup 實現協程之間的同步。現在我們有了一個很好的解決方案,一起來測試下。

  1. // main.go 
  2.  
  3. package main 
  4.  
  5. import ( 
  6.  "fmt" 
  7.  "time" 
  8.  
  9.  "github.com/Joker666/goworkerpool/workerpool" 
  10.  
  11. func main() { 
  12.  var allTask []*workerpool.Task 
  13.  for i := 1; i <= 100; i++ { 
  14.   task := workerpool.NewTask(func(data interface{}) error { 
  15.    taskID := data.(int
  16.    time.Sleep(100 * time.Millisecond) 
  17.    fmt.Printf("Task %d processed\n", taskID) 
  18.    return nil 
  19.   }, i) 
  20.   allTask = append(allTask, task) 
  21.  } 
  22.  
  23.  pool := workerpool.NewPool(allTask, 5) 
  24.  pool.Run() 

上面的代碼,創建了 100 個任務并且使用 5 個并發處理這些任務。

輸出如下:

  1. Worker 3 processes task 98 
  2. Task 92 processed 
  3. Worker 2 processes task 99 
  4. Task 98 processed 
  5. Worker 5 processes task 100 
  6. Task 99 processed 
  7. Task 100 processed 
  8. Took ===============> 2.0056295s 

處理 100 個任務花費了 2s,如何我們將并發數提高到 10,我們會看到處理完所有任務只需要大約 1s。

我們通過實現 workerPool 構建了一個健壯的解決方案,具有并發性、錯誤處理、數據處理等功能。這是個通用的包,不耦合具體的實現。我們可以使用它來解決一些大問題。

進一步擴展:后臺處理任務

實際上,我們還可以進一步擴展上面的解決方案,以便 worker 可以在后臺等待我們投遞新的任務并處理。為此,代碼需要做一些修改,Task 結構體保持不變,但是需要小改下 Worker,看下面代碼:

  1. // workerpool/worker.go 
  2.  
  3. // Worker handles all the work 
  4. type Worker struct { 
  5.  ID       int 
  6.  taskChan chan *Task 
  7.  quit     chan bool 
  8.  
  9. // NewWorker returns new instance of worker 
  10. func NewWorker(channel chan *Task, ID int) *Worker { 
  11.  return &Worker{ 
  12.   ID:       ID, 
  13.   taskChan: channel, 
  14.   quit:     make(chan bool), 
  15.  } 
  16.  
  17. .... 
  18.  
  19. // StartBackground starts the worker in background waiting 
  20. func (wr *Worker) StartBackground() { 
  21.  fmt.Printf("Starting worker %d\n", wr.ID) 
  22.  
  23.  for { 
  24.   select { 
  25.   case task := <-wr.taskChan: 
  26.    process(wr.ID, task) 
  27.   case <-wr.quit: 
  28.    return 
  29.   } 
  30.  } 
  31.  
  32. // Stop quits the worker 
  33. func (wr *Worker) Stop() { 
  34.  fmt.Printf("Closing worker %d\n", wr.ID) 
  35.  go func() { 
  36.   wr.quit <- true 
  37.  }() 

Worker 結構體新加 quit channel,并且新加了兩個方法。StartBackgorund() 在 for 循環里使用 select-case 從 taskChan 隊列讀取任務并處理,如果從 quit 讀取到結束信號就立即返回。Stop() 方法負責往 quit 寫入結束信號。

添加完這兩個新的方法之后,我們來修改下 Pool:

  1. // workerpool/pool.go 
  2.  
  3. type Pool struct { 
  4.  Tasks   []*Task 
  5.  Workers []*Worker 
  6.  
  7.  concurrency   int 
  8.  collector     chan *Task 
  9.  runBackground chan bool 
  10.  wg            sync.WaitGroup 
  11.  
  12. // AddTask adds a task to the pool 
  13. func (p *Pool) AddTask(task *Task) { 
  14.  p.collector <- task 
  15.  
  16. // RunBackground runs the pool in background 
  17. func (p *Pool) RunBackground() { 
  18.  go func() { 
  19.   for { 
  20.    fmt.Print("⌛ Waiting for tasks to come in ...\n"
  21.    time.Sleep(10 * time.Second
  22.   } 
  23.  }() 
  24.  
  25.  for i := 1; i <= p.concurrency; i++ { 
  26.   worker := NewWorker(p.collector, i) 
  27.   p.Workers = append(p.Workers, worker) 
  28.   go worker.StartBackground() 
  29.  } 
  30.  
  31.  for i := range p.Tasks { 
  32.   p.collector <- p.Tasks[i] 
  33.  } 
  34.  
  35.  p.runBackground = make(chan bool) 
  36.  <-p.runBackground 
  37.  
  38. // Stop stops background workers 
  39. func (p *Pool) Stop() { 
  40.  for i := range p.Workers { 
  41.   p.Workers[i].Stop() 
  42.  } 
  43.  p.runBackground <- true 

Pool 結構體添加了兩個成員:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于維持 pool 存活狀態。

添加了三個新的方法,AddTask() 方法用于往 collector 添加任務;RunBackground() 方法衍生出一個無限運行的 goroutine,以便 pool 維持存活狀態,因為 runBackground 信道是空,讀取空的 channel 會阻塞,所以 pool 能維持運行狀態。接著,在協程里面啟動 worker;Stop() 方法用于停止 worker,并且給 runBackground 發送停止信號以便結束 RunBackground() 方法。

我們來看下具體是如何工作的。

如果是在現實的業務場景中,pool 將會與 HTTP 服務器一塊運行并消耗任務。我們通過 for 無限循環模擬這種這種場景,如果滿足某一條件,pool 將會停止。

  1. // main.go 
  2.  
  3. ... 
  4.  
  5. pool := workerpool.NewPool(allTask, 5) 
  6. go func() { 
  7.  for { 
  8.   taskID := rand.Intn(100) + 20 
  9.  
  10.   if taskID%7 == 0 { 
  11.    pool.Stop() 
  12.   } 
  13.  
  14.   time.Sleep(time.Duration(rand.Intn(5)) * time.Second
  15.   task := workerpool.NewTask(func(data interface{}) error { 
  16.    taskID := data.(int
  17.    time.Sleep(100 * time.Millisecond) 
  18.    fmt.Printf("Task %d processed\n", taskID) 
  19.    return nil 
  20.   }, taskID) 
  21.   pool.AddTask(task) 
  22.  } 
  23. }() 
  24. pool.RunBackground() 

當執行上面的代碼時,我們就會看到有隨機的 task 被投遞到后臺運行的 workers,其中某一個 worker 會讀取到任務并完成處理。當滿足某一條件時,程序便會停止退出。

總結

基于上一篇文章的初步解決方案,這篇文章討論了通過 workPool 構建一個強大的解決方案。同時,我們進一步擴展了該方案,實現后臺運行 pool 并處理投遞的任務。

點擊【閱讀原文】直達代碼倉庫[1]。

參考資料

[1]代碼倉庫: https://github.com/Joker666/goworkerpool

via:https://hackernoon.com/concurrency-in-golang-and-workerpool-part-2-l3w31q7

作者:Hasan

 

責任編輯:武曉燕 來源: Golang來啦
相關推薦

2021-02-26 13:08:27

Java高并發AQS

2018-09-12 15:38:42

Javaatomic編程

2023-07-03 09:59:00

并發編程并發容器

2021-03-18 00:14:29

JavaCyclicBarri高并發

2021-03-04 07:24:24

JavaSemaphore高并發

2010-01-15 09:15:09

Scala Actor并發

2023-10-27 07:47:58

Java語言順序性

2021-03-11 00:05:55

Java高并發編程

2017-09-19 14:53:37

Java并發編程并發代碼設計

2011-12-12 11:16:02

iOS并發編程

2022-10-17 08:07:13

Go 語言并發編程

2023-07-06 08:06:47

LockCondition公平鎖

2011-12-29 13:31:15

Java

2025-02-17 00:00:25

Java并發編程

2025-02-19 00:05:18

Java并發編程

2025-06-18 08:10:00

Java并發編程開發

2022-07-08 14:14:04

并發編程異步編程

2022-10-24 00:48:58

Go語言errgroup

2010-10-14 10:43:43

編程

2023-11-27 18:07:05

Go并發編程
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲毛片 | 免费看黄色国产 | 午夜欧美一区二区三区在线播放 | 在线欧美一区二区 | 99精品久久久久 | 特级特黄特色的免费大片 | 欧美日韩中 | 国产美女黄色片 | 精品久久香蕉国产线看观看亚洲 | www久久 | 中文字幕一区二区不卡 | 色婷婷综合久久久久中文一区二区 | www.99热.com | 黄网免费看 | 91精品国产91久久综合桃花 | 男女视频在线免费观看 | 欧美日日| 成人av电影免费在线观看 | 国产精品久久久久久久久久免费 | 欧美成ee人免费视频 | 日韩av手机在线观看 | 亚洲日本一区二区三区四区 | 91精品国产91久久久久久丝袜 | www.久草.com| 一级毛片大全免费播放 | 亚洲成人精品 | 激情a| 91精品在线播放 | 中文字幕在线一区二区三区 | 欧美久操网 | 精品欧美一区二区三区久久久小说 | 国产精品久久久久久久久久久久 | 伊人免费网 | 中文字幕一区二区三区日韩精品 | 久久久久久久国产精品影院 | 国产成人精品一区二区三 | 日本黄色一级视频 | 欧美高清视频 | 欧洲高清转码区一二区 | 男女国产视频 | 日韩中文字幕网 |