成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一個例子,給你講透典型的Go并發控制

開發 前端
本篇從基礎的sync.WaitGroup{}?庫出發,涉及到了并發安全、sync.Once?等內容。最后介紹了并發控制的利器:golang.org/x/sync/errgroup。

Go中可以使用一個go關鍵字讓程序異步執行

一個比較常見的場景:逐個異步調用多個函數,或者循環中異步調用

func main() {
 go do1()
 go do2()
 go do3()
}

// 或者

func main() {
 for i := range []int{1,2,3}{
  go do(i)
 }
}

如果了解Go并發機制,就知道main在其他goroutine運行完成之前就已經結束了,所以上面代碼的運行結果是不符合預期的。我們需要使用一種叫做并發控制的手段,來保證程序正確運行

為了更容易理解,我們虛擬一個??

已知有一個現成的函數search,能夠按照關鍵詞執行搜索

期望實現一個新的函數coSearch能夠進行批量查詢

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 if word == "Go" {
  return "", errors.New("error: Go") // 模擬結果
 }
 return fmt.Sprintf("result: %s", word), nil // 模擬結果
}

func coSearch(ctx context.Context, words []string) (results []string, err error) {
 //tbd

 return
}

func main() {
 words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

可以先暫停想想該如何實現coSearch函數

并發控制基礎

sync.WaitGroup是Go標準庫中用來控制并發的結構,這里放一個使用WaitGroup實現coSearch的示例

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 if word == "Go" {
  return "", errors.New("error: Go") // 模擬結果
 }
 return fmt.Sprintf("result: %s", word), nil // 模擬結果
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 var (
  wg      = sync.WaitGroup{}
  once    = sync.Once{}
  results = make([]string, len(words))
  err     error
 )

 for i, word := range words {
  wg.Add(1)

  go func(word string, i int) {
   defer wg.Done()

   result, e := search(ctx, word)
   if e != nil {
    once.Do(func() {
     err = e
    })

    return
   }

   results[i] = result
  }(word, i)
 }

 wg.Wait()

 return results, err
}

