成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go-Zero 是如何實現令牌桶限流的?

開發 前端
生成的令牌放入令牌桶中存放,如果令牌桶滿了則多余的令牌會直接丟棄,當請求到達時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執行; 如果桶空了,那么嘗試取令牌的請求會被直接丟棄。

上一篇文章介紹了 如何實現計數器限流。主要有兩種實現方式,分別是固定窗口和滑動窗口,并且分析了 go-zero 采用固定窗口方式實現的源碼。

但是采用固定窗口實現的限流器會有兩個問題:

  1. 會出現請求量超出限制值兩倍的情況
  2. 無法很好處理流量突增問題

這篇文章來介紹一下令牌桶算法,可以很好解決以上兩個問題。

工作原理

算法概念如下:

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多余的令牌會直接丟棄,當請求到達時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執行;
  • 如果桶空了,那么嘗試取令牌的請求會被直接丟棄。

圖片圖片

令牌桶算法既能夠將所有的請求平均分布到時間區間內,又能接受服務器能夠承受范圍內的突發請求,因此是目前使用較為廣泛的一種限流算法。

源碼實現

源碼分析我們還是以 go-zero 項目為例,首先來看生成令牌的部分,依然是使用 Redis 來實現。

// core/limit/tokenlimit.go

// 生成 token 速率
script = `local rate = tonumber(ARGV[1])
// 通容量
local capacity = tonumber(ARGV[2])
// 當前時間戳
local now = tonumber(ARGV[3])
// 請求數量
local requested = tonumber(ARGV[4])
// 需要多少秒才能把桶填滿
local fill_time = capacity/rate
// 向下取整,ttl 為填滿時間 2 倍
local ttl = math.floor(fill_time*2)
// 當前桶剩余容量,如果為 nil,說明第一次使用,賦值為桶最大容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
    last_tokens = capacity
end

// 上次請求時間戳,如果為 nil 則賦值 0
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
    last_refreshed = 0
end

// 距離上一次請求的時間跨度
local delta = math.max(0, now-last_refreshed)
// 距離上一次請求的時間跨度能生成的 token 數量和桶內剩余 token 數量的和
// 與桶容量比較,取二者的小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 判斷請求數量和桶內 token 數量的大小
local allowed = filled_tokens >= requested
// 被請求消耗掉之后,更新剩余 token 數量
local new_tokens = filled_tokens
if allowed then
    new_tokens = filled_tokens - requested
end

// 更新 redis token
redis.call("setex", KEYS[1], ttl, new_tokens)
// 更新 redis 刷新時間
redis.call("setex", KEYS[2], ttl, now)

return allowed`

Redis 中主要保存兩個 key,分別是 token 數量和刷新時間。

核心思想就是比較兩次請求時間間隔內生成的 token 數量 + 桶內剩余 token 數量,和請求量之間的大小,如果滿足則允許,否則則不允許。

限流器初始化:

// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {
    // 生成 token 速率
    rate           int
    // 桶容量
    burst          int
    store          *redis.Redis
    // 桶 key
    tokenKey       string
    // 桶刷新時間 key
    timestampKey   string
    rescueLock     sync.Mutex
    // redis 健康標識
    redisAlive     uint32
    // redis 健康監控啟動狀態
    monitorStarted bool
    // 內置單機限流器
    rescueLimiter  *xrate.Limiter
}

// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
    tokenKey := fmt.Sprintf(tokenFormat, key)
    timestampKey := fmt.Sprintf(timestampFormat, key)

    return &TokenLimiter{
        rate:          rate,
        burst:         burst,
        store:         store,
        tokenKey:      tokenKey,
        timestampKey:  timestampKey,
        redisAlive:    1,
        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
    }
}

其中有一個變量 rescueLimiter,這是一個進程內的限流器。如果 Redis 發生故障了,那么就使用這個,算是一個保障,盡量避免系統被突發流量拖垮。

圖片圖片

提供了四個可調用方法:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
    return lim.AllowN(time.Now(), 1)
}

// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
    return lim.AllowNCtx(ctx, time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
    return lim.reserveN(context.Background(), now, n)
}

// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
    return lim.reserveN(ctx, now, n)
}

最終調用的都是 reverveN 方法:

