C++ 併發雙階段隊列設計原理與實現
一、設計思想
該隊列採用雙階段處理模型,通過鎖機制實現線程安全,核心思想包含:
- 階段隔離:入隊(Enqueue)與處理(TryBegin/TryEnd)分離
- 原子操作:通過SpinLock保證關鍵段操作的原子性
- 狀態跟蹤:使用ID序列追蹤任務處理狀態
- 異常隔離:虛函數設計允許自定義處理邏輯
二、數據結構
1. 核心成員變量
|
成員變量
|
類型
|
作用
|
|
|
|
存儲待處理元素
|
|
|
|
存儲處理中的任務狀態
|
|
|
|
記錄失敗的任務ID
|
|
|
|
同步控制對象
|
2. 狀態流轉圖
三、流程分析
1. 入隊流程 (Enqueue)
virtual void Enqueue(TElement&& element) noexcept {
SynchronizeObjectScope scope(syncobj_);
list_.emplace_back(std::move(element));
}
- 同步控制:通過SpinLock保護list_的push操作
- 無拷貝設計:使用
std::move實現元素移動語義 - 線程安全:保證多線程環境下的入隊操作安全
2. 任務獲取流程 (TryBegin)
virtual bool TryBegin() noexcept {
TElement element;
int id;
for (SynchronizeObjectScope scope(syncobj_);;)
{
auto tail = list_.begin();
auto endl = list_.end();
if (tail == endl)
{
return false;
}
id = ++ends_id_inc_;
element = std::move(*tail);
list_.erase(tail);
break;
}
ConcurrencyEndValue value;
value.element = std::move(element);
if (!OnBegin(value.element, value.result))
{
SynchronizeObjectScope scope(syncobj_);
skip_.emplace(id);
return false;
}
for (SynchronizeObjectScope scope(syncobj_);;)
{
auto r = ends__.emplace(std::make_pair(id, std::move(value)));
return r.second;
}
}
- 雙重檢查:通過自增ID確保任務唯一性
- 預處理機制:通過虛函數OnBegin實現自定義校驗
- 異常處理:失敗任務ID存入skip_集合
3. 任務處理流程 (TryEnd)
virtual bool TryEnd() noexcept {
ConcurrencyEndValue value;
for (SynchronizeObjectScope scope(syncobj_);;)
{
int next_id = ends_id_ + 1;
auto tail = ends__.find(next_id);
auto endl = ends__.end();
if (tail == endl)
{
auto skip_tail = skip_.find(next_id);
auto skil_endl = skip_.end();
if (skip_tail != skil_endl)
{
ends_id_ = next_id;
skip_.erase(skip_tail);
return true;
}
return false;
}
value = std::move(tail->second);
ends_id_ = next_id;
ends__.erase(tail);
break;
}
return OnEnd(value.element, value.result);
}
- 順序保證:通過ID自增保證處理順序
- 狀態清理:處理完成後立即刪除記錄
- 結果傳遞:通過引用參數返回處理結果
四、併發控制
1. 鎖粒度
2. 優化點
- 批量處理:DoEvents()循環處理多個任務
- 內存預分配:list_使用連續內存存儲
- 快速失敗:skip_集合快速跳過無效任務
五、應用場景
六、源代碼
#pragma once
// 包含預編譯頭文件(提高編譯效率)
#include <chtrader/stdafx.h>
// 包含自旋鎖實現(用於線程同步)
#include <chtrader/threading/SpinLock.h>
namespace chtrader
{
namespace collections
{
// 併發雙階段隊列模板類
// TElement: 隊列元素類型
// TResult: 處理結果類型
// SynchronizeObject: 同步機制類型(默認使用自旋鎖)
template <typename TElement, typename TResult, class SynchronizeObject = chtrader::threading::SpinLock>
class ConcurrentTwoStepQueue
{
// 用於存儲待處理元素及其處理結果的結構體
struct ConcurrencyEndValue
{
TElement element; // 原始元素
TResult result; // 處理結果
};
// 鎖作用域類型別名(簡化鎖管理)
using SynchronizeObjectScope = std::lock_guard<SynchronizeObject>;
public:
// 默認構造函數
ConcurrentTwoStepQueue() noexcept { }
// 析構函數(確保所有事件被處理)
virtual ~ConcurrentTwoStepQueue() noexcept
{
Clear();
}
// 入隊操作(線程安全)
void Enqueue(TElement&& element) noexcept
{
SynchronizeObjectScope scope(syncobj_); // 加鎖
size_++;
list_.emplace_back(std::move(element)); // 移動元素到隊列尾部
}
// 開始處理元素(嘗試獲取下一個元素)
bool TryBegin() noexcept
{
TElement element;
int sequence;
// 循環嘗試獲取元素
for (SynchronizeObjectScope scope(syncobj_);;)
{
auto tail = list_.begin(); // 獲取隊列頭部迭代器
auto endl = list_.end(); // 獲取隊列尾部迭代器
if (tail == endl) // 隊列為空時返回失敗
{
return false;
}
sequence = ++ends_inc_; // 生成唯一ID
element = std::move(*tail); // 移動元素值
list_.erase(tail); // 從隊列中移除元素
break; // 退出循環
}
// 存儲待處理元素及其結果
ConcurrencyEndValue value;
value.element = std::move(element);
// 執行前置處理邏輯(需子類實現)
bool initiate = OnBegin(value.element, value.result);
if (unlikely(!initiate))
{
SynchronizeObjectScope scope(syncobj_);
auto r = skip_.emplace(sequence);
assert(r.second);
return r.second;
}
// 存儲到處理中的隊列
for (SynchronizeObjectScope scope(syncobj_);;)
{
auto r = ends__.emplace(std::make_pair(sequence, std::move(value)));
assert(r.second);
return r.second;
}
}
// 結束處理元素(嘗試獲取下一個待處理結果)
bool TryEnd() noexcept
{
ConcurrencyEndValue value;
for (SynchronizeObjectScope scope(syncobj_);;)
{
auto ack = ends_ack_.load(std::memory_order_acquire);
auto ends_tail = ends__.find(ack); // 查找對應ID的元素
if (ends_tail == ends__.end()) // 未找到時檢查跳過集合
{
auto skip_tail = skip_.find(ack);
if (skip_tail != skip_.end()) // 存在跳過記錄則移除
{
skip_.erase(skip_tail);
size_.fetch_sub(1, std::memory_order_release);
ends_ack_.fetch_add(1, std::memory_order_release);
return true; // 表示跳過成功
}
return false; // 隊列空且無跳過項時返回失敗
}
value = std::move(ends_tail->second); // 獲取元素值
ends__.erase(ends_tail); // 從處理隊列中移除
size_.fetch_sub(1, std::memory_order_release);
ends_ack_.fetch_add(1, std::memory_order_release);
// 執行後置處理邏輯(需子類實現)
OnEnd(value.element, value.result);
return true;
}
}
// 處理所有可用事件(批量處理)
int DoEvents() noexcept
{
for (int events = 0;;)
{
TryBegin();
if (TryEnd())
{
events++;
}
else
{
return events; // 返回處理的事件總數
}
}
}
// 清空隊列
void Clear() noexcept
{
chtrader::unordered_map<int, ConcurrencyEndValue> ends;
chtrader::unordered_set<int> skip;
chtrader::list<TElement> list;
for (SynchronizeObjectScope scope(syncobj_);;)
{
list = std::move(list_);
skip = std::move(skip_);
ends = std::move(ends__);
list_.clear();
skip_.clear();
ends__.clear();
break;
}
}
int Count() noexcept { return size_.load(std::memory_order_acquire); }
protected:
// 前置處理回調(需子類實現具體邏輯)
virtual bool OnBegin(const TElement& element, TResult& output) noexcept = 0;
// 後置處理回調(需子類實現具體邏輯)
virtual void OnEnd(const TElement& element, const TResult& result) noexcept = 0;
private:
SynchronizeObject syncobj_; // 同步鎖對象
chtrader::list<TElement> list_; // 底層元素隊列
chtrader::unordered_set<int> skip_; // 跳過處理的ID集合
std::atomic<int> size_ = 0; // 數量
std::atomic<int> ends_ack_ = 1; // 當前處理的ID
std::atomic<int> ends_inc_ = 0; // ID遞增計數器
chtrader::unordered_map<int, ConcurrencyEndValue> ends__; // 處理中的元素映射
};
}
}
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。