大家好,我是Java烘焙師。上一篇文章介紹了限流方案的使用和選型,本文接着講限流算法的原理。
常見的限流算法有:令牌桶、窗口計數、漏桶,前兩種在實際使用中最常見,因此重點介紹。限流算法是通用的,既可以在單機上實現,也可以藉助redis來實現分佈式限流。
首先,定義一下限流算法需實現的基本功能:
- tryAquire嘗試獲取通行許可:立刻返回結果,true代表允許、false代表被限流
- aquire阻塞式獲取通行許可:如果能獲取到則返回,否則阻塞等待(指定超時時間,或一直等待)。當觸發限流時藉助
java.util.concurrent.locks.Condition的await方法來阻塞等待,當恢復時通過signalAll來喚醒阻塞的線程
令牌桶算法
概念
- 令牌:相當於通行許可,需要多少qps或資源消耗速率,就需要獲取到多少令牌
- 桶:決定了令牌個數上限,即處理能力上限
- 令牌生成速率:每秒往桶裏補充多少令牌
- 可用令牌大小:不超過桶大小
令牌桶算法,類似於生產者/消費者模式。桶相當於隊列,令牌相當於任務,往桶裏補充令牌相當於生產者,獲取令牌相當於消費者。但不用真的生成線程、隊列等實體,而是通過比較請求與可用令牌的大小、以及計算下一個令牌的發放時間,來實現的。
特點
- 場景通用,允許突發流量
- 理解起來不夠直觀,但代碼實現相對簡單
關鍵流程
不用定時補充令牌,而是類似於懶加載的方式,在調用時重新計算可用的令牌數。
-
tryAquire方法:
- 先補充令牌:重新計算可用令牌數
- 比較可用令牌數、與請求許可數:如果前者大則扣減請求許可數、更新可用令牌數,並返回成功,否則返回失敗
-
aquire方法:
- 如果請求許可數 <= 可用令牌數:則扣減請求許可數、更新可用令牌數,並返回成功
- 否則,計算需要等待的時間:根據所需的令牌差額,乘以令牌的生成間隔時間,得到需等待的時間,即
(permits - tokens) * refillIntervalNanos。使用condition.awaitNanos方法,來實現阻塞等待。例如:桶容量300,每秒生成100個令牌,則令牌的生成間隔時間為 1秒 / 100 = 10毫秒,當前可用令牌為50,而請求許可為200,則需等待 (200 - 50) * 10毫秒 = 1500毫秒 - 重新補充令牌
- 上述步驟循環往復,直到獲取到令牌
最關鍵的補充令牌邏輯:
- 計算需補充的令牌個數:根據自從上次補充以來的時間間隔,除以令牌的生成間隔時間,得到需補充的令牌個數,即
(now - lastRefillTime) / refillIntervalNanos - 更新可用令牌大小:不能超過桶容量大小,即在 capacity和tokens + tokensToAdd中取較小的那個
- 通知喚醒等待的線程:通過condition.signalAll()實現
以下是帶註釋的代碼示例。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 令牌桶限流器 - 支持非阻塞和阻塞獲取令牌
*/
public class TokenBucketRateLimiter {
// 桶容量大小
private final long capacity;
// 令牌生成速率(每秒生成的令牌數)
private final double refillRate;
// 每個令牌的生成間隔(納秒),等於 1秒對應的納秒 除以 令牌生成速率
private final long refillIntervalNanos;
// 可用令牌大小
private double tokens;
// 最後刷新時間(納秒)
private long lastRefillTime;
// 鎖和條件變量,用於阻塞等待
private final ReentrantLock lock;
private final Condition condition;
/**
* 構造方法
*/
public TokenBucketRateLimiter(long capacity, double refillRate) {
if (capacity <= 0 || refillRate <= 0) {
throw new IllegalArgumentException("容量和生成速率必須大於0");
}
this.capacity = capacity;
this.refillRate = refillRate;
this.refillIntervalNanos = (long) (1_000_000_000.0 / refillRate);
this.tokens = capacity;
this.lastRefillTime = System.nanoTime();
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
}
/**
* 非阻塞方式嘗試獲取一個令牌
* @return 成功獲取返回true,否則返回false
*/
public boolean tryAcquire() {
return tryAcquire(1);
}
/**
* 非阻塞方式嘗試獲取指定數量的令牌
* @param permits 請求的令牌數量
* @return 成功獲取返回true,否則返回false
*/
public boolean tryAcquire(long permits) {
if (permits <= 0) {
throw new IllegalArgumentException("請求的令牌數必須大於0");
}
if (permits > capacity) {
return false; // 請求超過桶容量,直接拒絕
}
lock.lock();
try {
// 重新補充令牌
refillTokens();
if (tokens >= permits) {
tokens -= permits;
return true;
}
return false;
} finally {
lock.unlock();
}
}
/**
* 阻塞方式獲取一個令牌
* @throws InterruptedException 如果等待過程中線程被中斷
*/
public void acquire() throws InterruptedException {
acquire(1);
}
/**
* 阻塞方式獲取指定數量的令牌(無限等待)
* @param permits 請求的令牌數量
* @throws InterruptedException 如果等待過程中線程被中斷
*/
public void acquire(long permits) throws InterruptedException {
if (permits <= 0) {
throw new IllegalArgumentException("請求的令牌數必須大於0");
}
if (permits > capacity) {
throw new IllegalArgumentException("請求的令牌數超過桶容量");
}
lock.lock();
try {
while (tokens < permits) {
// 計算需要等待的時間
double neededTokens = permits - tokens;
long waitNanos = (long) (neededTokens * refillIntervalNanos);
// 等待直到有足夠令牌或超時
condition.awaitNanos(waitNanos);
// 重新補充令牌
refillTokens();
}
tokens -= permits;
} finally {
lock.unlock();
}
}
/**
* 補充令牌(必須在鎖內調用)
*/
private void refillTokens() {
long now = System.nanoTime();
// 計算需要補充的令牌個數
double tokensToAdd = (now - lastRefillTime) / (double) refillIntervalNanos;
if (tokensToAdd > 0) {
// 可用令牌大小,最大不能超過桶容量大小
tokens = Math.min(capacity, tokens + tokensToAdd);
lastRefillTime = now;
// 如果有等待的線程,通知它們
if (tokens > 0) {
condition.signalAll();
}
}
}
}
窗口計數算法
概念
- 時間窗口:一段時間範圍,可以是秒,也可以是分鐘、小時、天等。窗口限流值,就代表該時間窗口最大允許的請求數
- 格子:把時間窗口拆分到更細粒度,比如把1秒拆成10個100毫秒,每個格子有開始時間、結束時間
- 格子計數,就是格子對應時間範圍的請求計數
- 窗口總計數,就是所有格子計數的總和
- 固定窗口計數:時間窗口固定,在臨界情況下可能出現qps尖刺。例如:每秒限流100,在第一個1秒格子的第900毫秒請求許可80次(通過),在第二個1秒格子的第200毫秒請求許可70次(通過),但是整體看900毫秒到1200毫秒,這300毫秒內已經請求了150次,超過了限流100的閾值、存在qps尖刺
- 滑動窗口計數:滑動窗口有起始時間點、終止時間點,隨着時間流逝一起動態往後推移,滑動窗口限流就是統計該時間範圍內的請求許可是否超過閾值
一般用環形數組實現,數組的每個元素就是格子計數器,循環複用。
因為滑動窗口使用了更細粒度的格子,所以限流qps相對於固定窗口,會更平滑。
特點
- 追求嚴格的限流,不允許突發流量
- 理解起來容易,但是代碼實現較複雜,需要維護更新格子計數
關鍵流程:
以滑動窗口計數為例,下面的關鍵流程是閲讀代碼後提煉出來的。
-
tryAquire方法:
- 首先更新過期的格子:把過期格子的計數清零,同時更新格子的開始、結束時間
- 檢查當前窗口總計數是否超過限制,是則返回false,代表被限流;否則更新當前格子的計數、以及窗口總計數,返回成功
-
aquire方法:
- 首先更新過期的格子,然後檢查當前窗口總計數是否超過限制,是則計算需要等待的時間,否則更新當前格子的計數、以及窗口總計數,返回成功
-
其中,計算需要等待的時間:
- 至少等待的時間:當前窗口中最早的格子多久後過期,則需要至少等待多久
- 至多等待的時間:如果窗口總計數減掉最早格子計數,仍然小於請求許可,説明即使最早格子過期,仍然會限流,則至多等待一個格子的時間,待下一輪循環再判斷
- 使用condition.awaitNanos方法,來實現阻塞等待
- 上述步驟循環往復,直到滿足條件
- 清理過期的格子時,如果發現窗口總計數小於限流值,則通知喚醒等待的線程,通過condition.signalAll()實現。
以下是帶註釋的代碼示例。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;
/**
* 滑動窗口限流器 - 支持非阻塞和阻塞獲取
*/
public class SlidingWindowRateLimiter {
// 時間窗口大小(毫秒)
private final long windowSizeMs;
// 窗口內允許的最大請求數
private final int maxRequests;
// 格子數量
private final int slotCount;
// 每個格子的時間長度(毫秒)
private final long slotSizeMs;
// 滑動窗口數組,每個元素代表一個時間格子的請求計數
private final AtomicInteger[] slots;
// 每個格子對應的開始時間(毫秒)
private final long[] slotStartTimes;
// 當前窗口的總請求數
private final AtomicInteger totalRequests;
// 鎖和條件變量,用於阻塞等待
private final ReentrantLock lock;
private final Condition condition;
/**
* 構造函數
* @param maxRequests 時間窗口內允許的最大請求數
* @param windowSizeMs 時間窗口大小(毫秒)
* @param slotCount 窗口分割的格子數量
*/
public SlidingWindowRateLimiter(int maxRequests, long windowSizeMs, int slotCount) {
if (maxRequests <= 0 || windowSizeMs <= 0 || slotCount <= 0) {
throw new IllegalArgumentException("參數必須大於0");
}
this.maxRequests = maxRequests;
this.windowSizeMs = windowSizeMs;
this.slotCount = slotCount;
this.slotSizeMs = windowSizeMs / slotCount;
// 初始化滑動窗口
this.slots = new AtomicInteger[slotCount];
this.slotStartTimes = new long[slotCount];
this.totalRequests = new AtomicInteger(0);
long currentTime = System.currentTimeMillis();
for (int i = 0; i < slotCount; i++) {
slots[i] = new AtomicInteger(0);
slotStartTimes[i] = currentTime - (slotCount - i) * slotSizeMs;
}
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
}
/**
* 非阻塞方式嘗試獲取一個請求許可
* @return 成功獲取返回true,否則返回false
*/
public boolean tryAcquire() {
return tryAcquire(1);
}
/**
* 非阻塞方式嘗試獲取指定數量的請求許可
* @param permits 請求的許可數量
* @return 成功獲取返回true,否則返回false
*/
public boolean tryAcquire(int permits) {
if (permits <= 0) {
throw new IllegalArgumentException("請求的許可數必須大於0");
}
if (permits > maxRequests) {
return false; // 請求超過窗口容量,直接拒絕
}
lock.lock();
try {
// 滑動窗口,更新過期的格子
slideWindow();
// 檢查當前窗口是否超過限制
if (totalRequests.get() + permits <= maxRequests) {
// 獲取當前時間對應的格子索引
long currentTime = System.currentTimeMillis();
int currentSlotIndex = getCurrentSlotIndex(currentTime);
// 更新當前格子的計數和總計數
slots[currentSlotIndex].addAndGet(permits);
totalRequests.addAndGet(permits);
return true;
}
return false;
} finally {
lock.unlock();
}
}
/**
* 阻塞方式獲取一個請求許可(無限等待)
* @throws InterruptedException 如果等待過程中線程被中斷
*/
public void acquire() throws InterruptedException {
acquire(1);
}
/**
* 阻塞方式獲取指定數量的請求許可(無限等待)
* @param permits 請求的許可數量
* @throws InterruptedException 如果等待過程中線程被中斷
*/
public void acquire(int permits) throws InterruptedException {
if (permits <= 0) {
throw new IllegalArgumentException("請求的許可數必須大於0");
}
if (permits > maxRequests) {
throw new IllegalArgumentException("請求的許可數超過窗口容量");
}
lock.lock();
try {
while (!tryAcquireInternal(permits)) {
// 計算需要等待的時間(直到下一個格子過期)
long waitTimeMs = calculateWaitTime(permits);
if (waitTimeMs > 0) {
condition.await(waitTimeMs, TimeUnit.MILLISECONDS);
} else {
condition.await();
}
}
} finally {
lock.unlock();
}
}
/**
* 內部嘗試獲取方法(必須在鎖內調用)
*/
private boolean tryAcquireInternal(int permits) {
slideWindow();
if (totalRequests.get() + permits <= maxRequests) {
long currentTime = System.currentTimeMillis();
int currentSlotIndex = getCurrentSlotIndex(currentTime);
slots[currentSlotIndex].addAndGet(permits);
totalRequests.addAndGet(permits);
return true;
}
return false;
}
/**
* 滑動窗口,清理過期的格子
*/
private void slideWindow() {
long currentTime = System.currentTimeMillis();
long windowStartTime = currentTime - windowSizeMs;
for (int i = 0; i < slotCount; i++) {
// 如果格子開始時間在窗口開始時間之前,説明這個格子已過期
if (slotStartTimes[i] < windowStartTime) {
int expiredCount = slots[i].getAndSet(0);
if (expiredCount > 0) {
totalRequests.addAndGet(-expiredCount);
// 更新格子開始時間為當前週期開始時間
slotStartTimes[i] = currentTime - (slotCount - i - 1) * slotSizeMs;
}
}
}
// 如果有等待的線程,通知它們
if (totalRequests.get() < maxRequests) {
condition.signalAll();
}
}
/**
* 獲取當前時間對應的格子索引
*/
private int getCurrentSlotIndex(long currentTime) {
return (int) ((currentTime / slotSizeMs) % slotCount);
}
/**
* 計算需要等待的時間(毫秒)
*/
private long calculateWaitTime(int permits) {
long currentTime = System.currentTimeMillis();
long windowStartTime = currentTime - windowSizeMs;
// 找到最早的非空格子
long earliestSlotTime = Long.MAX_VALUE;
for (int i = 0; i < slotCount; i++) {
if (slots[i].get() > 0 && slotStartTimes[i] < earliestSlotTime) {
earliestSlotTime = slotStartTimes[i];
}
}
if (earliestSlotTime == Long.MAX_VALUE) {
return 0; // 沒有找到非空格子,不需要等待
}
// 計算最早格子過期的時間
long earliestExpireTime = earliestSlotTime + windowSizeMs;
long waitTime = earliestExpireTime - currentTime;
// 考慮請求數量,可能需要等待更多時間
int availableSlots = maxRequests - totalRequests.get();
if (availableSlots <= 0) {
waitTime = Math.max(waitTime, slotSizeMs);
}
return Math.max(0, waitTime);
}
/**
* 獲取當前窗口內的總請求數
* @return 當前請求數
*/
public int getCurrentRequests() {
lock.lock();
try {
slideWindow();
return totalRequests.get();
} finally {
lock.unlock();
}
}
}
分佈式限流
上面提到的令牌桶、窗口計數限流算法,都是單機實現,還可以通過redis lua腳本實現分佈式限流。lua腳本可保證原子性,即多條redis命令要麼都執行成功、要麼都不生效。
固定窗口計數(分佈式限流實現)
以固定窗口限流為例,因為可設置窗口key的過期時間,所以不用專門寫對格子計數清零的邏輯,簡單很多。
關鍵流程
- 首先,通過get命令,查詢窗口key是否存在
- 如果存在,則判斷是否超過閾值,未超過則通過incr命令自增、返回成功,否則返回限流
- 如果不存在,則通過set ex命令,設置初始值1、以及過期時間ttl
固定窗口限流的lua腳本示例如下:
-- 固定窗口限流腳本
-- KEYS[1]: 限流key
-- ARGV[1]: 時間窗口大小(秒)
-- ARGV[2]: 最大請求次數
local key = KEYS[1]
local window = tonumber(ARGV[1])
local maxCount = tonumber(ARGV[2])
-- 獲取當前計數
local current = redis.call('GET', key)
if current == false then
-- key不存在,設置初始值並設置過期時間
redis.call('SET', key, 1, 'EX', window)
return 1
else
-- key存在,檢查是否超過限制
local count = tonumber(current)
if count < maxCount then
-- 未超過限制,計數+1
redis.call('INCR', key)
return 1
else
-- 超過限制
return 0
end
end
總結
理解限流算法的原理,有助於做出合適的方案選型。
回顧上一篇文章介紹的限流方案:
- 令牌桶算法允許突發流量,實現較輕量,適用場景最通用:Guava RateLimiter、Sentinel、Redisson RateLimiter都採用令牌桶算法、或變種實現
- 窗口計數算法強調平滑限流,Sentinel也內置了滑動窗口計數算法
- 漏桶算法,只能按固定速率處理、不夠靈活,只在特定嚴格平滑限流的場景下適用