如何在 Go 中實現一個 Worker-Pool?
本文轉載自微信公眾號「吳親強的深夜食堂」,作者吳親庫里。轉載本文請聯系吳親強的深夜食堂公眾號。
之前寫過一篇文章,它有個響亮的名字:Handling 1 Million Requests per Minute with Go使用 Go 每分鐘處理百萬請求
這是國外的一個作者寫的,我做了一篇說明,起的也是這個標題。
沒想到閱讀量是我最好的一篇,果然文章都是靠標題出彩的…..
今天偶然看到另一篇文章(原文在文末[1])。兩篇文章原理相似:有一批工作任務(job),通過工作池(worker-pool)的方式,達到多worker并發處理job的效果。
他們還是有很多不同的點,實現上差別也是蠻大的。
首先上一篇文章我放了一張圖片,大概就是上篇整體的工作流。
- 每個worker處理完任務就好,不關心結果,不對結果做進一步處理。
- 只要請求不停止,程序就不會停止,沒有控制機制,除非宕機。
這篇文章不同點在于:
首先數據會從generate(生產數據)->并發處理數據->處理結果聚合。
圖大概是這樣的,
然后它可以通過context.context達到控制工作池停止工作的效果。
最后通過代碼,你會發現它不是傳統意義上的worker-pool,后面會說明。
下圖能清晰表達整體流程了。
順便說一句,這篇文章實現的代碼比 使用 Go 每分鐘處理百萬請求 的代碼簡單多了。
首先看job。
這個可以簡單過一下。最終每個job處理完都會包裝成Result返回。
下面這段就是核心代碼了。
整個WorkerPool結構很簡單。jobs是一個緩沖channel。每一個任務都會放入jobs中等待處理woker處理。
results也是一個通道類型,它的作用是保存每個job處理后產生的結果Result。
首先通過New初始化一個worker-pool工作池,然后執行Run開始運行。
初始化的時候傳入worker數,對應每個g運行work(ctx,&wg,wp.jobs,wp.results),組成了worker-pool。
同時通過sync.WaitGroup,我們可以等待所有worker工作結束,也就意味著work-pool結束工作,當然可能是因為任務處理結束,也可能是被停止了。
每個job數據源是如何來的?
對應每個worker的工作,
每個 worker 都嘗試從同一個jobs獲取數據,這是一個典型的fan-out模式。當對應的g獲取到job進行處理后,會把處理結果發送到同一個results channel中,這又是一個fan-in模式。
當然我們通過context.Context可以對每個worker做停止運行控制。
最后是處理結果集合,
那么整體的測試代碼就是:
看了代碼之后,我們知道,這并不是一個傳統意義的worker-pool。它并不像上篇這篇文章一樣,初始化一個真正的worker-pool,一旦接收到job,就嘗試從池中獲取一個worker,把對應的job交給這個work進行處理,等work處理完畢,重新進行到工作池中,等待下一次被利用。
附錄
[1]https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0#fe56