成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go語言實現的可讀性更高的并發神庫

開發 前端
ForEach、Map方法可以更優雅的并發處理切片,代碼簡潔易讀,在實現上Iterator中的并發處理使用atomic來控制只創建一個閉包,避免了GC性能問題

前言

哈嘍,大家好,我是asong;前幾天逛github發現了一個有趣的并發庫-conc,其目標是:

  • 更難出現goroutine泄漏
  • 處理panic更友好
  • 并發代碼可讀性高

從簡介上看主要封裝功能如下:

  • 對waitGroup進行封裝,避免了產生大量重復代碼,并且也封裝recover,安全性更高
  • 提供panics.Catcher封裝recover邏輯,統一捕獲panic,打印調用棧一些信息
  • 提供一個并發執行任務的worker池,可以控制并發度、goroutine可以進行復用,支持函數簽名,同時提供了stream方法來保證結果有序
  • 提供ForEach、map方法優雅的處理切片

接下來就區分模塊來介紹一下這個庫;

倉庫地址:https://github.com/sourcegraph/conc

WatiGroup的封裝

Go語言標準庫有提供sync.waitGroup控制等待goroutine,我們一般會寫出如下代碼:

func main(){
var wg sync.WaitGroup
for i:=0; i < 10; i++{
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
// recover panic
err := recover()
if err != nil {
fmt.Println(err)
}
}
// do something
handle()
}
}
wg.Wait()
}

上述代碼我們需要些一堆重復代碼,并且需要單獨在每一個func中處理recover邏輯,所以conc庫對其進行了封裝,代碼簡化如下:

func main() {
wg := conc.NewWaitGroup()
for i := 0; i < 10; i++ {
wg.Go(doSomething)
}
wg.Wait()
}

func doSomething() {
fmt.Println("test")
}

conc庫封裝也比較簡單,結構如下:

type WaitGroup struct {
wg sync.WaitGroup
pc panics.Catcher
}

其自己實現了Catcher類型對recover邏輯進行了封裝,封裝思路如下:

type Catcher struct {
recovered atomic.Pointer[RecoveredPanic]
}

recovered是原子指針類型,RecoveredPanic是捕獲的recover封裝,封裝了堆棧等信息:

type RecoveredPanic struct {
// The original value of the panic.
Value any
// The caller list as returned by runtime.Callers when the panic was
// recovered. Can be used to produce a more detailed stack information with
// runtime.CallersFrames.
Callers []uintptr
// The formatted stacktrace from the goroutine where the panic was recovered.
// Easier to use than Callers.
Stack []byte
}

提供了Try方法執行方法,只會記錄第一個panic的gououtine信息:

func (p *Catcher) Try(f func()) {
defer p.tryRecover()
f()
}

func (p *Catcher) tryRecover() {
if val := recover(); val != nil {
rp := NewRecoveredPanic(1, val)
// 只會記錄第一個panic的goroutine信息
p.recovered.CompareAndSwap(nil, &rp)
}
}

提供了Repanic()方法用來重放捕獲的panic:

func (p *Catcher) Repanic() {
if val := p.Recovered(); val != nil {
panic(val)
}
}

func (p *Catcher) Recovered() *RecoveredPanic {
return p.recovered.Load()
}

waitGroup對此也分別提供了Wait()、WaitAndRecover()方法:

func (h *WaitGroup) Wait() {
h.wg.Wait()

// Propagate a panic if we caught one from a child goroutine.
h.pc.Repanic()
}

func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
h.wg.Wait()

// Return a recovered panic if we caught one from a child goroutine.
return h.pc.Recovered()
}

wait方法只要有一個goroutine發生panic就會向上拋出panic,比較簡單粗暴;

waitAndRecover方法只有有一個goroutine發生panic就會返回第一個recover的gouroutine信息;

總結:conc庫對waitGrouop的封裝總體是比較不錯的,可以減少重復的代碼;

worker池

conc提供了幾種類型的worker池:

  • ContextPool:可以傳遞context的pool,若有goroutine發生錯誤可以cancel其他goroutine
  • ErrorPool:通過參數可以控制只收集第一個error還是所有error
  • ResultContextPool:若有goroutine發生錯誤會cancel其他goroutine并且收集錯誤
  • RestultPool:收集work池中每個任務的執行結果,并不能保證順序,保證順序需要使用stream或者iter.map;

我們來看一個簡單的例子:

import "github.com/sourcegraph/conc/pool"

func ExampleContextPool_WithCancelOnError() {
p := pool.New().
WithMaxGoroutines(4).
WithContext(context.Background()).
WithCancelOnError()
for i := 0; i < 3; i++ {
i := i
p.Go(func(ctx context.Context) error {
if i == 2 {
return errors.New("I will cancel all other tasks!")
}
<-ctx.Done()
return nil
})
}
err := p.Wait()
fmt.Println(err)
// Output:
// I will cancel all other tasks!
}