func main() {
 words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

上面的代碼中有非常多的細節,來逐個聊一聊

?? sync.WaitGroup{}并發控制

sync.WaitGroup{}的用法非常簡潔

  • 當新運行一個goroutine時,我們需要調用wg.Add(1)
  • 當一個goroutine運行完成的時候,我們需要調用wg.Done()
  • wg.Wait()讓程序阻塞在此處,直到所有的goroutine運行完畢。

對于coSearch來說,等待所有goroutine運行完成,也就完成了函數的任務,返回最終的結果

var (
    wg      = sync.WaitGroup{}
    //...省略其他代碼
)

for i, word := range words {
    wg.Add(1)

    go func(word string, i int) {
        defer wg.Done()
  //...省略其他代碼
    }(word, i)
}

wg.Wait()

?? for循環中的goroutine!

這是一個Go經典錯誤,如果goroutine中使用了for迭代的變量,所有goroutine都會獲得最后一次循環的值。例如下面的示例,并不會輸出"a", "b", "c" 而是輸出 "c", "c", "c"

func main() {
    done := make(chan bool)

    values := []string{"a", "b", "c"}
    for _, v := range values {
        go func() {
            fmt.Println(v)
            done <- true
        }()
    }

    // wait for all goroutines to complete before exiting
    for _ = range values {
        <-done
    }
}

正確的做法就是像上文示例一樣,將迭代的變量賦值給函數參數,或者賦值給新的變量

for i, word := range words {
 // ...
    go func(word string, i int) {
        // fmt.Println(word, i)
    }(word, i)
}

for i, word := range words {
    i, word := i, word
    go func() {
        // fmt.Println(word, i)
    }()
}

由于這個錯誤實在太常見,從Go 1.22開始Go已經修正了這個經典的錯誤:Fixing For Loops in Go 1.22。

不過Go 1.22默認不會開啟修正,需要設置環境變量GOEXPERIMENT=loopvar才會 開啟

??  并發安全

簡單理解:當多個goroutine對同一個內存區域進行讀寫時,就會產生并發安全的問題,它會導致程序運行的結果不符合預期

上面的示例把最終的結果放入了results = make([]string, len(words))中。雖然我們在goroutine中并發的對于results變量進行寫入,但因為每一個goroutine都寫在了獨立的位置,且沒有任何讀取的操作,因此results[i] = result是并發安全的

results = [ xxxxxxxx,    xxxxxxxx,    xxxxxxxx,    .... ]
                ^            ^            ^       
                |            |            |       
           goroutine1   goroutine2    goroutine3

這也意味著如果使用results = append(results, result)的方式并發賦值,因為會涉及到slice的擴容等操作,所以并不是并發安全的,需要利用sync.Mutex{}進行加鎖

如果想盡可能的提高程序的并發性能,推薦使用 results[i] = result這種方式賦值

?? sync.Once{}單次賦值

示例coSearch中,會返回第一個出錯的search的error。err是一個全局變量,在并發goroutine中賦值是并發不安全的操作

//...省略其他代碼
go func(word string, i int) {
    defer wg.Done()

    result, e := search(ctx, word)
    if e != nil && err == nil {
        err = e

        return
    }

    results[i] = result
}(word, i)
//...省略其他代碼

對于全局變量的賦值比較常規做法就是利用sync.Mutex{}進行加鎖。但示例的邏輯為單次賦值,我們剛好可以利用同在sync庫的sync.Once{}來簡化代碼

sync.Once{}功能如其名,將我們要執行的邏輯放到它的Do()方法中,無論多少并發都只會執行一次

//...省略其他代碼
go func(word string, i int) {
    defer wg.Done()

    result, e := search(ctx, word)
    if e != nil {
        once.Do(func() {
            err = e
        })

        return
    }

    results[i] = result
}(word, i)
//...省略其他代碼

Further more

上面的示例coSearch已經是一個比較完善的函數了,但我們還可以做得更多

?? goroutine數量控制

coSearch入參的數組可能非常大,如果不加以控制可能導致我們的服務器資源耗盡,我們需要控制并發的數量

利用帶緩沖channel可以實現

tokens := make(chan struct{}, 10)

for i, word := range words {
    tokens <- struct{}{} // 新增
    wg.Add(1)

    go func(word string, i int) {
        defer func() {
            wg.Done()
            <-tokens  // 新增
        }()

        result, e := search(ctx, word)
        if e != nil {
            once.Do(func() {
                err = e
            })

            return
        }

        results[i] = result
    }(word, i)
}

wg.Wait()

如上,代碼中創建了10個緩沖區的channel,當channel被填滿時,繼續寫入會被阻塞;當goroutine運行完成之后,除了原有的wg.Done(),我們需要從channel讀取走一個數據,來允許新的goroutine運行

通過這種方式,我們控制了coSearch最多只能運行10個goroutine,當超過10個時需要等待前面運行的goroutine結束

?? context.Context

并發執行的goroutine只要有一個出錯,其他goroutine就可以停止,沒有必要繼續執行下去了。如何把取消的事件傳導到其他goroutine呢?context.Context就是用來傳遞類似上下文信息的結構

ctx, cancel := context.WithCancelCause(ctx) // 新增
defer cancel(nil) // 新增

for i, word := range words {
    tokens <- struct{}{}
    wg.Add(1)

    go func(word string, i int) {
        defer func() {
            wg.Done()
            <-tokens
        }()

        result, e := search(ctx, word)
        if e != nil {
            once.Do(func() {
                err = e
                cancel(e) // 新增
            })

            return
        }

        results[i] = result
    }(word, i)
}

wg.Wait()

完整的代碼

最終完成的效果如下

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 select {
 case <-ctx.Done():
  return "", ctx.Err()
 default:
  if word == "Go" || word == "Java" {
   return "", errors.New("Go or Java")
  }
  return fmt.Sprintf("result: %s", word), nil // 模擬結果
 }
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 ctx, cancel := context.WithCancelCause(ctx)
 defer cancel(nil)

 var (
  wg   = sync.WaitGroup{}
  once = sync.Once{}

  results = make([]string, len(words))
  tokens  = make(chan struct{}, 2)

  err error
 )

 for i, word := range words {
  tokens <- struct{}{}
  wg.Add(1)

  go func(word string, i int) {
   defer func() {
    wg.Done()
    <-tokens
   }()

   result, e := search(ctx, word)
   if e != nil {
    once.Do(func() {
     err = e
     cancel(e)
    })

    return
   }

   results[i] = result
  }(word, i)
 }

 wg.Wait()

 return results, err
}

