成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

講完Go并發控制,講講并發抑制

開發 前端
在高并發的狀態下,一般會給熱點數據設置緩存。但數據第一次訪問或者緩存失效的狀態下,如果直接去查詢數據庫,會給數據庫造成極大壓力,甚至直接打爆數據庫。

已知有一個函數search,能夠按照關鍵詞執行搜索,coSearch能夠批量并發查詢。

讓我們把目光定位到search上,search通過查詢數據庫或者調用其他api來完成搜索,這是一個相對耗時和消耗資源的操作。

當多個相同的關鍵詞并發查詢(調用search函數)時,我們希望只產生一次數據庫調用(調用query),第一個查詢未完成時后續的重復查詢會等待,當第一個查詢完成時則會與其他查詢分享結果,這樣一來雖然只執行了一次數據庫調用但是所有查詢都拿到了最終的結果。

圖片圖片

什么是并發抑制:

package main

import (
 "context"
 "fmt"
 "sync"
 "time"

 "golang.org/x/sync/errgroup"
)

func query(ctx context.Context, word string) (string, error) {
 fmt.Println("searching: ", word)
 time.Sleep(5 * time.Second)

 return fmt.Sprintf("result: %s", word), nil // 模擬結果
}

// 實現search,在重復并發調用下僅執行一次query
// 其他并發共享這次query的結果
func search(ctx context.Context, word string) (string, error) {
    return query(ctx, word)
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 g, ctx := errgroup.WithContext(ctx)
 g.SetLimit(10)

 results := make([]string, len(words))

 for i, word := range words {
  i, word := i, word

  g.Go(func() error {
   result, err := search(ctx, word)
   if err != nil {
    return err
   }

   results[i] = result
   return nil
  })
 }

 err := g.Wait()

 return results, err
}

