前言
在高併發系統或接口服務中,頻繁的請求可能導致系統資源被快速耗盡,甚至造成服務不可用。為了保證系統的穩定性和響應能力,我們需要對接口調用進行限流,即控制單位時間內的最大請求次數。
本文將以 C++ 實現的限流器為例,講解如何通過固定窗口計數法來實現高精度、易維護的限流機制。
核心目標
- 控制調用頻率:每單位時間允許的最大接口調用次數;
- 保證穩定性:避免短時間突發流量;
- 便於監控:記錄調用情況方便調試。
算法設計
本文使用 固定窗口計數法(Fixed Window Counter)實現限流。
設計方案
|
時間區間
|
調用次數
|
內部控制值
|
説明
|
|
第1s
|
1
|
0 → 1
|
callCount=1,窗口開始計時
|
|
2
|
1 → 2
|
callCount=2
|
|
|
3
|
2 → 3
|
callCount=3,觸發限流,等待剩餘窗口時間,計數重置
|
|
|
第2s
|
4
|
3 → 1
|
新窗口開始,callCount=1
|
|
5
|
4 → 2
|
callCount=2
|
|
|
6
|
5 → 3
|
callCount=3,觸發限流,等待剩餘窗口時間,計數重置
|
|
|
第3s
|
7
|
6 → 1
|
新窗口開始,callCount=1
|
|
...
|
...
|
...
|
...
|
偽代碼:
Wait() {
now = 當前時間
if (m_windowStartNs== 0)
初始化窗口起始時間
if (m_callCount == m_maxCalls - 1) {
elapsed = now - m_windowStartNs
if (elapsed < m_windowNs)
{
sleep(窗口剩餘時間)
}
重置計數器和窗口起始時間
}
m_callCount++;
}
C++代碼示例
#include <iostream>
#include <vector>
#include <ctime>
#include <iomanip>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>
#define RATE_LIMITER_DEBUG 1
// ===================== 高精度時間戳工具 =====================
namespace HighTimestamp
{
// 獲取單調遞增納秒時間
inline uint64_t GetTimeNano()
{
struct timespec t;
clock_gettime(CLOCK_MONOTONIC, &t);
return static_cast<uint64_t>(t.tv_sec) * 1000000000ULL + t.tv_nsec;
}
}
// ==========================================================
// ===================== 異步日誌器 =====================
class AsyncLogger
{
public:
AsyncLogger()
: m_stop(false)
{
m_thread = std::thread([this]
{ Process(); });
}
~AsyncLogger()
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_stop = true;
}
m_cv.notify_all();
if (m_thread.joinable())
{
m_thread.join();
}
}
// 往隊列中添加日誌
void Log(const std::string &msg)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(msg);
}
m_cv.notify_one();
}
private:
void Process()
{
while (true)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [this]
{ return !m_queue.empty() || m_stop; });
while (!m_queue.empty())
{
std::string msg = m_queue.front();
m_queue.pop();
lock.unlock(); // 減小鎖粒度
std::cout << msg << std::endl;
lock.lock();
}
if (m_stop && m_queue.empty())
break;
}
}
std::queue<std::string> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
std::thread m_thread;
bool m_stop;
};
// ==========================================================
// 模擬 send 函數
enum SendResult
{
OK,
RETRY,
ERROR
};
SendResult Send(int) { return OK; }
// ===================== 限流器 =====================
class RateLimiter
{
public:
RateLimiter(uint32_t maxCalls, uint64_t windowNs, AsyncLogger *logger)
: m_maxCalls(maxCalls), m_windowNs(windowNs), m_logger(logger)
{
}
// 調用前等待,保證限流
void Wait()
{
uint64_t now = HighTimestamp::GetTimeNano();
if (m_windowStartNs == 0) // 首次則需要初始化窗口開始時間
{
m_windowStartNs = now;
}
if (m_callCount == m_maxCalls - 1)
{
uint64_t elapsed = now - m_windowStartNs;
if (elapsed < m_windowNs) // 説明窗口還未滿,需要sleep
{
#if 0 // 打印調用m_maxCalls次的實際耗時
m_logger->Log("[信息] 達到調用次數= " + std::to_string(m_callCount) + ",實際耗時=" + std::to_string(elapsed / 1e6) + " ms,等待剩餘時間...");
#endif
uint64_t sleepNs = m_windowNs - elapsed;
m_ts.tv_sec = sleepNs / 1000000000ULL;
m_ts.tv_nsec = sleepNs % 1000000000ULL;
nanosleep(&m_ts, nullptr);
}
else // 説明窗口已佔滿甚至多佔,不需要sleep
{
#if RATE_LIMITER_DEBUG
m_logger->Log("[提示] 窗口已滿,但速率低於限制,無需 sleep");
#endif
}
#if RATE_LIMITER_DEBUG
uint64_t realElapsed = HighTimestamp::GetTimeNano() - m_windowStartNs;
double rate = (double)m_callCount * 1e9 / realElapsed;
m_logger->Log("[統計] 調用次數=" + std::to_string(m_callCount) +
", 耗時=" + std::to_string(realElapsed / 1e6) +
" ms, 實際速率=" + std::to_string(rate) + " 次/秒");
#endif
m_callCount = 0; // 重置窗口計數
m_windowStartNs = HighTimestamp::GetTimeNano(); // 重置窗口開始時間
}
++m_callCount; // 增加計數
}
private:
uint32_t m_maxCalls = 0; // 每窗口最大調用次數
uint64_t m_windowNs = 0; // 窗口長度(納秒)
uint32_t m_callCount = 0; // 當前窗口已調用次數
uint64_t m_windowStartNs = 0; // 當前窗口開始時間
AsyncLogger *m_logger = nullptr; // 異步日誌器
struct timespec m_ts{};
};
// ==========================================================
int main()
{
static const int32_t vecSize = 1200000;
std::vector<int> vec(vecSize, 42); // 大數據量測試
const uint32_t MAX_CALLS = 10000; // 每窗口最大調用次數
const uint64_t WINDOW_NS = 1000000000ULL; // 窗口長度 1 秒
AsyncLogger logger;
RateLimiter limiter(MAX_CALLS, WINDOW_NS, &logger);
uint32_t totalCalls = 0;
uint64_t startTime = HighTimestamp::GetTimeNano();
for (uint32_t i = 0; i < vec.size(); ++i)
{
while (true)
{
SendResult err = Send(vec[i]);
if (err == OK)
{
limiter.Wait();
++totalCalls;
break;
}
else if (err == RETRY)
{
continue;
}
else
{
logger.Log("Send error! err =" + std::to_string(err));
break;
}
}
}
uint64_t now = HighTimestamp::GetTimeNano();
uint64_t elapsed = now - startTime;
logger.Log("測試結束,總調用次數=" + std::to_string(totalCalls) + "\n" +
"數據量=" + std::to_string(vec.size()) + "\n" +
"實際耗時=" + std::to_string(elapsed / 1e6) + " ms\n" +
"平均速率=" + std::to_string(totalCalls / (elapsed / 1e9)) + " 次/秒");
return 0;
}
輸出:
説明
為什麼平均速率達不到 10000 次/秒?
因為:
nanosleep睡過頭(幾微秒到幾十微秒)- 固定窗口限流算法的結構導致“尾部累積誤差”,誤差範圍約 -0.006%
- 調度器喚醒延遲不可避免
總結
固定窗口計數法是一種簡單、高效的限流方案,適用於大部分接口調用控制場景。它結合高精度時間和異步日誌器,兼顧了性能與可觀測性。