并發控制庫errgroup

可以看到要實現一個較為完備的并發控制,需要做的工作非常多。不過Go官方團隊為大家準備了 golang.org/x/sync/errgroup

errgroup提供的能力和上文的示例類似,實現方式也類似,包含并發控制,錯誤傳遞,context.Context傳遞等

package main

import (
 "context"
 "fmt"
 "sync"

 "golang.org/x/sync/errgroup"
)

func coSearch(ctx context.Context, words []string) ([]string, error) {
 g, ctx := errgroup.WithContext(ctx)
 g.SetLimit(10)
 
 results := make([]string, len(words))

 for i, word := range words {
  i, word := i, word

  g.Go(func() error {
   result, err := search(ctx, word)
   if err != nil {
    return err
   }

   results[i] = result
   return nil
  })
 }

 err := g.Wait()

 return results, err
}

errgroup的用法也很簡單

  • 使用 g, ctx := errgroup.WithContext(ctx)來創建goroutine的管理器
  • g.SetLimit()可以設置允許的最大的goroutine數量
  • 類似于go關鍵詞, g.Go異步執行函數
  • g.Wait()和sync.WaitGroup{}的wg.Wait()類似,會阻塞直到所有goroutine都運行完成,并返回其中一個goroutine的錯誤

利用golang.org/x/sync/errgroup大幅簡化了進行并發控制的邏輯,真是一個并發控制的利器?。?/p>

總結

本篇從基礎的sync.WaitGroup{}庫出發,涉及到了并發安全、sync.Once等內容。最后介紹了并發控制的利器:golang.org/x/sync/errgroup。

雖然使用Go語言能夠非常簡單的編寫并發程序,但其中要注意的細節非常多,忽略這些細節不僅沒有提升程序運行的效率,還會產生錯誤的結果

責任編輯:武曉燕 來源: 涼涼的知識庫
相關推薦

2021-07-28 08:32:58

Go并發Select

2024-09-06 12:52:59

2021-04-20 11:40:47

指針類型CPU

2018-07-03 15:20:36

Promise函數借錢

2009-08-10 10:08:45

.NET調用PHP W

2025-03-28 08:30:00

PythonPandasaxis

2021-06-24 06:35:00

Go語言進程

2009-06-11 14:48:48

jbpm工作流引擎jbpm例子

2009-06-18 15:53:37

Hibernate B

2024-01-25 11:41:00

Python開發前端

2023-03-14 08:02:14

靜態路由動態路由設備

2021-03-24 06:06:13

Go并發編程Singlefligh

2025-05-28 02:00:00

AI智能體文本

2023-11-06 13:55:59

聯合索引MySQ

2020-09-06 22:59:35

Linux文件命令

2023-05-25 08:02:09

構建工具源碼JS

2020-03-26 09:18:54

高薪本質因素

2021-07-09 06:11:37

Java泛型Object類型

2024-06-17 08:40:16

2023-01-30 16:21:24

Linux外觀
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费观看一级特黄欧美大片 | 精品国产乱码久久久久久图片 | 国产日韩欧美一区二区 | 日韩三 | 国产免费一区二区 | 久久久久久电影 | www在线视频 | 成人在线小视频 | 国产偷自视频区视频 | 在线免费观看黄视频 | 亚洲高清视频一区二区 | 天天色图| 999精品视频 | 国产福利资源在线 | 午夜精品一区二区三区在线观看 | 欧美日韩一区二区在线观看 | 日韩在线播放网址 | 亚洲伊人久久综合 | 天天干天天干 | 亚洲午夜视频在线观看 | 色免费在线视频 | 一区欧美| 日韩蜜桃视频 | 91文字幕巨乱亚洲香蕉 | 欧美一级做a爰片免费视频 国产美女特级嫩嫩嫩bbb片 | 高清成人免费视频 | 成人免费看 | 日韩在线观看中文字幕 | 欧美精品成人一区二区三区四区 | 日韩日韩日韩日韩日韩日韩日韩 | 成人在线精品视频 | 国产亚洲一区二区三区在线观看 | 狠狠撸在线视频 | 国产美女福利在线观看 | 91免费看片 | 免费激情av | 一本在线 | 91麻豆精品国产91久久久久久 | 日韩精品网站 | 国产精品美女久久久久aⅴ国产馆 | 欧美2区 |