前言

在高併發系統或接口服務中,頻繁的請求可能導致系統資源被快速耗盡,甚至造成服務不可用。為了保證系統的穩定性和響應能力,我們需要對接口調用進行限流,即控制單位時間內的最大請求次數。

本文將以 C++ 實現的限流器為例,講解如何通過固定窗口計數法來實現高精度、易維護的限流機制。


核心目標

  1. 控制調用頻率:每單位時間允許的最大接口調用次數;
  2. 保證穩定性:避免短時間突發流量;
  3. 便於監控:記錄調用情況方便調試。

算法設計

本文使用 固定窗口計數法(Fixed Window Counter)實現限流。

設計方案

時間區間

調用次數

內部控制值

説明

第1s

1

0 → 1

callCount=1,窗口開始計時

2

1 → 2

callCount=2

3

2 → 3

callCount=3,觸發限流,等待剩餘窗口時間,計數重置

第2s

4

3 → 1

新窗口開始,callCount=1

5

4 → 2

callCount=2

6

5 → 3

callCount=3,觸發限流,等待剩餘窗口時間,計數重置

第3s

7

6 → 1

新窗口開始,callCount=1

...

...

...

...

偽代碼:

Wait() {
    now = 當前時間
    if (m_windowStartNs== 0)
        初始化窗口起始時間

    if (m_callCount == m_maxCalls - 1) {
        elapsed = now - m_windowStartNs
        if (elapsed < m_windowNs)
        {
            sleep(窗口剩餘時間)
        }
        重置計數器和窗口起始時間
    } 
    m_callCount++;
}

C++代碼示例

#include <iostream>
#include <vector>
#include <ctime>
#include <iomanip>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>

#define RATE_LIMITER_DEBUG 1

// ===================== 高精度時間戳工具 =====================
namespace HighTimestamp
{
    // 獲取單調遞增納秒時間
    inline uint64_t GetTimeNano()
    {
        struct timespec t;
        clock_gettime(CLOCK_MONOTONIC, &t);
        return static_cast<uint64_t>(t.tv_sec) * 1000000000ULL + t.tv_nsec;
    }
}
// ==========================================================

// ===================== 異步日誌器 =====================
class AsyncLogger
{
public:
    AsyncLogger()
        : m_stop(false)
    {
        m_thread = std::thread([this]
                               { Process(); });
    }

    ~AsyncLogger()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cv.notify_all();
        if (m_thread.joinable())
        {
            m_thread.join();
        }
    }

    // 往隊列中添加日誌
    void Log(const std::string &msg)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(msg);
        }
        m_cv.notify_one();
    }

private:
    void Process()
    {
        while (true)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_cv.wait(lock, [this]
                      { return !m_queue.empty() || m_stop; });
            while (!m_queue.empty())
            {
                std::string msg = m_queue.front();
                m_queue.pop();
                lock.unlock(); // 減小鎖粒度
                std::cout << msg << std::endl;
                lock.lock();
            }
            if (m_stop && m_queue.empty())
                break;
        }
    }

    std::queue<std::string> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::thread m_thread;
    bool m_stop;
};
// ==========================================================

// 模擬 send 函數
enum SendResult
{
    OK,
    RETRY,
    ERROR
};
SendResult Send(int) { return OK; }

// ===================== 限流器 =====================
class RateLimiter
{
public:
    RateLimiter(uint32_t maxCalls, uint64_t windowNs, AsyncLogger *logger)
        : m_maxCalls(maxCalls), m_windowNs(windowNs), m_logger(logger)
    {
    }

