成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go 分布式令牌桶限流 + 兜底保障

開發(fā) 后端 分布式
單位時(shí)間按照一定速率勻速的生產(chǎn) token 放入桶內(nèi),直到達(dá)到桶容量上限。處理請(qǐng)求,每次嘗試獲取一個(gè)或多個(gè)令牌,如果拿到則處理請(qǐng)求,失敗則拒絕請(qǐng)求。

 

本文轉(zhuǎn)載自微信公眾號(hào)「微服務(wù)實(shí)踐」,作者歐陽(yáng)安。轉(zhuǎn)載本文請(qǐng)聯(lián)系微服務(wù)實(shí)踐公眾號(hào)。

上篇文章提到固定時(shí)間窗口限流無法處理突然請(qǐng)求洪峰情況,本文講述的令牌桶線路算法則可以比較好的處理此場(chǎng)景。

工作原理

單位時(shí)間按照一定速率勻速的生產(chǎn) token 放入桶內(nèi),直到達(dá)到桶容量上限。

處理請(qǐng)求,每次嘗試獲取一個(gè)或多個(gè)令牌,如果拿到則處理請(qǐng)求,失敗則拒絕請(qǐng)求。

優(yōu)缺點(diǎn)

優(yōu)點(diǎn)

可以有效處理瞬間的突發(fā)流量,桶內(nèi)存量 token 即可作為流量緩沖區(qū)平滑處理突發(fā)流量。

缺點(diǎn)

實(shí)現(xiàn)較為復(fù)雜。

代碼實(shí)現(xiàn)

  1. core/limit/tokenlimit.go 

分布式環(huán)境下考慮使用 redis 作為桶和令牌的存儲(chǔ)容器,采用 lua 腳本實(shí)現(xiàn)整個(gè)算法流程。

redis lua 腳本

  1. -- 每秒生成token數(shù)量即token生成速度 
  2. local rate = tonumber(ARGV[1]) 
  3. -- 桶容量 
  4. local capacity = tonumber(ARGV[2]) 
  5. -- 當(dāng)前時(shí)間戳 
  6. local now = tonumber(ARGV[3]) 
  7. -- 當(dāng)前請(qǐng)求token數(shù)量 
  8. local requested = tonumber(ARGV[4]) 
  9. -- 需要多少秒才能填滿桶 
  10. local fill_time = capacity/rate 
  11. -- 向下取整,ttl為填滿時(shí)間的2倍 
  12. local ttl = math.floor(fill_time*2) 
  13. -- 當(dāng)前時(shí)間桶容量 
  14. local last_tokens = tonumber(redis.call("get", KEYS[1])) 
  15. -- 如果當(dāng)前桶容量為0,說明是第一次進(jìn)入,則默認(rèn)容量為桶的最大容量 
  16. if last_tokens == nil then 
  17. last_tokens = capacity 
  18. end 
  19. -- 上一次刷新的時(shí)間 
  20. local last_refreshed = tonumber(redis.call("get", KEYS[2])) 
  21. -- 第一次進(jìn)入則設(shè)置刷新時(shí)間為0 
  22. if last_refreshed == nil then 
  23. last_refreshed = 0 
  24. end 
  25. -- 距離上次請(qǐng)求的時(shí)間跨度 
  26. local delta = math.max(0, now-last_refreshed) 
  27. -- 距離上次請(qǐng)求的時(shí)間跨度,總共能生產(chǎn)token的數(shù)量,如果超多最大容量則丟棄多余的token 
  28. local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) 
  29. -- 本次請(qǐng)求token數(shù)量是否足夠 
  30. local allowed = filled_tokens >= requested 
  31. -- 桶剩余數(shù)量 
  32. local new_tokens = filled_tokens 
  33. -- 允許本次token申請(qǐng),計(jì)算剩余數(shù)量 
  34. if allowed then 
  35. new_tokens = filled_tokens - requested 
  36. end 
  37. -- 設(shè)置剩余token數(shù)量 
  38. redis.call("setex", KEYS[1], ttl, new_tokens) 
  39. -- 設(shè)置刷新時(shí)間 
  40. redis.call("setex", KEYS[2], ttl, now) 
  41.  
  42. return allowed 