func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
    // 判斷 Redis 健康狀態,如果 Redis 故障,則使用進程內限流器
    if atomic.LoadUint32(&lim.redisAlive) == 0 {
        return lim.rescueLimiter.AllowN(now, n)
    }

    // 執行限流腳本
    resp, err := lim.store.EvalCtx(ctx,
        script,
        []string{
            lim.tokenKey,
            lim.timestampKey,
        },
        []string{
            strconv.Itoa(lim.rate),
            strconv.Itoa(lim.burst),
            strconv.FormatInt(now.Unix(), 10),
            strconv.Itoa(n),
        })
    // redis allowed == false
    // Lua boolean false -> r Nil bulk reply
    if err == redis.Nil {
        return false
    }
    if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
        logx.Errorf("fail to use rate limiter: %s", err)
        return false
    }
    if err != nil {
        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
        // 如果有異常的話,會啟動進程內限流
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    code, ok := resp.(int64)
    if !ok {
        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    // redis allowed == true
    // Lua boolean true -> r integer reply with value of 1
    return code == 1
}

最后看一下進程內限流的啟動與恢復:

func (lim *TokenLimiter) startMonitor() {
    lim.rescueLock.Lock()
    defer lim.rescueLock.Unlock()

    // 需要加鎖保護,如果程序已經啟動了,直接返回,不要重復啟動
    if lim.monitorStarted {
        return
    }

    lim.monitorStarted = true
    atomic.StoreUint32(&lim.redisAlive, 0)

    go lim.waitForRedis()
}

func (lim *TokenLimiter) waitForRedis() {
    ticker := time.NewTicker(pingInterval)
    // 更新監控進程的狀態
    defer func() {
        ticker.Stop()
        lim.rescueLock.Lock()
        lim.monitorStarted = false
        lim.rescueLock.Unlock()
    }()

    for range ticker.C {
        // 對 redis 進行健康監測,如果 redis 服務恢復了
        // 則更新 redisAlive 標識,并退出 goroutine
        if lim.store.Ping() {
            atomic.StoreUint32(&lim.redisAlive, 1)
            return
        }
    }
}

參考文章:

責任編輯:武曉燕 來源: AlwaysBeta
相關推薦

2023-08-07 08:01:15

2022-01-12 12:46:32

Go限流保障

2023-08-28 08:00:45

2020-10-16 09:34:39

漏桶令牌桶限流

2025-05-26 04:00:00

2024-04-28 14:46:55

gozero微服務技巧

2025-05-23 10:10:00

限流算法系統Go

2024-11-05 15:02:41

2021-05-31 07:01:46

限流算法令牌

2024-02-04 10:08:34

2025-04-08 09:20:00

Sentinel限流微服務

2023-02-20 08:08:48

限流算法計數器算法令牌桶算法

2024-12-25 15:44:15

2025-01-21 08:31:12

2022-05-19 14:14:26

go語言限流算法

2023-10-16 16:00:27

Redis限流

2023-11-20 10:09:59

2021-03-30 10:46:42

SpringBoot計數器漏桶算法

2021-10-12 10:00:25

架構運維技術

2022-07-29 07:03:17

CSRF令牌密鑰
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 毛片一区二区三区 | 综合久久综合久久 | 亚洲最大看片网站 | 毛片一区二区三区 | 欧美三级免费观看 | 色婷婷精品| 中文字幕高清 | 99久久久无码国产精品 | 日日夜夜天天 | 午夜影院在线观看版 | 久久久亚洲精品视频 | 99精品一级欧美片免费播放 | 黑人巨大精品欧美一区二区一视频 | 亚洲精品成人av久久 | 免费视频一区二区 | 99re视频在线免费观看 | 一级黄a视频 | 啪啪毛片 | 国内精品免费久久久久软件老师 | 日韩久久久久 | 亚洲国产黄 | 黄色日批视频 | 亚洲精品99| 99久久久无码国产精品 | 色视频www在线播放国产人成 | 麻豆changesxxx国产 | 国产成年人小视频 | 日韩中文字幕网 | 精品无码久久久久久久动漫 | 国产精品自拍av | 国产乱码精品一区二区三区中文 | 久久一本 | www.成人久久 | 国产中文原创 | 国产精品久久在线 | 欧美一区二区在线 | 99精品视频在线 | 日韩精品 电影一区 亚洲 | 91精品国产综合久久婷婷香蕉 | av大片在线观看 | 久久精品91久久久久久再现 |