    // 調用前等待,保證限流
    void Wait()
    {
        uint64_t now = HighTimestamp::GetTimeNano();

        if (m_windowStartNs == 0) // 首次則需要初始化窗口開始時間
        {
            m_windowStartNs = now;
        }

        if (m_callCount == m_maxCalls - 1)
        {
            uint64_t elapsed = now - m_windowStartNs;

            if (elapsed < m_windowNs) // 説明窗口還未滿,需要sleep
            {
#if 0 // 打印調用m_maxCalls次的實際耗時
                m_logger->Log("[信息] 達到調用次數= " + std::to_string(m_callCount) + ",實際耗時=" + std::to_string(elapsed / 1e6) + " ms,等待剩餘時間...");
#endif
                uint64_t sleepNs = m_windowNs - elapsed;
                m_ts.tv_sec = sleepNs / 1000000000ULL;
                m_ts.tv_nsec = sleepNs % 1000000000ULL;
                nanosleep(&m_ts, nullptr);
            }
            else // 説明窗口已佔滿甚至多佔,不需要sleep
            {
#if RATE_LIMITER_DEBUG
                m_logger->Log("[提示] 窗口已滿,但速率低於限制,無需 sleep");
#endif
            }

#if RATE_LIMITER_DEBUG
            uint64_t realElapsed = HighTimestamp::GetTimeNano() - m_windowStartNs;
            double rate = (double)m_callCount * 1e9 / realElapsed;
            m_logger->Log("[統計] 調用次數=" + std::to_string(m_callCount) +
                          ", 耗時=" + std::to_string(realElapsed / 1e6) +
                          " ms, 實際速率=" + std::to_string(rate) + " 次/秒");
#endif
            m_callCount = 0;                                // 重置窗口計數
            m_windowStartNs = HighTimestamp::GetTimeNano(); // 重置窗口開始時間
        }
        ++m_callCount; // 增加計數
    }

private:
    uint32_t m_maxCalls = 0;         // 每窗口最大調用次數
    uint64_t m_windowNs = 0;         // 窗口長度(納秒)
    uint32_t m_callCount = 0;        // 當前窗口已調用次數
    uint64_t m_windowStartNs = 0;    // 當前窗口開始時間
    AsyncLogger *m_logger = nullptr; // 異步日誌器
    struct timespec m_ts{};
};
// ==========================================================

int main()
{
    static const int32_t vecSize = 1200000;
    std::vector<int> vec(vecSize, 42); // 大數據量測試

    const uint32_t MAX_CALLS = 10000;         // 每窗口最大調用次數
    const uint64_t WINDOW_NS = 1000000000ULL; // 窗口長度 1 秒

    AsyncLogger logger;
    RateLimiter limiter(MAX_CALLS, WINDOW_NS, &logger);

    uint32_t totalCalls = 0;
    uint64_t startTime = HighTimestamp::GetTimeNano();

    for (uint32_t i = 0; i < vec.size(); ++i)
    {
        while (true)
        {
            SendResult err = Send(vec[i]);
            if (err == OK)
            {
                limiter.Wait();
                ++totalCalls;
                break;
            }
            else if (err == RETRY)
            {
                continue;
            }
            else
            {
                logger.Log("Send error! err =" + std::to_string(err));
                break;
            }
        }
    }

    uint64_t now = HighTimestamp::GetTimeNano();
    uint64_t elapsed = now - startTime;
    logger.Log("測試結束,總調用次數=" + std::to_string(totalCalls) + "\n" +
               "數據量=" + std::to_string(vec.size()) + "\n" +
               "實際耗時=" + std::to_string(elapsed / 1e6) + " ms\n" +
               "平均速率=" + std::to_string(totalCalls / (elapsed / 1e9)) + " 次/秒");

    return 0;
}

輸出:

【C++11算法設計】限流器設計之固定窗口計數法_#限流器

説明

為什麼平均速率達不到 10000 次/秒?

因為:

  • nanosleep 睡過頭(幾微秒到幾十微秒)
  • 固定窗口限流算法的結構導致“尾部累積誤差”誤差範圍約  -0.006%
  • 調度器喚醒延遲不可避免

總結

固定窗口計數法是一種簡單、高效的限流方案,適用於大部分接口調用控制場景。它結合高精度時間和異步日誌器,兼顧了性能與可觀測性。