成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go語言的并發與WorkerPool

開發 后端
Golang 的并發模型非常強大,稱為 CSP(通信順序進程),它將一個問題分解成更小的順序進程,然后調度這些進程的實例(稱為 Goroutine)。這些進程通過 channel 傳遞信息實現通信。

[[414288]]

本文轉載自微信公眾號「Golang來啦」,作者Seekload。轉載本文請聯系Golang來啦公眾號。

四哥水平有限,如有翻譯或理解錯誤,煩請幫忙指出,感謝!

昨天分享關于 workerPool 的文章,有同學在后臺說,昨天的 Demo 恰好符合項目的業務場景,真的非常棒!

所以今天就再來分享一篇 。

原文如下:

現代編程語言中,并發已經成為必不可少的特性。現在絕大多數編程語言都有一些方法實現并發。

其中一些實現方式非常強大,能將負載轉移到不同的系統線程,比如 Java 等;一些則在同一線程上模擬這種行為,比如 Ruby 等。

Golang 的并發模型非常強大,稱為 CSP(通信順序進程),它將一個問題分解成更小的順序進程,然后調度這些進程的實例(稱為 Goroutine)。這些進程通過 channel 傳遞信息實現通信。

本文,我們將探討如何利用 golang 的并發性,以及如何在 workerPool 使用。系列文章的第二篇,我們將探討如何構建一個強大的并發解決方案。

一個簡單的例子