在創建pool時有如下方法可以調用:

  • p.WithMaxGoroutines()配置pool中goroutine的最大數量
  • p.WithErrors:配置pool中的task是否返回error
  • p.WithContext(ctx):配置pool中運行的task當遇到第一個error要取消
  • p.WithFirstError:配置pool中的task只返回第一個error
  • p.WithCollectErrored:配置pool的task收集所有error

pool的基礎結構如下:

type Pool struct {
handle conc.WaitGroup
limiter limiter
tasks chan func()
initOnce sync.Once
}

limiter是控制器,用chan來控制goroutine的數量:

type limiter chan struct{}

func (l limiter) limit() int {
return cap(l)
}

func (l limiter) release() {
if l != nil {
<-l
}
}

pool的核心邏輯也比較簡單,如果沒有設置limiter,那么就看有沒有空閑的worker,否則就創建一個新的worker,然后投遞任務進去;

如果設置了limiter,達到了limiter worker數量上限,就把任務投遞給空閑的worker,沒有空閑就阻塞等著;

func (p *Pool) Go(f func()) {
p.init()

if p.limiter == nil {
// 沒有限制
select {
case p.tasks <- f:
// A goroutine was available to handle the task.
default:
// No goroutine was available to handle the task.
// Spawn a new one and send it the task.
p.handle.Go(p.worker)
p.tasks <- f
}
} else {
select {
case p.limiter <- struct{}{}:
// If we are below our limit, spawn a new worker rather
// than waiting for one to become available.
p.handle.Go(p.worker)

// We know there is at least one worker running, so wait
// for it to become available. This ensures we never spawn
// more workers than the number of tasks.
p.tasks <- f
case p.tasks <- f:
// A worker is available and has accepted the task.
return
}
}

}

這里work使用的是一個無緩沖的channel,這種復用方式很巧妙,如果goroutine執行很快避免創建過多的goroutine;

使用pool處理任務不能保證有序性,conc庫又提供了Stream方法,返回結果可以保持順序;

Stream

Steam的實現也是依賴于pool,在此基礎上做了封裝保證結果的順序性,先看一個例子:

func ExampleStream() {
times := []int{20, 52, 16, 45, 4, 80}

stream := stream2.New()
for _, millis := range times {
dur := time.Duration(millis) * time.Millisecond
stream.Go(func() stream2.Callback {
time.Sleep(dur)
// This will print in the order the tasks were submitted
return func() { fmt.Println(dur) }
})
}
stream.Wait()

// Output:
// 20ms
// 52ms
// 16ms
// 45ms
// 4ms
// 80ms
}

stream的結構如下:

type Stream struct {
pool pool.Pool
callbackerHandle conc.WaitGroup
queue chan callbackCh

initOnce sync.Once
}

queue是一個channel類型,callbackCh也是channel類型 - chan func():

type callbackCh chan func()

在提交goroutine時按照順序生成callbackCh傳遞結果:

func (s *Stream) Go(f Task) {
s.init()

// Get a channel from the cache.
ch := getCh()

// Queue the channel for the callbacker.
s.queue <- ch

// Submit the task for execution.
s.pool.Go(func() {
defer func() {
// In the case of a panic from f, we don't want the callbacker to
// starve waiting for a callback from this channel, so give it an
// empty callback.
if r := recover(); r != nil {
ch <- func() {}
panic(r)
}
}()

// Run the task, sending its callback down this task's channel.
callback := f()
ch <- callback
})
}

var callbackChPool = sync.Pool{
New: func() any {
return make(callbackCh, 1)
},
}

func getCh() callbackCh {
return callbackChPool.Get().(callbackCh)
}

func putCh(ch callbackCh) {
callbackChPool.Put(ch)
}

ForEach和map

ForEach

conc庫提供了ForEach方法可以優雅的并發處理切片,看一下官方的例子:

圖片

conc庫使用泛型進行了封裝,我們只需要關注handle代碼即可,避免冗余代碼,我們自己動手寫一個例子:

func main() {
input := []int{1, 2, 3, 4}
iterator := iter.Iterator[int]{
MaxGoroutines: len(input) / 2,
}

iterator.ForEach(input, func(v *int) {
if *v%2 != 0 {
*v = -1
}
})

fmt.Println(input)
}

ForEach內部實現為Iterator結構及核心邏輯如下:

type Iterator[T any] struct {
MaxGoroutines int
}
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
if iter.MaxGoroutines == 0 {
// iter is a value receiver and is hence safe to mutate
iter.MaxGoroutines = defaultMaxGoroutines()
}