令牌桶限流器定義

  1. type TokenLimiter struct { 
  2.     // 每秒生產(chǎn)速率 
  3.     rate int 
  4.     // 桶容量 
  5.     burst int 
  6.     // 存儲(chǔ)容器 
  7.     store *redis.Redis 
  8.     // redis key 
  9.     tokenKey       string 
  10.     // 桶刷新時(shí)間key 
  11.     timestampKey   string 
  12.     // lock 
  13.     rescueLock     sync.Mutex 
  14.     // redis健康標(biāo)識(shí) 
  15.     redisAlive     uint32 
  16.     // redis故障時(shí)采用進(jìn)程內(nèi) 令牌桶限流器 
  17.     rescueLimiter  *xrate.Limiter 
  18.     // redis監(jiān)控探測(cè)任務(wù)標(biāo)識(shí) 
  19.     monitorStarted bool 
  20.  
  21. func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter { 
  22.     tokenKey := fmt.Sprintf(tokenFormat, key
  23.     timestampKey := fmt.Sprintf(timestampFormat, key
  24.  
  25.     return &TokenLimiter{ 
  26.         rate:          rate, 
  27.         burst:         burst, 
  28.         store:         store, 
  29.         tokenKey:      tokenKey, 
  30.         timestampKey:  timestampKey, 
  31.         redisAlive:    1, 
  32.         rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst), 
  33.     } 

獲取令牌

  1. func (lim *TokenLimiter) reserveN(now time.Time, n int) bool { 
  2.     // 判斷redis是否健康 
  3.     // redis故障時(shí)采用進(jìn)程內(nèi)限流器 
  4.     // 兜底保障 
  5.     if atomic.LoadUint32(&lim.redisAlive) == 0 { 
  6.         return lim.rescueLimiter.AllowN(now, n) 
  7.     } 
  8.     // 執(zhí)行腳本獲取令牌 
  9.     resp, err := lim.store.Eval( 
  10.         script, 
  11.         []string{ 
  12.             lim.tokenKey, 
  13.             lim.timestampKey, 
  14.         }, 
  15.         []string{ 
  16.             strconv.Itoa(lim.rate), 
  17.             strconv.Itoa(lim.burst), 
  18.             strconv.FormatInt(now.Unix(), 10), 
  19.             strconv.Itoa(n), 
  20.         }) 
  21.     // redis allowed == false 
  22.     // Lua boolean false -> r Nil bulk reply 
  23.     // 特殊處理key不存在的情況 
  24.     if err == redis.Nil { 
  25.         return false 
  26.     } else if err != nil { 
  27.         logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err) 
  28.         // 執(zhí)行異常,開啟redis健康探測(cè)任務(wù) 
  29.         // 同時(shí)采用進(jìn)程內(nèi)限流器作為兜底 
  30.         lim.startMonitor() 
  31.         return lim.rescueLimiter.AllowN(now, n) 
  32.     } 
  33.  
  34.     code, ok := resp.(int64) 
  35.     if !ok { 
  36.         logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp) 
  37.         lim.startMonitor() 
  38.         return lim.rescueLimiter.AllowN(now, n) 
  39.     } 
  40.  
  41.     // redis allowed == true 
  42.     // Lua boolean true -> r integer reply with value of 1 
  43.     return code == 1 

redis 故障時(shí)兜底策略

兜底策略的設(shè)計(jì)考慮得非常細(xì)節(jié),當(dāng) redis 不可用的時(shí)候,啟動(dòng)單機(jī)版的 ratelimit 做備用限流,確保基本的限流可用,服務(wù)不會(huì)被沖垮。

  1. // 開啟redis健康探測(cè) 
  2. func (lim *TokenLimiter) startMonitor() { 
  3.     lim.rescueLock.Lock() 
  4.     defer lim.rescueLock.Unlock() 
  5.     // 防止重復(fù)開啟 
  6.     if lim.monitorStarted { 
  7.         return 
  8.     } 
  9.  
  10.     // 設(shè)置任務(wù)和健康標(biāo)識(shí) 
  11.     lim.monitorStarted = true 
  12.     atomic.StoreUint32(&lim.redisAlive, 0) 
  13.     // 健康探測(cè) 
  14.     go lim.waitForRedis() 
  15.  
  16. // redis健康探測(cè)定時(shí)任務(wù) 
  17. func (lim *TokenLimiter) waitForRedis() { 
  18.     ticker := time.NewTicker(pingInterval) 
  19.     // 健康探測(cè)成功時(shí)回調(diào)此函數(shù) 
  20.     defer func() { 
  21.         ticker.Stop() 
  22.         lim.rescueLock.Lock() 
  23.         lim.monitorStarted = false 
  24.         lim.rescueLock.Unlock() 
  25.     }() 
  26.  
  27.     for range ticker.C { 
  28.         // ping屬于redis內(nèi)置健康探測(cè)命令 
  29.         if lim.store.Ping() { 
  30.             // 健康探測(cè)成功,設(shè)置健康標(biāo)識(shí) 
  31.             atomic.StoreUint32(&lim.redisAlive, 1) 
  32.             return 
  33.         } 
  34.     } 

項(xiàng)目地址

https://github.com/zeromicro/go-zero

歡迎使用 go-zero 并 star 支持我們!

 

責(zé)任編輯:武曉燕 來源: 微服務(wù)實(shí)踐
相關(guān)推薦

2023-08-10 08:00:42

令牌限流器計(jì)數(shù)器

2020-10-16 09:34:39

漏桶令牌桶限流

2022-03-07 08:14:27

并發(fā)分布式

2022-03-11 10:03:40

分布式鎖并發(fā)

2024-04-08 11:04:03

2018-01-12 16:51:48

華為

2018-06-11 11:12:09

秒殺限流分布式

2018-06-19 09:35:51

分布式系統(tǒng)限流

2023-07-11 10:24:00

分布式限流算法

2025-05-23 10:10:00

限流算法系統(tǒng)Go

2023-04-06 08:52:54

Sentinel分布式系統(tǒng)

2022-12-21 08:40:05

限流器分布式限流

2024-01-26 07:49:49

Go分布式鏈路

2023-11-02 09:33:31

Go語言Raft算法

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2017-09-01 05:35:58

分布式計(jì)算存儲(chǔ)

2019-06-19 15:40:06

分布式鎖RedisJava

2023-05-29 14:07:00

Zuul網(wǎng)關(guān)系統(tǒng)

2023-08-04 07:28:00

2019-08-08 09:57:53

分布式服務(wù)限流
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 国产婷婷 | 午夜资源| 色av一区二区 | 欧美五月婷婷 | 日韩久久久一区二区 | 欧美a级成人淫片免费看 | 一级看片| 国产精品日韩欧美一区二区三区 | 中文区中文字幕免费看 | 成人在线视频网 | 嫩草视频免费 | 国产精品视频不卡 | 国产福利91精品一区二区三区 | 亚洲香蕉 | 91一区二区 | 天天干天天爱天天 | 中国美女撒尿txxxxx视频 | 中文字幕成人网 | 欧美日韩a | 一区二区三区四区毛片 | 狠狠婷婷综合久久久久久妖精 | 国产精品久久久久国产a级 欧美日韩国产免费 | 欧美一区二区三区在线观看 | 91综合网 | 久久er99热精品一区二区 | 亚洲欧美激情精品一区二区 | 精品久久久久久国产 | 亚洲成人av在线播放 | 羞羞的视频在线看 | 国产伦精品一区二区三区精品视频 | 一级aaaa毛片 | 成人在线视频一区 | 久久国产亚洲 | 国产99久久精品一区二区永久免费 | 亚洲系列第一页 | 日本高清中文字幕 | 成人区一区二区三区 | 亚洲精品一二三 | 毛片网在线观看 | 亚洲视频在线观看 | 日本中文字幕日韩精品免费 |