假設我們需要調用一個外部 API 接口,整個過程需要花費 100ms。如果我們需要同步地調用該接口 1000 次,則需要花費 100s。

  1. //// model/data.go 
  2.  
  3. package model 
  4.  
  5. type SimpleData struct { 
  6.  ID int 
  7.  
  8. //// basic/basic.go 
  9.  
  10. package basic 
  11.  
  12. import ( 
  13.  "fmt" 
  14.  "github.com/Joker666/goworkerpool/model" 
  15.  "time" 
  16.  
  17. func Work(allData []model.SimpleData) { 
  18.  start := time.Now() 
  19.  for i, _ := range allData { 
  20.   Process(allData[i]) 
  21.  } 
  22.  elapsed := time.Since(start) 
  23.  fmt.Printf("Took ===============> %s\n", elapsed) 
  24.  
  25. func Process(data model.SimpleData) { 
  26.  fmt.Printf("Start processing %d\n", data.ID) 
  27.  time.Sleep(100 * time.Millisecond) 
  28.  fmt.Printf("Finish processing %d\n", data.ID) 
  29.  
  30. //// main.go 
  31.  
  32. package main 
  33.  
  34. import ( 
  35.  "fmt" 
  36.  "github.com/Joker666/goworkerpool/basic" 
  37.  "github.com/Joker666/goworkerpool/model" 
  38.  "github.com/Joker666/goworkerpool/worker" 
  39.  
  40. func main() { 
  41.  // Prepare the data 
  42.  var allData []model.SimpleData 
  43.  for i := 0; i < 1000; i++ { 
  44.   data := model.SimpleData{ ID: i } 
  45.   allData = append(allData, data) 
  46.  } 
  47.  fmt.Printf("Start processing all work \n"
  48.  
  49.  // Process 
  50.  basic.Work(allData) 
  1. Start processing all work 
  2. Took ===============> 1m40.226679665s 

上面的代碼創建了 model 包,包里包含一個結構體,這個結構體只有一個 int 類型的成員。我們同步地處理 data,這顯然不是最佳方案,因為可以并發處理這些任務。我們換一種方案,使用 goroutine 和 channel 來處理。

異步

  1. //// worker/notPooled.go 
  2.  
  3. func NotPooledWork(allData []model.SimpleData) { 
  4.  start := time.Now() 
  5.  var wg sync.WaitGroup 
  6.  
  7.  dataCh := make(chan model.SimpleData, 100) 
  8.  
  9.  wg.Add(1) 
  10.  go func() { 
  11.   defer wg.Done() 
  12.   for data := range dataCh { 
  13.    wg.Add(1) 
  14.    go func(data model.SimpleData) { 
  15.     defer wg.Done() 
  16.     basic.Process(data) 
  17.    }(data) 
  18.   } 
  19.  }() 
  20.  
  21.  for i, _ := range allData { 
  22.   dataCh <- allData[i] 
  23.  } 
  24.  
  25.  close(dataCh) 
  26.  wg.Wait() 
  27.  elapsed := time.Since(start) 
  28.  fmt.Printf("Took ===============> %s\n", elapsed) 
  29.  
  30. //// main.go 
  31.  
  32. // Process 
  33. worker.NotPooledWork(allData) 
  1. Start processing all work 
  2. Took ===============> 101.191534ms 

上面的代碼,我們創建了容量 100 的緩存 channel,并通過 NoPooledWork() 將數據 push 到 channel 里。channel 長度滿 100 之后,我們是無法再向其中添加元素直到有元素被讀取走。使用 for range 讀取 channel,并生成 goroutine 處理。這里我們沒有限制生成 goroutine 的數量,這可以盡可能多地處理任務。從理論上來講,在給定所需資源的情況下,可以處理盡可能多的數據。執行代碼,完成 1000 個任務只花費了 100ms。很瘋狂吧!不全是,接著往下看。

問題

除非我們擁有地球上所有的資源,否則在特定時間內能夠分配的資源是有限的。一個 goroutine 占用的最小內存是 2k,但也能達到 1G。上述并發執行所有任務的解決方案中,假設有一百萬個任務,就會很快耗盡機器的內存和 CPU。我們要么升級機器的配置,要么就尋找其他更好的解決方案。

計算機科學家很久之前就考慮過這個問題,并提出了出色的解決方案 - 使用 Thread Pool 或者 Worker Pool。這個方案是使用 worker 數量受限的工作池來處理任務,workers 會按順序一個接一個處理任務,這樣就避免了 CPU 和內存使用急速增長。

解決方案:Worker Pool

我們通過實現 worker pool 來修復之前遇到的問題。

  1. //// worker/pooled.go 
  2.  
  3. func PooledWork(allData []model.SimpleData) { 
  4.  start := time.Now() 
  5.  var wg sync.WaitGroup 
  6.  workerPoolSize := 100 
  7.  
  8.  dataCh := make(chan model.SimpleData, workerPoolSize) 
  9.  
  10.  for i := 0; i < workerPoolSize; i++ { 
  11.   wg.Add(1) 
  12.   go func() { 
  13.    defer wg.Done() 
  14.  
  15.    for data := range dataCh { 
  16.     basic.Process(data) 
  17.    } 
  18.   }() 
  19.  } 
  20.  
  21.  for i, _ := range allData { 
  22.   dataCh <- allData[i] 
  23.  } 
  24.  
  25.  close(dataCh) 
  26.  wg.Wait() 
  27.  elapsed := time.Since(start) 
  28.  fmt.Printf("Took ===============> %s\n", elapsed) 
  29.  
  30. //// main.go 
  31.  
  32. // Process 
  33. worker.PooledWork(allData) 
  1. Start processing all work 
  2. Took ===============> 1.002972449s 

上面的代碼,worker 數量限制在 100,我們創建了相應數量的 goroutine 來處理任務。我們可以把 channel 看作是隊列,worker goroutine 看作是消費者。多個 goroutine 可以監聽同一個 channel,但是 channel 里的每一個元素只會被處理一次。

Go 語言的 channel 可以當作隊列使用。

這是一個比較好的解決方案,執行代碼,我們看到完成所有任務花費 1s。雖然沒有 100ms 這么快,但已經能滿足業務需要,而且我們得到了一個更好的解決方案,能將負載均攤在不同的時間片上。

處理錯誤

我們能做的還沒完。上面看起來是一個完整的解決方案,但卻不是的,我們沒有處理錯誤情況。所以需要模擬出錯的情形,并且看下我們需要怎么處理。

  1. //// worker/pooledError.go 
  2.  
  3. func PooledWorkError(allData []model.SimpleData) { 
  4.  start := time.Now() 
  5.  var wg sync.WaitGroup 
  6.  workerPoolSize := 100 
  7.  
  8.  dataCh := make(chan model.SimpleData, workerPoolSize) 
  9.  errors := make(chan error, 1000) 
  10.  
  11.  for i := 0; i < workerPoolSize; i++ { 
  12.   wg.Add(1) 
  13.   go func() { 
  14.    defer wg.Done() 
  15.  
  16.    for data := range dataCh { 
  17.     process(data, errors) 
  18.    } 
  19.   }() 
  20.  } 
  21.  
  22.  for i, _ := range allData { 
  23.   dataCh <- allData[i] 
  24.  } 
  25.  
  26.  close(dataCh) 
  27.  
  28.  wg.Add(1) 
  29.  go func() { 
  30.   defer wg.Done() 
  31.   for { 
  32.    select { 
  33.    case err := <-errors: 
  34.     fmt.Println("finished with error:", err.Error()) 
  35.    case <-time.After(time.Second * 1): 
  36.     fmt.Println("Timeout: errors finished"
  37.     return 
  38.    } 
  39.   } 
  40.  }() 
  41.  
  42.  defer close(errors) 
  43.  wg.Wait() 
  44.  elapsed := time.Since(start) 
  45.  fmt.Printf("Took ===============> %s\n", elapsed) 
  46.  
  47. func process(data model.SimpleData, errors chan<- error) { 
  48.  fmt.Printf("Start processing %d\n", data.ID) 
  49.  time.Sleep(100 * time.Millisecond) 
  50.  if data.ID % 29 == 0 { 
  51.   errors <- fmt.Errorf("error on job %v", data.ID) 
  52.  } else { 
  53.   fmt.Printf("Finish processing %d\n", data.ID) 
  54.  } 
  55.  
  56. //// main.go 
  57.  
  58. // Process 
  59. worker.PooledWorkError(allData) 

我們修改了 process() 函數,處理一些隨機的錯誤并將錯誤 push 到 errors chnanel 里。所以,為了處理并發出現的錯誤,我們可以使用 errors channel 保存錯誤數據。在所有任務處理完成之后,可以檢查錯誤 channel 是否有數據。錯誤 channel 里的元素保存了任務 ID,方便需要的時候再處理這些任務。

比之前沒處理錯誤,很明顯這是一個更好的解決方案。但我們還可以做得更好,

我們將在下篇文章討論如何編寫一個強大的 worker pool 包,并且在 worker 數量受限的情況下處理并發任務。

總結

Go 語言的并發模型足夠強大給力,只需要構建一個 worker pool 就能很好地解決問題而無需做太多工作,這就是它沒有包含在標準庫中的原因。但是,我們自己可以構建一個滿足自身需求的方案。很快,我會在下一篇文章中講到,敬請期待!

點擊【閱讀原文】直達代碼倉庫[1]。

參考資料

[1]代碼倉庫: https://github.com/Joker666/goworkerpool?ref=hackernoon.com

via:https://hackernoon.com/concurrency-in-golang-and-workerpool-part-1-e9n31ao

作者:Hasan

 

責任編輯:武曉燕 來源: Golang來啦
相關推薦

2013-05-28 09:43:38

GoGo語言并發模式

2021-07-15 23:18:48

Go語言并發

2023-12-21 07:09:32

Go語言任務

2024-08-12 11:32:12

Go語言程序

2023-02-10 09:40:36

Go語言并發

2023-05-15 08:01:16

Go語言

2023-01-30 15:41:10

Channel控制并發

2021-06-24 06:35:00

Go語言進程

2021-04-13 07:58:42

Go語言函數

2021-04-07 09:02:49

Go 語言變量與常量

2022-04-06 08:19:13

Go語言切片

2014-04-09 09:32:24

Go并發

2022-03-04 10:07:45

Go語言字節池

2021-04-20 09:00:48

Go 語言結構體type

2021-09-30 09:21:28

Go語言并發編程

2025-03-24 00:25:00

Go語言并發編程

2024-07-01 08:44:42

Go語言協程

2024-04-07 00:04:00

Go語言Map

2022-01-10 23:54:56

GoMap并發

2021-07-29 07:55:19

Demo 工作池
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美精品久久久 | 在线高清免费观看视频 | 欧美一区二区三区在线观看 | 日本黄色影片在线观看 | 国产视频综合 | 亚洲天堂精品一区 | 中国黄色在线视频 | 丁香五月网久久综合 | 国产一级免费视频 | 91视频精选 | 久久99成人 | 最新日韩精品 | h免费观看 | 亚洲精品国产成人 | 国产精品欧美一区二区三区 | 亚洲黄色高清视频 | 欧美精品一区二区三区蜜桃视频 | 国产三区在线观看视频 | 91在线 | 综合九九| 欧美freesex黑人又粗又大 | 国产大学生情侣呻吟视频 | 精精国产xxxx视频在线播放7 | 日本亚洲欧美 | 爱草在线 | 男女污污动态图 | 日韩欧美不卡 | 在线欧美亚洲 | 最新国产精品视频 | 国产区在线 | 亚洲精品久久久久久久久久久 | 亚洲日本一区二区 | 欧美精品1区 | 国产精品一区二区三区在线播放 | 午夜电影网站 | 国产成人小视频 | 欧美9999 | 午夜影院在线观看视频 | 精品国产18久久久久久二百 | 免费观看的黄色网址 | 精品亚洲国产成av人片传媒 |