numInput := len(input)
if iter.MaxGoroutines > numInput {
// No more concurrent tasks than the number of input items.
iter.MaxGoroutines = numInput
}

var idx atomic.Int64
// 通過atomic控制僅創建一個閉包
task := func() {
i := int(idx.Add(1) - 1)
for ; i < numInput; i = int(idx.Add(1) - 1) {
f(i, &input[i])
}
}

var wg conc.WaitGroup
for i := 0; i < iter.MaxGoroutines; i++ {
wg.Go(task)
}
wg.Wait()
}

可以設置并發的goroutine數量,默認取的是GOMAXPROCS ,也可以自定義傳參;

并發執行這塊設計的很巧妙,僅創建了一個閉包,通過atomic控制idx,避免頻繁觸發GC;

map

conc庫提供的map方法可以得到對切片中元素結果,官方例子:

圖片

使用map可以提高代碼的可讀性,并且減少了冗余代碼,自己寫個例子:

func main() {
input := []int{1, 2, 3, 4}
mapper := iter.Mapper[int, bool]{
MaxGoroutines: len(input) / 2,
}

results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
fmt.Println(results)
// Output:
// [false true false true]
}

map的實現也依賴于Iterator,也是調用的ForEachIdx方法,區別于ForEach是記錄處理結果;

總結

花了小半天時間看了一下這個庫,很多設計點值得我們學習,總結一下我學習到的知識點:

  • conc.WatiGroup對Sync.WaitGroup進行了封裝,對Add、Done、Recover進行了封裝,提高了可讀性,避免了冗余代碼
  • ForEach、Map方法可以更優雅的并發處理切片,代碼簡潔易讀,在實現上Iterator中的并發處理使用atomic來控制只創建一個閉包,避免了GC性能問題
  • pool是一個并發的協程隊列,可以控制協程的數量,實現上也很巧妙,使用一個無緩沖的channel作為worker,如果goroutine執行速度快,避免了創建多個goroutine
  • stream是一個保證順序的并發協程隊列,實現上也很巧妙,使用sync.Pool在提交goroutine時控制順序,值得我們學習;

小伙伴們有時間可以看一下這個并發庫,學習其中的優點,慢慢進步~

責任編輯:武曉燕 來源: Golang夢工廠
相關推薦

2021-10-09 10:24:53

Java 代碼可讀性

2011-09-22 16:10:09

編程語言

2021-04-01 16:43:05

代碼可讀性開發

2017-10-30 15:22:29

代碼可讀性技巧

2024-01-31 08:04:43

PygmentsPython

2015-08-27 13:11:18

JavaScript代碼

2024-04-23 08:01:20

面向對象C 語言代碼

2022-08-23 14:57:43

Python技巧函數

2017-12-19 16:24:20

2021-06-15 09:12:19

TypeScriptTypeScript Javascript

2022-08-29 00:37:53

Python技巧代碼

2022-05-19 14:14:26

go語言限流算法

2021-01-26 09:18:27

Shell腳本網站

2019-12-03 09:32:32

JavaScript代碼開發

2022-11-04 11:18:16

代碼優化可讀性

2014-07-28 10:28:25

程序員

2014-07-29 09:55:33

程序員代碼可讀性

2024-10-11 06:00:00

Python代碼編程

2024-10-07 10:00:00

Python代碼編碼

2024-06-06 09:47:56

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产999精品久久久久久绿帽 | 亚洲国产日韩一区 | 91久久国产综合久久 | 久久精品国产久精国产 | 午夜激情影院 | 五月天国产在线 | 黄色网址av| 91色在线 | 亚洲成人一区二区三区 | 国产一级片在线观看视频 | 欧美福利三区 | 一区二区国产在线 | 一二三在线视频 | 雨宫琴音一区二区在线 | 国产不卡在线观看 | 福利社午夜影院 | 91看片在线观看 | 欧美国产精品一区二区 | 天天欧美 | 国产成人久久精品一区二区三区 | 伊人精品国产 | 国产精品视频一二三 | 欧美在线视频一区二区 | 日韩精品一区二区三区视频播放 | 91精品国产色综合久久 | 日韩视频免费在线 | av网站免费观看 | 一级片视频免费观看 | 久久久久久久97 | 黄网站免费入口 | 中文字幕人成乱码在线观看 | 国产精品久久久久久久久大全 | 国产小视频在线 | 久久精品亚洲精品国产欧美 | 久草视频在线播放 | 久久久人成影片免费观看 | 91视频一88av | 色天堂影院 | 成人免费在线网 | 国产精品色| 欧美综合在线观看 |