博客 / 詳情

返回

架構師必備:限流方案選型(原理篇)

大家好,我是Java烘焙師。上一篇文章介紹了限流方案的使用和選型,本文接着講限流算法的原理。
常見的限流算法有:令牌桶、窗口計數、漏桶,前兩種在實際使用中最常見,因此重點介紹。限流算法是通用的,既可以在單機上實現,也可以藉助redis來實現分佈式限流。

首先,定義一下限流算法需實現的基本功能:

  • tryAquire嘗試獲取通行許可:立刻返回結果,true代表允許、false代表被限流
  • aquire阻塞式獲取通行許可:如果能獲取到則返回,否則阻塞等待(指定超時時間,或一直等待)。當觸發限流時藉助java.util.concurrent.locks.Conditionawait方法來阻塞等待,當恢復時通過signalAll來喚醒阻塞的線程

令牌桶算法

概念

  • 令牌:相當於通行許可,需要多少qps或資源消耗速率,就需要獲取到多少令牌
  • 桶:決定了令牌個數上限,即處理能力上限
  • 令牌生成速率:每秒往桶裏補充多少令牌
  • 可用令牌大小:不超過桶大小

令牌桶算法,類似於生產者/消費者模式。桶相當於隊列,令牌相當於任務,往桶裏補充令牌相當於生產者,獲取令牌相當於消費者。但不用真的生成線程、隊列等實體,而是通過比較請求與可用令牌的大小、以及計算下一個令牌的發放時間,來實現的。

特點

  • 場景通用,允許突發流量
  • 理解起來不夠直觀,但代碼實現相對簡單

關鍵流程

不用定時補充令牌,而是類似於懶加載的方式,在調用時重新計算可用的令牌數。

  • tryAquire方法:

    • 先補充令牌:重新計算可用令牌數
    • 比較可用令牌數、與請求許可數:如果前者大則扣減請求許可數、更新可用令牌數,並返回成功,否則返回失敗
  • aquire方法:

    • 如果請求許可數 <= 可用令牌數:則扣減請求許可數、更新可用令牌數,並返回成功
    • 否則,計算需要等待的時間:根據所需的令牌差額,乘以令牌的生成間隔時間,得到需等待的時間,即(permits - tokens) * refillIntervalNanos。使用condition.awaitNanos方法,來實現阻塞等待。例如:桶容量300,每秒生成100個令牌,則令牌的生成間隔時間為 1秒 / 100 = 10毫秒,當前可用令牌為50,而請求許可為200,則需等待 (200 - 50) * 10毫秒 = 1500毫秒
    • 重新補充令牌
    • 上述步驟循環往復,直到獲取到令牌

最關鍵的補充令牌邏輯:

  • 計算需補充的令牌個數:根據自從上次補充以來的時間間隔,除以令牌的生成間隔時間,得到需補充的令牌個數,即 (now - lastRefillTime) / refillIntervalNanos
  • 更新可用令牌大小:不能超過桶容量大小,即在 capacity和tokens + tokensToAdd中取較小的那個
  • 通知喚醒等待的線程:通過condition.signalAll()實現

以下是帶註釋的代碼示例。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 令牌桶限流器 - 支持非阻塞和阻塞獲取令牌
 */
public class TokenBucketRateLimiter {
    // 桶容量大小
    private final long capacity;
    // 令牌生成速率(每秒生成的令牌數)
    private final double refillRate;
    // 每個令牌的生成間隔(納秒),等於 1秒對應的納秒 除以 令牌生成速率
    private final long refillIntervalNanos;
    
    // 可用令牌大小
    private double tokens;
    // 最後刷新時間(納秒)
    private long lastRefillTime;
    
    // 鎖和條件變量,用於阻塞等待
    private final ReentrantLock lock;
    private final Condition condition;
    
    /**
     * 構造方法
     */
    public TokenBucketRateLimiter(long capacity, double refillRate) {
        if (capacity <= 0 || refillRate <= 0) {
            throw new IllegalArgumentException("容量和生成速率必須大於0");
        }
        
        this.capacity = capacity;
        this.refillRate = refillRate;
        this.refillIntervalNanos = (long) (1_000_000_000.0 / refillRate);
        this.tokens = capacity;
        this.lastRefillTime = System.nanoTime();
        this.lock = new ReentrantLock();
        this.condition = lock.newCondition();
    }
    
    /**
     * 非阻塞方式嘗試獲取一個令牌
     * @return 成功獲取返回true,否則返回false
     */
    public boolean tryAcquire() {
        return tryAcquire(1);
    }
    