func main() {
 words := []string{"Go","Go", "Go", "Rust", "PHP", "JavaScript", "Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

好了,可以先暫停想想該如何實現search函數了。

一步一步實現并發抑制

我們先假設所有查詢關鍵詞都一樣,那么問題簡化成并發執行search時,只在第一次search時調用query,其他的search并發調用等待并共享這次的查詢結果。

通過waiting變量,其他goroutine等待第一個goroutine數據庫調用完成,那么如何讓其他goroutine等待在這個位置呢?

func main() {
 words := []string{"Go", "Go", "Go", "Go", "Go"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

var (
 waiting bool
 resp    string
 err     error
)

func search(ctx context.Context, word string) (string, error) {
  if waiting {
    // 等待resp, err被賦值,即第一個query完成后再返回
    // ...?
      return resp, err
  }

  waiting = true
  resp, err = query(ctx, word)
  waiting = false

  return resp, err
}

func query(ctx context.Context, word string) (string, error) {
 fmt.Println("searching: ", word)
 time.Sleep(5 * time.Second)

 return fmt.Sprintf("result: %s", word), nil // 模擬結果
}

sync.WaitGroup{}并發控制

sync.WaitGroup{}是并發控制的核心,這里再次重申下用法:

  • 當新運行一個goroutine時,我們需要調用wg.Add(1)。
  • 當一個goroutine運行完成的時候,我們需要調用wg.Done()。
  • wg.Wait()讓程序阻塞在此處,直到所有的goroutine運行完畢。

利用 sync.WaitGroup{}便可實現上文代碼中等待的效果:

var (
 wg      sync.WaitGroup
 waiting bool
 resp    string
 err     error
)

func search(ctx context.Context, word string) (string, error) {
 if waiting {
  // 其他goroutine等待第一個goroutine執行完成
  wg.Wait()
  return resp, err
 }

 waiting = true
    
 wg.Add(1)
 resp, err = query(ctx, word)
    wg.Done() // 第一個goroutine執行完成
    
 waiting = false

 return resp, err
}

并發安全

當多個goroutine對同一個內存區域進行讀寫時,就會產生并發安全的問題,它會導致程序運行的結果不符合預期,而上文的程序并發的讀寫了waiting變量,需要給waiting變量加把鎖。

釋放鎖的位置非常的有技巧,如果在在wg.Add(1)之前mu.Unlock(),可能 wg.Add(1)還未來得執行其他goroutine已經執行了wg.Wait(),并獲取到了錯誤的數據。

unlock在add之前;

var (
  wg      sync.WaitGroup
  mu      sync.Mutex
  waiting bool
  resp    string
  err     error
)

func search(ctx context.Context, word string) (string, error) {
 mu.Lock()

 if waiting {
  mu.Unlock()
  wg.Wait()
  return resp, err
 }

 waiting = true
    
 wg.Add(1)
    // 在wg.Add(1)之后釋放鎖,保證其他goroutine被wg.Wait()阻塞
 mu.Unlock()

 resp, err = query(ctx, word)
    wg.Done()

 mu.Lock()
 waiting = false
 mu.Unlock() 
    
 return resp, err
}

完整版本

現在可以針對不同的關鍵詞做區分了,使用一個map來代替原有的waiting,并將每一個關鍵詞查詢的WaitGroup和結果打包到map的value中。

type call struct {
 wg   sync.WaitGroup
 resp string
 err  error
}

var (
    mu sync.Mutex
    m = make(map[string]*call)
)

func search(ctx context.Context, word string) (string, error) {
 mu.Lock()

 if c, ok := m[word]; ok {
  mu.Unlock()
  c.wg.Wait()
  return c.resp, c.err
 }

 c := &call{}
 m[word] = c

 c.wg.Add(1)
 // 在wg.Add(1)之后才釋放鎖,保證其他goroutine被wg.Wait()阻塞
 mu.Unlock()

 c.resp, c.err = query(ctx, word)
 c.wg.Done()

 mu.Lock()
 delete(m, word)
 mu.Unlock()

 return c.resp, c.err
}

開源庫 golang.org/x/sync/singleflight

上面一步一步教大家手搓了一個并發抑制的邏輯,我們的基本邏輯和開源庫golang.org/x/sync/singleflight沒有區別,只是singleflight內部實現更加嚴謹

直接使用singleflight非常簡單的就可以實現我們的訴求

  • singleflight.Group 創建一個需要并發控制的范圍
  • Do函數

第一個參數接收一個key來判斷否重復調用

第二個參數為要執行的函數,函數可以返回正常值或者error

Do函數返回值除了閉包函數的返回值之外,還返回了此次返回值是否由其他goroutine共享

import (
 "golang.org/x/sync/singleflight"
)

var g = new(singleflight.Group)

func search(ctx context.Context, word string) (string, error) {
 resp, err, _ := g.Do(word, func() (interface{}, error) {
  return query(ctx, word)
 })

 return resp.(string), err
}

錯誤處理

因為共享第一個goroutine的結果,因此如果第一次調用失敗,那其他goroutine也都會失敗

如果在某些場景下允許第一個調用失敗后再次嘗試調用該函數,那么可以通過調用Forget方法來忘記這個key

var g = new(singleflight.Group)

func search(ctx context.Context, word string) (string, error) {
 resp, err, _ := g.Do(word, func() (interface{}, error) {
  val, err := query(ctx, word)
  // 當出錯并且允許重試時
  if err != nil && true {
   g.Forget(word)
   return "", err
  }

  return val, err
 })

 return resp.(string), err
}

超時控制

當使用Do函數時,如果query長時間未響應(這里假設qeury不具備超時能力),那么所有的goroutine都會被阻塞并等待,利用DoChan+select可以實現超時邏輯

var g = new(singleflight.Group)

func search(ctx context.Context, word string) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
 defer cancel()
    
 result := g.DoChan(word, func() (interface{}, error) {
  return query(ctx, word)
 })

 select {
 case r := <-result:
  return r.Val.(string), r.Err
 case <-ctx.Done():
  return "", ctx.Err()
 }
}

使用場景

預防緩存穿透

在高并發的狀態下,一般會給熱點數據設置緩存。但數據第一次訪問或者緩存失效的狀態下,如果直接去查詢數據庫,會給數據庫造成極大壓力,甚至直接打爆數據庫。

以上各種分享中被反復提到的場景,但!注意!使用singleflight就一勞永逸了么,不是的,在大規模集群下可能有數百臺機器,當處在高并發狀態時,即使每臺機器只發起一個請求,也足以打爆你的數據庫!結合實際,搭配適當的緩存策略、數據預熱、限流等手段才能避免潛在的風險。挖個坑,以后有機會聊聊這些問題

總結

本篇作為一個例子,給你講透典型的Go并發控制的姊妹篇,講述了另外一種并發控制模型,并介紹了開源庫golang.org/x/sync/singleflight。

當由一個goroutine并發向下發展成多個goroutine時,使用golang.org/x/sync/errgroup

當多個goroutine并發向下抑制成一個goroutine時,使用golang.org/x/sync/singleflight

責任編輯:武曉燕 來源: 涼涼的知識庫
相關推薦

2023-01-30 15:41:10

Channel控制并發

2017-08-21 10:56:55

MySQL并發控制

2022-10-17 08:07:13

Go 語言并發編程

2009-09-24 14:43:53

Hibernate樂觀

2022-12-12 09:07:06

Redis并發限流

2021-07-28 08:32:58

Go并發Select

2025-06-03 02:00:00

2024-04-30 10:29:46

前端開發h5開發函數

2013-05-28 09:43:38

GoGo語言并發模式

2021-07-15 23:18:48

Go語言并發

2023-02-10 09:40:36

Go語言并發

2023-12-21 07:09:32

Go語言任務

2023-12-29 08:10:41

Go并發開發

2016-10-28 17:39:47

phpgolangcoroutine

2021-07-30 07:28:15

WorkerPoolGo語言

2023-11-27 18:07:05

Go并發編程

2024-07-08 00:01:00

GPM模型調度器

2025-05-22 09:01:28

2025-06-17 09:32:15

2021-07-06 14:47:30

Go 開發技術
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲一区 中文字幕 | 国产日韩精品久久 | 青青草av网站 | 亚洲电影成人 | 一区二区三区视频 | 日韩不卡一区二区三区 | 国产日韩精品一区二区三区 | 久久91| 国产日韩一区二区三免费高清 | 亚洲国产精品视频 | 国产成人精品一区二区三区四区 | 国产三级网站 | 国产91网站在线观看 | 国产激情自拍视频 | 亚洲国产成人精品久久 | 夜夜久久| 在线高清免费观看视频 | 黄a免费看 | 国产欧美视频一区二区三区 | 中文字幕在线一区 | 亚洲日本一区二区三区四区 | 2022精品国偷自产免费观看 | 天堂中文在线播放 | 成年人视频在线免费观看 | 能看的av| 欧美一区二区在线播放 | 9999国产精品欧美久久久久久 | 久艹av | 中文成人在线 | 色视频www在线播放国产人成 | 日韩一区二区三区在线视频 | 亚洲综合色视频在线观看 | 日本精品视频一区二区 | 成年人网站免费 | 日本精品国产 | 亚洲国产精久久久久久久 | 色网站入口 | 亚洲欧美中文日韩在线v日本 | 亚洲 欧美 日韩在线 | 日韩中文字幕 | 日韩一区欧美一区 |