為什么要限流?常見的限流算法有哪些?
實(shí)際開發(fā)中,當(dāng)業(yè)務(wù)流量過大時(shí),為了保護(hù)下游服務(wù),我們通常會(huì)做一些預(yù)防性的工作,今天我們就一起來聊聊限流!
一、為什么需要限流?
在實(shí)際應(yīng)用中,每個(gè)系統(tǒng)或者服務(wù)都有其處理能力的極限(瓶頸),即便是微服務(wù)中有集群和分布式的夾持,也不能保證系統(tǒng)能應(yīng)對(duì)任何大小的流量,因此,系統(tǒng)為了自保,需要對(duì)處理能力范圍以外的流量進(jìn)行“特殊照顧”(比如,丟棄請(qǐng)求或者延遲處理),從而避免系統(tǒng)卡死、崩潰或不可用等情況,保證系統(tǒng)整體服務(wù)可用。
二、限流算法
1.令牌桶算法
令牌桶算法(Token Bucket Algorithm)是計(jì)算機(jī)網(wǎng)絡(luò)和電信領(lǐng)域中常用的一種簡(jiǎn)單方法,用于流量整形和速率限制。它旨在控制系統(tǒng)在某個(gè)時(shí)間段內(nèi)可以發(fā)送或接收的數(shù)據(jù)量,確保流量符合指定的速率。
令牌桶算法的核心思路:系統(tǒng)按照固定速度往桶里加入令牌,如果桶滿則停止添加。當(dāng)有請(qǐng)求到來時(shí),會(huì)嘗試從桶里拿走一個(gè)令牌,取到令牌才能繼續(xù)進(jìn)行請(qǐng)求處理,沒有令牌就拒絕服務(wù)。示意圖如下:
令牌桶法的幾個(gè)特點(diǎn):
- 令牌桶容量固定,即系統(tǒng)的處理能力閾值
- 令牌放入桶內(nèi)的速度固定
- 令牌從桶內(nèi)拿出的速度根據(jù)實(shí)際請(qǐng)求量而定,每個(gè)請(qǐng)求對(duì)應(yīng)一個(gè)令牌
- 當(dāng)桶內(nèi)沒有令牌時(shí),請(qǐng)求進(jìn)入等待或者被拒絕
令牌桶算法主要用于應(yīng)對(duì)突發(fā)流量的場(chǎng)景,在 Java語(yǔ)言中使用最多的是 Google的 Guava RateLimiter,下面舉幾個(gè)例子來說明它是如何應(yīng)對(duì)突發(fā)流量:
示例1
import java.util.concurrent.TimeUnit;
public class RateLimit {
public static void main(String[] args) {
RateLimiter limiter = RateLimiter.create(5); // 每秒創(chuàng)建5個(gè)令牌
System.out.println("acquire(5), wait " + limiter.acquire(5) + " s"); // 全部取走 5個(gè)令牌
System.out.println("acquire(1), wait " + limiter.acquire(1) + " s");// 獲取1個(gè)令牌
boolean result = limiter.tryAcquire(1, 0, TimeUnit.SECONDS); // 嘗試獲取1個(gè)令牌,獲取不到則直接返回
System.out.println("tryAcquire(1), result: " + result);
}
}
示例代碼運(yùn)行結(jié)果如下:
acquire(5), wait 0.0 s
acquire(1), wait 0.971544 s
tryAcquire(1), result: false
桶中共有 5個(gè)令牌,acquire(5)返回0 代表令牌充足無需等待,當(dāng)桶中令牌不足,acquire(1)等待一段時(shí)間才獲取到,當(dāng)令牌不足時(shí),tryAcquire(1)不等待直接返回。
示例2
import com.google.common.util.concurrent.RateLimiter;
public class RateLimit {
public static void main(String[] args) {
RateLimiter limiter = RateLimiter.create(5);
System.out.println("acquire(10), wait " + limiter.acquire(10) + " s");
System.out.println("acquire(1), wait " + limiter.acquire(1) + " s");
}
}
示例代碼運(yùn)行結(jié)果如下:
acquire(10), wait 0.0 s
acquire(1), wait 1.974268 s
桶中共有 5個(gè)令牌,acquire(10)返回0,和示例似乎有點(diǎn)沖突,其實(shí),這里返回0 代表應(yīng)對(duì)了突發(fā)流量,但是 acquire(1)
卻等待了 1.974268秒,這代表 acquire(1)的等待是時(shí)間包含了應(yīng)對(duì)突然流量多出來的 5個(gè)請(qǐng)求,即 1.974268 = 1 + 0.974268。
為了更好的驗(yàn)證示例2的猜想,我們看示例3:
示例3
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
public class RateLimit {
public static void main(String[] args) throws InterruptedException {
RateLimiter limiter = RateLimiter.create(5);
System.out.println("acquire(5), wait " + limiter.acquire(5) + " s");
TimeUnit.SECONDS.sleep(1);
System.out.println("acquire(5), wait " + limiter.acquire(5) + " s");
System.out.println("acquire(1), wait " + limiter.acquire(1) + " s");
}
}
示例代碼運(yùn)行結(jié)果如下:
acquire(5), wait 0.0 s
acquire(5), wait 0.0 s
acquire(1), wait 0.966104 s
桶中共有 5個(gè)令牌,acquire(5)返回0 代表令牌充足無需等待,接著睡眠 1s,這樣系統(tǒng)又可以增加5個(gè)令牌,因此,再次 acquire(5)令牌充足返回0 無需等待,acquire(1)需要等待一段時(shí)間才能獲取令牌。
2.漏桶算法
漏桶算法(Leaky Bucket Algorithm)的核心思路是:水(請(qǐng)求)進(jìn)入固定容量的漏桶,漏桶的水以固定的速度流出,當(dāng)水流入漏桶的速度過大導(dǎo)致漏桶滿而直接溢出,然后拒絕請(qǐng)求。示意圖如下:
下面為一個(gè) Java版本的漏桶算法示例:
import java.util.concurrent.*;
public class LeakyBucket {
private final int capacity; // 桶的容量
private final int rate; // 出水速率
private int water; // 漏斗中的水量
private long lastLeakTime; // 上一次漏水的時(shí)間
public LeakyBucket(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
this.water = 0;
this.lastLeakTime = System.currentTimeMillis();
}
public synchronized boolean allowRequest(int tokens) {
leak(); // 漏水
if (water + tokens <= capacity) {
water += tokens; // 漏斗容量未滿,可以加水
return true;
} else {
return false; // 漏斗容量已滿,無法加水
}
}
private void leak() {
long currentTime = System.currentTimeMillis();
long timeElapsed = currentTime - lastLeakTime;
int waterToLeak = (int) (timeElapsed * rate / 1000); // 計(jì)算經(jīng)過的時(shí)間內(nèi)應(yīng)該漏掉的水量
water = Math.max(0, water - waterToLeak); // 漏水
lastLeakTime = currentTime; // 更新上一次漏水時(shí)間
}
public static void main(String[] args) {
LeakyBucket bucket = new LeakyBucket(10, 2); // 容量為10,速率為2令牌/秒
int[] packets = {2, 3, 1, 5, 2, 10}; // 要發(fā)送的數(shù)據(jù)包大小
for (int packet : packets) {
if (bucket.allowRequest(packet)) {
System.out.println("發(fā)送 " + packet + " 字節(jié)的數(shù)據(jù)包");
} else {
System.out.println("漏桶已滿,無法發(fā)送數(shù)據(jù)包");
}
try {
TimeUnit.SECONDS.sleep(1); // 模擬發(fā)送間隔
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
漏桶算法的幾個(gè)特點(diǎn):
- 漏桶容量固定
- 流入(請(qǐng)求)速度隨意
- 流出(處理請(qǐng)求)速度固定
- 桶滿則溢出,即拒絕新請(qǐng)求(限流)
3.計(jì)數(shù)器算法
計(jì)數(shù)器是最簡(jiǎn)單的限流方式,主要用來限制總并發(fā)數(shù),主要通過一個(gè)支持原子操作的計(jì)數(shù)器來累計(jì) 1秒內(nèi)的請(qǐng)求次數(shù),當(dāng) 秒內(nèi)計(jì)數(shù)達(dá)到限流閾值時(shí)觸發(fā)拒絕策略。每過 1秒,計(jì)數(shù)器重置為 0開始重新計(jì)數(shù)。比如數(shù)據(jù)庫(kù)連接池大小、線程池大小、程序訪問并發(fā)數(shù)等都是使用計(jì)數(shù)器算法。
如下代碼就是一個(gè)Java版本的計(jì)數(shù)器算法示例,通過一個(gè)原子計(jì)算器 AtomicInteger來記錄總數(shù),如果請(qǐng)求數(shù)大于總數(shù)就拒絕請(qǐng)求,否則正常處理請(qǐng)求:
import java.util.concurrent.atomic.AtomicInteger;
public class CounterRateLimiter {
private final int limit; // 限流閾值
private final long windowSizeMs; // 時(shí)間窗口大小(毫秒)
private AtomicInteger counter; // 請(qǐng)求計(jì)數(shù)器
private long lastResetTime; // 上次重置計(jì)數(shù)器的時(shí)間
public CounterRateLimiter(int limit, long windowSizeMs) {
this.limit = limit;
this.windowSizeMs = windowSizeMs;
this.counter = new AtomicInteger(0);
this.lastResetTime = System.currentTimeMillis();
}
public boolean allowRequest() {
long currentTime = System.currentTimeMillis();
// 如果當(dāng)前時(shí)間超出了時(shí)間窗口,重置計(jì)數(shù)器
if (currentTime - lastResetTime > windowSizeMs) {
counter.set(0);
lastResetTime = currentTime;
}
// 檢查計(jì)數(shù)器是否超過了限流閾值
return counter.incrementAndGet() <= limit;
}
public static void main(String[] args) {
CounterRateLimiter rateLimiter = new CounterRateLimiter(3, 1000); // 每秒最多處理3個(gè)請(qǐng)求
for (int i = 0; i < 10; i++) {
if (rateLimiter.allowRequest()) {
System.out.println("允許請(qǐng)求 " + (i + 1));
} else {
System.out.println("限流,拒絕請(qǐng)求 " + (i + 1));
}
try {
Thread.sleep(200); // 模擬請(qǐng)求間隔
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4.滑動(dòng)窗口算法
滑動(dòng)窗口算法是一種常用于限流和統(tǒng)計(jì)的算法。它基于一個(gè)固定大小的時(shí)間窗口,在這個(gè)時(shí)間窗口內(nèi)統(tǒng)計(jì)請(qǐng)求的數(shù)量,并根據(jù)設(shè)定的閾值來控制流量。比如,TCP協(xié)議就使用了該算法
以下是一個(gè)簡(jiǎn)單的 Java 示例實(shí)現(xiàn)滑動(dòng)窗口算法:
import java.util.concurrent.atomic.AtomicInteger;
public class SlidingWindowRateLimiter {
private final int limit; // 限流閾值
private final long windowSizeMs; // 時(shí)間窗口大小(毫秒)
private final AtomicInteger[] window; // 滑動(dòng)窗口
private long lastUpdateTime; // 上次更新窗口的時(shí)間
private int pointer; // 指向當(dāng)前時(shí)間窗口的指針
public SlidingWindowRateLimiter(int limit, long windowSizeMs, int granularity) {
this.limit = limit;
this.windowSizeMs = windowSizeMs;
this.window = new AtomicInteger[granularity];
for (int i = 0; i < granularity; i++) {
window[i] = new AtomicInteger(0);
}
this.lastUpdateTime = System.currentTimeMillis();
this.pointer = 0;
}
public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();
// 計(jì)算時(shí)間窗口的起始位置
long windowStart = currentTime - windowSizeMs + 1;
// 更新窗口中過期的計(jì)數(shù)器
while (lastUpdateTime < windowStart) {
lastUpdateTime++;
window[pointer].set(0);
pointer = (pointer + 1) % window.length;
}
// 檢查窗口內(nèi)的總計(jì)數(shù)是否超過限流閾值
int totalRequests = 0;
for (AtomicInteger counter : window) {
totalRequests += counter.get();
}
if (totalRequests >= limit) {
return false; // 超過限流閾值,拒絕請(qǐng)求
} else {
window[pointer].incrementAndGet(); // 記錄新的請(qǐng)求
return true; // 允許請(qǐng)求
}
}
public static void main(String[] args) {
SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(10, 1000, 10); // 每秒最多處理10個(gè)請(qǐng)求
for (int i = 0; i < 20; i++) {
if (rateLimiter.allowRequest()) {
System.out.println("允許請(qǐng)求 " + (i + 1));
} else {
System.out.println("限流,拒絕請(qǐng)求 " + (i + 1));
}
try {
Thread.sleep(100); // 模擬請(qǐng)求間隔
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
5.Redis + Lua分布式限流
Redis + Lua屬于分布式環(huán)境下的限流方案,主要利用的是Lua在 Redis中運(yùn)行能保證原子性。如下示例為一個(gè)簡(jiǎn)單的Lua限流腳本:
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then
return 0
else
redis.call("INCRBY", key, 1)
redis.call("EXPIRE", key, 1)
return 1
end
腳本解釋:
- KEYS[1]:限流的鍵名,注意,在Lua中,下角標(biāo)是從 1開始
- ARGV[1]:限流的最大值
- redis.call(‘get’, key):獲取當(dāng)前限流計(jì)數(shù)。
- redis.call(‘INCRBY’, key, 1):增加限流計(jì)數(shù)。
- redis.call(‘EXPIRE’, key, 1):設(shè)置鍵的過期時(shí)間為 1 秒。
6.三方限流工具
當(dāng)我們自己無法實(shí)現(xiàn)比較好的限流方案時(shí),成熟的三方框架就是我們比較好的選擇,下面列出兩個(gè) Java語(yǔ)言比較優(yōu)秀的框架。
(1) resilience4j
resilience4j 是一個(gè)輕量級(jí)的容錯(cuò)庫(kù),提供了限流、熔斷、重試等功能。限流模塊 RateLimiter 提供了靈活的限流配置,其優(yōu)點(diǎn)如下:
- 集成了多種容錯(cuò)機(jī)制
- 支持注解方式配置
- 易于與 Spring Boot集成
(2) Sentinel
Sentinel 是阿里巴巴開源的一個(gè)功能全面的流量防護(hù)框架,提供限流、熔斷、系統(tǒng)負(fù)載保護(hù)等多種功能。其優(yōu)點(diǎn)如下:
- 功能全面,適用于多種場(chǎng)景
- 強(qiáng)大的監(jiān)控和控制臺(tái)
- 與 Spring Cloud 深度集成
三、總結(jié)
本文講述了以下幾種限流方式:
- 計(jì)數(shù)器
- 滑動(dòng)窗口
- 漏桶
- 令牌桶
- Redis + Lua 分布式限流
- 三方限流工具
上面的限流方式,主要是針對(duì)服務(wù)器進(jìn)行限流,除此之外,我們也可以對(duì)客戶端進(jìn)行限流, 比如驗(yàn)證碼,答題,排隊(duì)等方式。
另外,我們也會(huì)在一些中間件上進(jìn)行限流,比如Apache、Tomcat、Nginx等。
在實(shí)際的開發(fā)中,限流場(chǎng)景略有差異,限流的維度也不一樣,比如,有的場(chǎng)景需要根據(jù)請(qǐng)求的 URL來限流,有的會(huì)對(duì) IP地址進(jìn)行限流、另外,設(shè)備ID、用戶ID 也是限流常用的維度,因此,我們需要結(jié)合真實(shí)業(yè)務(wù)場(chǎng)景靈活的使用限流方案。