Go 并發中 panic 的處理
Go 中的并發
在 go 中,可以通過原聲關鍵字 go 創建協程。
go func() {
// your code
}()
// go on
your code 和 go on 誰先執行是不確定的,這取決于調度。如果想等協程執行完再繼續執行的話怎么辦呢?比如下面代碼。
go func() {
// your code1
}()
go func() {
// your code2
}()
// go on
其中一種方式是使用 sync.WaitGroup。
var wg sync.WaitGroup
wg.Add(2)
go func() {
wg.Done()
}()
go func() {
wg.Done()
}()
wg.Wait()
// go on
wg.Wait() 會阻塞直到兩個協程執行完后,這個有點類似多線程中的線程屏障。但是這種方式過于靈活,我們需要控制好 Add 和 Done 的邏輯,否則會導致一直阻塞或者 panic。幸好 go 還提供了 errgroup.Group,代碼如下。
var g errgroup.Group
g.Go(func() error {})
g.Go(func() error {})
g.Wait()
在用法上,errgroup.Group 和 sync.WaitGroup 很像,前者也是基于后者實現的,但是前者使用起來相對簡單、并且還實現了自動管理 Add/Done、限制并發數等能力。在日常開發中也是經常使用 errgroup.Group 來實現并發。
并發中的 panic 問題
接下來看一下使用協程實現并發時的 panic 處理問題,前面兩種都是通過 go 關鍵字創建的協程,所以只能在函數里手動處理,所以主要看一下 errgroup.Group 的 panic 處理問題。我們從 errgroup.Group 的 Go 函數開始看。
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.add(f)
}
Go 前面的邏輯用于限制并發,主要是 add 函數。
func (g *Group) add(f func() error) {
g.wg.Add(1)
go func() {
defer g.done()
// panic 處理
defer func() {
v := recover()
g.mu.Lock()
defer g.mu.Unlock()
// 記錄 panic 信息,但是只會記錄第一次 panic 的信息
if v != nil && g.panicValue == nil {
g.panicValue = ...
}
}()
// 用戶函數
err := f()
// 記錄錯誤信息,只記錄第一個錯誤
if err != nil {
g.errOnce.Do(func() {
g.err = err
})
}
}()
}
可以看到 errgroup.Group 處理了 panic 問題并記錄了 panic 信息,看起來我們的函數里可以不處理 panic,那 errgroup.Group 是怎么處理 panic 信息的呢?接著看 Wait 函數中處理。
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
if g.panicValue != nil {
panic(g.panicValue)
}
return g.err
}
可以看到最終會在 Wait 函數執行 panic,所以我們只需要處理 Wait 函數的 panic 就行。
defer func() {
v := recover()
// ...
}()
g.Wait()
但是還有有一個問題是 errgroup.Group 只會記錄第一個 panic,如果我們多個協程發生了 panic 則會丟失信息,所以我們最好還是自己處理,代碼如下。
defer func() {
v := recover()
// ...
}()
var g errgroup.Group
g.Go(func() error {
defer func() {
v := recover()
// ...
}()
})
g.Wait()
這樣就可以記錄每一個協程的 panic 信息,那么如果協程里的 defer 中再次發生 panic 怎么辦呢?通過之前的分析可以知道,這個 panic 會被 errgroup.Group 捕獲,并最終在 Wait 中執行 panic,所以即使協程里處理了 panic,我們也需要處理 Wait 的 panic。如果我們運行在一些框架中,框架往往會幫我們處理,比如 kitex 的處理如下。
defer func() {
// panic 處理
if handlerErr := recover(); handlerErr != nil {
err = kerrors.ErrPanic.WithCauseAndStack(
fmt.Errorf(
"[happened in biz handler, method=%s.%s, please check the panic at the server side] %s",
svcInfo.ServiceName, methodName, handlerErr),
string(debug.Stack()))
}
}()
// 執行業務代碼
minfo := svcInfo.MethodInfo(methodName)
implHandlerFunc := minfo.Handler()
err = implHandlerFunc(ctx, svc.handler, args, resp)
無論是協程里處理還是對 Wait 函數的處理,每次都要寫類似的代碼非常麻煩,一旦忘記寫就容易出現 panic,嚴重還會導致進程 crash(如果上層也沒有處理 panic)。我們可以基于 errgroup.Group 提供一個安全版本的 errgroup.Group。
type ErrGroup struct {
errgroup.Group
cancel func(error)
// 可以自定義處理函數
handler func(context.Context, *error)
}
// 默認處理
func handler(_ context.Context, err *error) {
if e := recover(); e != nil {
if err != nil {
*err = fmt.Errorf("panic happen: %v", e)
}
}
}
func WithContext(ctx context.Context) (*ErrGroup, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &ErrGroup{cancel: cancel}, ctx
}
func (e *ErrGroup) SafeGo(ctx context.Context, f func() error) {
e.Go(func() (err error) {
if e.handler != nil {
defer e.handler(ctx, &err)
} else {
defer handler(ctx, &err)
}
return f()
})
}
func (e *ErrGroup) SafeWait() (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic happen: %v", e)
}
}()
err = e.Wait()
if e.cancel != nil {
e.cancel(err)
}
return err
}
用法如下。
ctx := context.Background()
var eg safe_errgroup.ErrGroup
eg.SafeGo(ctx, func() error {
panic("oops")
})
err := eg.Wait()
這樣就可以安全地實現并發了,具體可以參考: