一個(gè) Demo 學(xué)會(huì) WorkerPool
本文轉(zhuǎn)載自微信公眾號「Golang來啦」,作者Seekload。轉(zhuǎn)載本文請聯(lián)系Golang來啦公眾號。
四哥水平有限,如有翻譯或理解錯(cuò)誤,煩請幫忙指出,感謝!
今天給大家分享一篇關(guān)于 workPool 的文章,這個(gè)平時(shí)大家應(yīng)該用的比較多,一起來看下。
原文如下:
工作池是這樣一個(gè)池子,會(huì)創(chuàng)建指定數(shù)量的 worker,這些 worker 能獲取任務(wù)并處理。允許多個(gè)任務(wù)同時(shí)處理,但是需要維持固定數(shù)量的 worker 避免系統(tǒng)資源被過度使用。
通常有兩種方式創(chuàng)建任務(wù)池:
- 一種是預(yù)先創(chuàng)建固定數(shù)量的 worker;
- 另外一種是當(dāng)有需要的時(shí)候才會(huì)創(chuàng)建 worker,當(dāng)然也會(huì)有數(shù)量限制;
本文將與大家一起討論第一種方式。當(dāng)我們預(yù)先知道有許多任務(wù)需要同時(shí)運(yùn)行,并且很大概率會(huì)用上最大數(shù)量的 worker,通常會(huì)采用這種方式。
為了演示,我們先創(chuàng)建 Worker 結(jié)構(gòu)體,它獲取任務(wù)并執(zhí)行。
- import (
- "fmt"
- )
- // Worker ...
- type Worker struct {
- ID int
- Name string
- StopChan chan bool
- }
- // Start ...
- func (w *Worker) Start(jobQueue chan Job) {
- w.StopChan = make(chan bool)
- successChan := make(chan bool)
- go func() {
- successChan <- true
- for {
- // take job
- job := <-jobQueue
- if job != nil {
- job.Start(w)
- } else {
- fmt.Printf("worker %s to be stopped\n", w.Name)
- w.StopChan <- true
- break
- }
- }
- }()
- // wait for the worker to start
- <-successChan
- }
- // Stop ...
- func (w *Worker) Stop() {
- // wait for the worker to stop, blocking
- _ = <-w.StopChan
- fmt.Printf("worker %s stopped\n", w.Name)
- }
Worker 有一些屬性保存當(dāng)前的狀態(tài),另外還聲明了兩個(gè)方法分別用于啟動(dòng)、停止 worker。
在 Start() 方法里,創(chuàng)建了兩個(gè) channel 分別用于 worker 的啟動(dòng)和停止。最重要的是 for 循環(huán)里面,worker 會(huì)一直等待獲取 job 并可執(zhí)行的直到任務(wù)隊(duì)列關(guān)閉。
Job 是包含單個(gè)方法 Start() 的接口,所以只要實(shí)現(xiàn) Start() 方法就可以有不同類型的 job。
- // Job ...
- type Job interface {
- Start(worker *Worker) error
- }
一旦 Worker 確定之后,接下來就是創(chuàng)建 pool 來管理 workers。
- import (
- "fmt"
- "sync"
- )
- // Pool ...
- type Pool struct {
- Name string
- Size int
- Workers []*Worker
- QueueSize int
- Queue chan Job
- }
- // Initiualize ...
- func (p *Pool) Initialize() {
- // maintain minimum 1 worker
- if p.Size < 1 {
- p.Size = 1
- }
- p.Workers = []*Worker{}
- for i := 1; i <= p.Size; i++ {
- worker := &Worker{
- ID: i - 1,
- Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1),
- }
- p.Workers = append(p.Workers, worker)
- }
- // maintain min queue size as 1
- if p.QueueSize < 1 {
- p.QueueSize = 1
- }
- p.Queue = make(chan Job, p.QueueSize)
- }
- // Start ...
- func (p *Pool) Start() {
- for _, worker := range p.Workers {
- worker.Start(p.Queue)
- }
- fmt.Println("all workers started")
- }
- // Stop ...
- func (p *Pool) Stop() {
- close(p.Queue) // close the queue channel
- var wg sync.WaitGroup
- for _, worker := range p.Workers {
- wg.Add(1)
- go func(w *Worker) {
- defer wg.Done()
- w.Stop()
- }(worker)
- }
- wg.Wait()
- fmt.Println("all workers stopped")
- }
Pool 包含 worker 切片和用于保存 job 的隊(duì)列。worker 的數(shù)量在初始化的時(shí)候是可以自定義。
關(guān)鍵點(diǎn)在 Stop() 的邏輯,當(dāng)它被調(diào)用時(shí),會(huì)先關(guān)閉 job 隊(duì)列,worker 便會(huì)從 job 隊(duì)列讀到 nil,接著就會(huì)關(guān)閉對應(yīng)的 worker。接著在 for 循環(huán)里,等待 worker 并發(fā)地停止直到最后一個(gè) worker 停止。
為了演示整體邏輯,下面的例子展示了一個(gè)僅僅輸出值的 job。
- import "fmt"
- func main() {
- pool := &Pool{
- Name: "test",
- Size: 5,
- QueueSize: 20,
- }
- pool.Initialize()
- pool.Start()
- defer pool.Stop()
- for i := 1; i <= 100; i++ {
- job := &PrintJob{
- Index: i,
- }
- pool.Queue <- job
- }
- }
- // PrintJob ...
- type PrintJob struct {
- Index int
- }
- func (pj *PrintJob) Start(worker *Worker) error {
- fmt.Printf("job %s - %d\n", worker.Name, pj.Index)
- return nil
- }
如果你看了上面的代碼邏輯,就會(huì)發(fā)現(xiàn)很簡單,創(chuàng)建了有 5 個(gè) worker 的工作池并且 job 隊(duì)列的大小是 20。
接著,模擬 job 創(chuàng)建和處理過程:一旦 job 被創(chuàng)建就會(huì) push 到任務(wù)隊(duì)列里,等待著的 worker 便會(huì)從隊(duì)列里取出 job 并處理。
類似下面這樣的輸出:
- all workers started
- job test-worker-3 - 4
- job test-worker-3 - 6
- job test-worker-3 - 7
- job test-worker-3 - 8
- job test-worker-3 - 9
- job test-worker-3 - 10
- job test-worker-3 - 11
- job test-worker-3 - 12
- job test-worker-3 - 13
- job test-worker-3 - 14
- job test-worker-3 - 15
- job test-worker-3 - 16
- job test-worker-3 - 17
- job test-worker-3 - 18
- job test-worker-3 - 19
- job test-worker-3 - 20
- worker test-worker-3 to be stopped
- job test-worker-4 - 5
- job test-worker-0 - 1
- worker test-worker-3 stopped
- job test-worker-2 - 3
- worker test-worker-2 to be stopped
- worker test-worker-2 stopped
- worker test-worker-4 to be stopped
- worker test-worker-4 stopped
- worker test-worker-0 to be stopped
- worker test-worker-0 stopped
- job test-worker-1 - 2
- worker test-worker-1 to be stopped
- worker test-worker-1 stopped
- all workers stopped
via:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang
作者:sonic0002