讓我們一起賞析Singleflight設計
本文轉載自微信公眾號「Golang夢工廠」,作者AsongGo。轉載本文請聯系Golang夢工廠公眾號。
前言
哈嘍,大家好,我是asong。今天想與大家分享一下singleflight這個庫,singleflight僅僅只有100多行卻可以做到防止緩存擊穿,有點厲害哦!所以本文我們就一起來看一看他是怎么設計的~。
注意:本文基于 https://pkg.go.dev/golang.org/x/sync/singleflight進行分析。
緩存擊穿
什么是緩存擊穿
平常在高并發系統中,會出現大量的請求同時查詢一個key的情況,假如此時這個熱key剛好失效了,就會導致大量的請求都打到數據庫上面去,這種現象就是緩存擊穿。緩存擊穿和緩存雪崩有點像,但是又有一點不一樣,緩存雪崩是因為大面積的緩存失效,打崩了DB,而緩存擊穿則是指一個key非常熱點,在不停的扛著高并發,高并發集中對著這一個點進行訪問,如果這個key在失效的瞬間,持續的并發到來就會穿破緩存,直接請求到數據庫,就像一個完好無損的桶上鑿開了一個洞,造成某一時刻數據庫請求量過大,壓力劇增!
如何解決
- 方法一
我們簡單粗暴點,直接讓熱點數據永遠不過期,定時任務定期去刷新數據就可以了。不過這樣設置需要區分場景,比如某寶首頁可以這么做。
- 方法二
為了避免出現緩存擊穿的情況,我們可以在第一個請求去查詢數據庫的時候對他加一個互斥鎖,其余的查詢請求都會被阻塞住,直到鎖被釋放,后面的線程進來發現已經有緩存了,就直接走緩存,從而保護數據庫。但是也是由于它會阻塞其他的線程,此時系統吞吐量會下降。需要結合實際的業務去考慮是否要這么做。
- 方法三
方法三就是singleflight的設計思路,也會使用互斥鎖,但是相對于方法二的加鎖粒度會更細,這里先簡單總結一下singleflight的設計原理,后面看源碼在具體分析。
singleflightd的設計思路就是將一組相同的請求合并成一個請求,使用map存儲,只會有一個請求到達mysql,使用sync.waitgroup包進行同步,對所有的請求返回相同的結果。
截屏2021-07-14 下午8.30.56
源碼賞析
已經迫不及待了,直奔主題吧,下面我們一起來看看singleflight是怎么設計的。
數據結構
singleflight的結構定義如下:
- type Group struct {
- mu sync.Mutex // 互斥鎖,保證并發安全
- m map[string]*call // 存儲相同的請求,key是相同的請求,value保存調用信息。
- }
Group結構還是比較簡單的,只有兩個字段,m是一個map,key是相同請求的標識,value是用來保存調用信息,這個map是懶加載,其實就是在使用時才會初始化;mu是互斥鎖,用來保證m的并發安全。m存儲調用信息也是單獨封裝了一個結構:
- type call struct {
- wg sync.WaitGroup
- // 存儲返回值,在wg done之前只會寫入一次
- val interface{}
- // 存儲返回的錯誤信息
- err error
- // 標識別是否調用了Forgot方法
- forgotten bool
- // 統計相同請求的次數,在wg done之前寫入
- dups int
- // 使用DoChan方法使用,用channel進行通知
- chans []chan<- Result
- }
- // Dochan方法時使用
- type Result struct {
- Val interface{} // 存儲返回值
- Err error // 存儲返回的錯誤信息
- Shared bool // 標示結果是否是共享結果
- }
Do方法
- // 入參:key:標識相同請求,fn:要執行的函數
- // 返回值:v: 返回結果 err: 執行的函數錯誤信息 shard: 是否是共享結果
- func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
- // 代碼塊加鎖
- g.mu.Lock()
- // map進行懶加載
- if g.m == nil {
- // map初始化
- g.m = make(map[string]*call)
- }
- // 判斷是否有相同請求
- if c, ok := g.m[key]; ok {
- // 相同請求次數+1
- c.dups++
- // 解鎖就好了,只需要等待執行結果了,不會有寫入操作了
- g.mu.Unlock()
- // 已有請求在執行,只需要等待就好了
- c.wg.Wait()
- // 區分panic錯誤和runtime錯誤
- if e, ok := c.err.(*panicError); ok {
- panic(e)
- } else if c.err == errGoexit {
- runtime.Goexit()
- }
- return c.val, c.err, true
- }
- // 之前沒有這個請求,則需要new一個指針類型
- c := new(call)
- // sync.waitgroup的用法,只有一個請求運行,其他請求等待,所以只需要add(1)
- c.wg.Add(1)
- // m賦值
- g.m[key] = c
- // 沒有寫入操作了,解鎖即可
- g.mu.Unlock()
- // 唯一的請求該去執行函數了
- g.doCall(c, key, fn)
- return c.val, c.err, c.dups > 0
- }
這里是唯一有疑問的應該是區分panic和runtime錯誤部分吧,這個與下面的docall方法有關聯,看完docall你就知道為什么了。
docall
- // doCall handles the single call for a key.
- func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
- // 標識是否正常返回
- normalReturn := false
- // 標識別是否發生panic
- recovered := false
- defer func() {
- // 通過這個來判斷是否是runtime導致直接退出了
- if !normalReturn && !recovered {
- // 返回runtime錯誤信息
- c.err = errGoexit
- }
- c.wg.Done()
- g.mu.Lock()
- defer g.mu.Unlock()
- // 防止重復刪除key
- if !c.forgotten {
- delete(g.m, key)
- }
- // 檢測是否出現了panic錯誤
- if e, ok := c.err.(*panicError); ok {
- // 如果是調用了dochan方法,為了channel避免死鎖,這個panic要直接拋出去,不能recover住,要不就隱藏錯誤了
- if len(c.chans) > 0 {
- go panic(e) // 開一個寫成panic
- select {} // 保持住這個goroutine,這樣可以將panic寫入crash dump
- } else {
- panic(e)
- }
- } else if c.err == errGoexit {
- // runtime錯誤不需要做任何時,已經退出了
- } else {
- // 正常返回的話直接向channel寫入數據就可以了
- for _, ch := range c.chans {
- ch <- Result{c.val, c.err, c.dups > 0}
- }
- }
- }()
- // 使用匿名函數目的是recover住panic,返回信息給上層
- func() {
- defer func() {
- if !normalReturn {
- // 發生了panic,我們recover住,然后把錯誤信息返回給上層
- if r := recover(); r != nil {
- c.err = newPanicError(r)
- }
- }
- }()
- // 執行函數
- c.val, c.err = fn()
- // fn沒有發生panic
- normalReturn = true
- }()
- // 判斷執行函數是否發生panic
- if !normalReturn {
- recovered = true
- }
- }
這里來簡單描述一下為什么區分panic和runtime錯誤,不區分的情況下如果調用出現了恐慌,但是鎖沒有被釋放,會導致使用相同key的所有后續調用都出現了死鎖,具體可以查看這個issue:https://github.com/golang/go/issues/33519。
Dochan和Forget方法
- //異步返回
- // 入參數:key:標識相同請求,fn:要執行的函數
- // 出參數:<- chan 等待接收結果的channel
- func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
- // 初始化channel
- ch := make(chan Result, 1)
- g.mu.Lock()
- // 懶加載
- if g.m == nil {
- g.m = make(map[string]*call)
- }
- // 判斷是否有相同的請求
- if c, ok := g.m[key]; ok {
- //相同請求數量+1
- c.dups++
- // 添加等待的chan
- c.chans = append(c.chans, ch)
- g.mu.Unlock()
- return ch
- }
- c := &call{chans: []chan<- Result{ch}}
- c.wg.Add(1)
- g.m[key] = c
- g.mu.Unlock()
- // 開一個寫成調用
- go g.doCall(c, key, fn)
- // 返回這個channel等待接收數據
- return ch
- }
- // 釋放某個 key 下次調用就不會阻塞等待了
- func (g *Group) Forget(key string) {
- g.mu.Lock()
- if c, ok := g.m[key]; ok {
- c.forgotten = true
- }
- delete(g.m, key)
- g.mu.Unlock()
- }
注意事項
因為我們在使用singleflight時需要自己寫執行函數,所以如果我們寫的執行函數一直循環住了,就會導致我們的整個程序處于循環的狀態,積累越來越多的請求,所以在使用時,還是要注意一點的,比如這個例子:
- result, err, _ := d.singleGroup.Do(key, func() (interface{}, error) {
- for{
- // TODO
- }
- }
不過這個問題一般也不會發生,我們在日常開發中都會使用context控制超時。
總結
好啦,這篇文章就到這里啦。因為最近我在項目中也使用singleflight這個庫,所以就看了一下源碼實現,真的是厲害,這么短的代碼就實現了這么重要的功能,我怎么就想不到呢。。。。所以說還是要多讀一些源碼庫,真的能學到好多,真是應了那句話:你知道的越多,不知道的就越多!