    /**
     * 非阻塞方式嘗試獲取指定數量的令牌
     * @param permits 請求的令牌數量
     * @return 成功獲取返回true,否則返回false
     */
    public boolean tryAcquire(long permits) {
        if (permits <= 0) {
            throw new IllegalArgumentException("請求的令牌數必須大於0");
        }
        if (permits > capacity) {
            return false; // 請求超過桶容量,直接拒絕
        }
        
        lock.lock();
        try {
            // 重新補充令牌
            refillTokens();
            
            if (tokens >= permits) {
                tokens -= permits;
                return true;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 阻塞方式獲取一個令牌
     * @throws InterruptedException 如果等待過程中線程被中斷
     */
    public void acquire() throws InterruptedException {
        acquire(1);
    }
    
    /**
     * 阻塞方式獲取指定數量的令牌(無限等待)
     * @param permits 請求的令牌數量
     * @throws InterruptedException 如果等待過程中線程被中斷
     */
    public void acquire(long permits) throws InterruptedException {
        if (permits <= 0) {
            throw new IllegalArgumentException("請求的令牌數必須大於0");
        }
        if (permits > capacity) {
            throw new IllegalArgumentException("請求的令牌數超過桶容量");
        }
        
        lock.lock();
        try {
            while (tokens < permits) {
                // 計算需要等待的時間
                double neededTokens = permits - tokens;
                long waitNanos = (long) (neededTokens * refillIntervalNanos);
                
                // 等待直到有足夠令牌或超時
                condition.awaitNanos(waitNanos);
                // 重新補充令牌
                refillTokens();
            }
            tokens -= permits;
        } finally {
            lock.unlock();
        }
    }
        
    /**
     * 補充令牌(必須在鎖內調用)
     */
    private void refillTokens() {
        long now = System.nanoTime();
        // 計算需要補充的令牌個數
        double tokensToAdd = (now - lastRefillTime) / (double) refillIntervalNanos;
        
        if (tokensToAdd > 0) {
            // 可用令牌大小,最大不能超過桶容量大小
            tokens = Math.min(capacity, tokens + tokensToAdd);
            lastRefillTime = now;
            
            // 如果有等待的線程,通知它們
            if (tokens > 0) {
                condition.signalAll();
            }
        }
    }

}

窗口計數算法

概念

  • 時間窗口:一段時間範圍,可以是秒,也可以是分鐘、小時、天等。窗口限流值,就代表該時間窗口最大允許的請求數
  • 格子:把時間窗口拆分到更細粒度,比如把1秒拆成10個100毫秒,每個格子有開始時間、結束時間
  • 格子計數,就是格子對應時間範圍的請求計數
  • 窗口總計數,就是所有格子計數的總和
  • 固定窗口計數:時間窗口固定,在臨界情況下可能出現qps尖刺。例如:每秒限流100,在第一個1秒格子的第900毫秒請求許可80次(通過),在第二個1秒格子的第200毫秒請求許可70次(通過),但是整體看900毫秒到1200毫秒,這300毫秒內已經請求了150次,超過了限流100的閾值、存在qps尖刺
  • 滑動窗口計數:滑動窗口有起始時間點、終止時間點,隨着時間流逝一起動態往後推移,滑動窗口限流就是統計該時間範圍內的請求許可是否超過閾值

一般用環形數組實現,數組的每個元素就是格子計數器,循環複用。
因為滑動窗口使用了更細粒度的格子,所以限流qps相對於固定窗口,會更平滑。

特點

  • 追求嚴格的限流,不允許突發流量
  • 理解起來容易,但是代碼實現較複雜,需要維護更新格子計數

關鍵流程:

以滑動窗口計數為例,下面的關鍵流程是閲讀代碼後提煉出來的。

  • tryAquire方法:

    • 首先更新過期的格子:把過期格子的計數清零,同時更新格子的開始、結束時間
    • 檢查當前窗口總計數是否超過限制,是則返回false,代表被限流;否則更新當前格子的計數、以及窗口總計數,返回成功
  • aquire方法:

    • 首先更新過期的格子,然後檢查當前窗口總計數是否超過限制,是則計算需要等待的時間,否則更新當前格子的計數、以及窗口總計數,返回成功
    • 其中,計算需要等待的時間:

      • 至少等待的時間:當前窗口中最早的格子多久後過期,則需要至少等待多久
      • 至多等待的時間:如果窗口總計數減掉最早格子計數,仍然小於請求許可,説明即使最早格子過期,仍然會限流,則至多等待一個格子的時間,待下一輪循環再判斷
      • 使用condition.awaitNanos方法,來實現阻塞等待
    • 上述步驟循環往復,直到滿足條件
  • 清理過期的格子時,如果發現窗口總計數小於限流值,則通知喚醒等待的線程,通過condition.signalAll()實現。

以下是帶註釋的代碼示例。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;

/**
 * 滑動窗口限流器 - 支持非阻塞和阻塞獲取
 */
public class SlidingWindowRateLimiter {
    // 時間窗口大小(毫秒)
    private final long windowSizeMs;
    // 窗口內允許的最大請求數
    private final int maxRequests;
    // 格子數量
    private final int slotCount;
    // 每個格子的時間長度(毫秒)
    private final long slotSizeMs;
    
    // 滑動窗口數組,每個元素代表一個時間格子的請求計數
    private final AtomicInteger[] slots;
    // 每個格子對應的開始時間(毫秒)
    private final long[] slotStartTimes;
    // 當前窗口的總請求數
    private final AtomicInteger totalRequests;
    
    // 鎖和條件變量,用於阻塞等待
    private final ReentrantLock lock;
    private final Condition condition;
    
    /**
     * 構造函數
     * @param maxRequests 時間窗口內允許的最大請求數
     * @param windowSizeMs 時間窗口大小(毫秒)
     * @param slotCount 窗口分割的格子數量
     */
    public SlidingWindowRateLimiter(int maxRequests, long windowSizeMs, int slotCount) {
        if (maxRequests <= 0 || windowSizeMs <= 0 || slotCount <= 0) {
            throw new IllegalArgumentException("參數必須大於0");
        }
        
        this.maxRequests = maxRequests;
        this.windowSizeMs = windowSizeMs;
        this.slotCount = slotCount;
        this.slotSizeMs = windowSizeMs / slotCount;
        
        // 初始化滑動窗口
        this.slots = new AtomicInteger[slotCount];
        this.slotStartTimes = new long[slotCount];
        this.totalRequests = new AtomicInteger(0);
        
        long currentTime = System.currentTimeMillis();
        for (int i = 0; i < slotCount; i++) {
            slots[i] = new AtomicInteger(0);
            slotStartTimes[i] = currentTime - (slotCount - i) * slotSizeMs;
        }
        
        this.lock = new ReentrantLock();
        this.condition = lock.newCondition();
    }
    
    /**
     * 非阻塞方式嘗試獲取一個請求許可
     * @return 成功獲取返回true,否則返回false
     */
    public boolean tryAcquire() {
        return tryAcquire(1);
    }
    
    /**
     * 非阻塞方式嘗試獲取指定數量的請求許可
     * @param permits 請求的許可數量
     * @return 成功獲取返回true,否則返回false
     */
    public boolean tryAcquire(int permits) {
        if (permits <= 0) {
            throw new IllegalArgumentException("請求的許可數必須大於0");
        }
        if (permits > maxRequests) {
            return false; // 請求超過窗口容量,直接拒絕
        }
        
        lock.lock();
        try {
            // 滑動窗口,更新過期的格子
            slideWindow();
            
            // 檢查當前窗口是否超過限制
            if (totalRequests.get() + permits <= maxRequests) {
                // 獲取當前時間對應的格子索引
                long currentTime = System.currentTimeMillis();
                int currentSlotIndex = getCurrentSlotIndex(currentTime);
                
                // 更新當前格子的計數和總計數
                slots[currentSlotIndex].addAndGet(permits);
                totalRequests.addAndGet(permits);
                return true;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 阻塞方式獲取一個請求許可(無限等待)
     * @throws InterruptedException 如果等待過程中線程被中斷
     */
    public void acquire() throws InterruptedException {
        acquire(1);
    }
    
    /**
     * 阻塞方式獲取指定數量的請求許可(無限等待)
     * @param permits 請求的許可數量
     * @throws InterruptedException 如果等待過程中線程被中斷
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits <= 0) {
            throw new IllegalArgumentException("請求的許可數必須大於0");
        }
        if (permits > maxRequests) {
            throw new IllegalArgumentException("請求的許可數超過窗口容量");
        }
        
        lock.lock();
        try {
            while (!tryAcquireInternal(permits)) {
                // 計算需要等待的時間(直到下一個格子過期)
                long waitTimeMs = calculateWaitTime(permits);
                if (waitTimeMs > 0) {
                    condition.await(waitTimeMs, TimeUnit.MILLISECONDS);
                } else {
                    condition.await();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 內部嘗試獲取方法(必須在鎖內調用)
     */
    private boolean tryAcquireInternal(int permits) {
        slideWindow();
        
        if (totalRequests.get() + permits <= maxRequests) {
            long currentTime = System.currentTimeMillis();
            int currentSlotIndex = getCurrentSlotIndex(currentTime);
            
            slots[currentSlotIndex].addAndGet(permits);
            totalRequests.addAndGet(permits);
            return true;
        }
        return false;
    }
    
    /**
     * 滑動窗口,清理過期的格子
     */
    private void slideWindow() {
        long currentTime = System.currentTimeMillis();
        long windowStartTime = currentTime - windowSizeMs;
        
        for (int i = 0; i < slotCount; i++) {
            // 如果格子開始時間在窗口開始時間之前,説明這個格子已過期
            if (slotStartTimes[i] < windowStartTime) {
                int expiredCount = slots[i].getAndSet(0);
                if (expiredCount > 0) {
                    totalRequests.addAndGet(-expiredCount);
                    // 更新格子開始時間為當前週期開始時間
                    slotStartTimes[i] = currentTime - (slotCount - i - 1) * slotSizeMs;
                }
            }
        }
        
        // 如果有等待的線程,通知它們
        if (totalRequests.get() < maxRequests) {
            condition.signalAll();
        }
    }
    
    /**
     * 獲取當前時間對應的格子索引
     */
    private int getCurrentSlotIndex(long currentTime) {
        return (int) ((currentTime / slotSizeMs) % slotCount);
    }
    
    /**
     * 計算需要等待的時間(毫秒)
     */
    private long calculateWaitTime(int permits) {
        long currentTime = System.currentTimeMillis();
        long windowStartTime = currentTime - windowSizeMs;
        
        // 找到最早的非空格子
        long earliestSlotTime = Long.MAX_VALUE;
        for (int i = 0; i < slotCount; i++) {
            if (slots[i].get() > 0 && slotStartTimes[i] < earliestSlotTime) {
                earliestSlotTime = slotStartTimes[i];
            }
        }
        
        if (earliestSlotTime == Long.MAX_VALUE) {
            return 0; // 沒有找到非空格子,不需要等待
        }
        
        // 計算最早格子過期的時間
        long earliestExpireTime = earliestSlotTime + windowSizeMs;
        long waitTime = earliestExpireTime - currentTime;
        
        // 考慮請求數量,可能需要等待更多時間
        int availableSlots = maxRequests - totalRequests.get();
        if (availableSlots <= 0) {
            waitTime = Math.max(waitTime, slotSizeMs);
        }
        
        return Math.max(0, waitTime);
    }
    
    /**
     * 獲取當前窗口內的總請求數
     * @return 當前請求數
     */
    public int getCurrentRequests() {
        lock.lock();
        try {
            slideWindow();
            return totalRequests.get();
        } finally {
            lock.unlock();
        }
    }
}

分佈式限流

上面提到的令牌桶、窗口計數限流算法,都是單機實現,還可以通過redis lua腳本實現分佈式限流。lua腳本可保證原子性,即多條redis命令要麼都執行成功、要麼都不生效。

固定窗口計數(分佈式限流實現)

以固定窗口限流為例,因為可設置窗口key的過期時間,所以不用專門寫對格子計數清零的邏輯,簡單很多。

關鍵流程

  • 首先,通過get命令,查詢窗口key是否存在
  • 如果存在,則判斷是否超過閾值,未超過則通過incr命令自增、返回成功,否則返回限流
  • 如果不存在,則通過set ex命令,設置初始值1、以及過期時間ttl

固定窗口限流的lua腳本示例如下:

-- 固定窗口限流腳本
-- KEYS[1]: 限流key
-- ARGV[1]: 時間窗口大小(秒)
-- ARGV[2]: 最大請求次數

local key = KEYS[1]
local window = tonumber(ARGV[1])
local maxCount = tonumber(ARGV[2])

-- 獲取當前計數
local current = redis.call('GET', key)
if current == false then
    -- key不存在,設置初始值並設置過期時間
    redis.call('SET', key, 1, 'EX', window)
    return 1
else
    -- key存在,檢查是否超過限制
    local count = tonumber(current)
    if count < maxCount then
        -- 未超過限制,計數+1
        redis.call('INCR', key)
        return 1
    else
        -- 超過限制
        return 0
    end
end

總結

理解限流算法的原理,有助於做出合適的方案選型。

回顧上一篇文章介紹的限流方案:

  • 令牌桶算法允許突發流量,實現較輕量,適用場景最通用:Guava RateLimiter、Sentinel、Redisson RateLimiter都採用令牌桶算法、或變種實現
  • 窗口計數算法強調平滑限流,Sentinel也內置了滑動窗口計數算法
  • 漏桶算法,只能按固定速率處理、不夠靈活,只在特定嚴格平滑限流的場景下適用
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.