C++ 併發雙階段隊列設計原理與實現

C++雙端隊列_c++ 雙端隊列_#開發語言

一、設計思想

該隊列採用雙階段處理模型,通過鎖機制實現線程安全,核心思想包含:

  1. 階段隔離:入隊(Enqueue)與處理(TryBegin/TryEnd)分離
  2. 原子操作:通過SpinLock保證關鍵段操作的原子性
  3. 狀態跟蹤:使用ID序列追蹤任務處理狀態
  4. 異常隔離:虛函數設計允許自定義處理邏輯



C++雙端隊列_c++ 雙端隊列_#linux_02


二、數據結構

1. 核心成員變量

成員變量

類型

作用

list_

chtrader::list<TElement>

存儲待處理元素

ends__

unordered_map<int, ConcurrencyEndValue>

存儲處理中的任務狀態

skip_

unordered_set<int>

記錄失敗的任務ID

syncobj_

SpinLock

同步控制對象

2. 狀態流轉圖

C++雙端隊列_c++ 雙端隊列_#算法_03


三、流程分析

1. 入隊流程 (Enqueue)

virtual void Enqueue(TElement&& element) noexcept {
    SynchronizeObjectScope scope(syncobj_);
    list_.emplace_back(std::move(element));
}
  • 同步控制:通過SpinLock保護list_的push操作
  • 無拷貝設計:使用std::move實現元素移動語義
  • 線程安全:保證多線程環境下的入隊操作安全

2. 任務獲取流程 (TryBegin)

virtual bool TryBegin() noexcept {
    TElement element;
    int id;
    
    for (SynchronizeObjectScope scope(syncobj_);;) 
    {
        auto tail = list_.begin();
        auto endl = list_.end();
        if (tail == endl)
        {
            return false;
        }
        
        id = ++ends_id_inc_;
        element = std::move(*tail);
        
        list_.erase(tail);
        break;
    }
    
    ConcurrencyEndValue value;
    value.element = std::move(element);
    
    if (!OnBegin(value.element, value.result))
    {
        SynchronizeObjectScope scope(syncobj_);
        skip_.emplace(id);
        return false;
    }
    
    for (SynchronizeObjectScope scope(syncobj_);;)
    {
        auto r = ends__.emplace(std::make_pair(id, std::move(value)));
        return r.second;
    }
}
  • 雙重檢查:通過自增ID確保任務唯一性
  • 預處理機制:通過虛函數OnBegin實現自定義校驗
  • 異常處理:失敗任務ID存入skip_集合

3. 任務處理流程 (TryEnd)

virtual bool TryEnd() noexcept {
    ConcurrencyEndValue value;
    for (SynchronizeObjectScope scope(syncobj_);;)
    {
        int next_id = ends_id_ + 1;
        auto tail = ends__.find(next_id);
        auto endl = ends__.end();
        if (tail == endl)
        {
            auto skip_tail = skip_.find(next_id);
            auto skil_endl = skip_.end();
            if (skip_tail != skil_endl)
            {
                ends_id_ = next_id;
                skip_.erase(skip_tail);
                return true;
            }

            return false;
        }

        value = std::move(tail->second);
        ends_id_ = next_id;
        ends__.erase(tail);
        break;
    }
    
    return OnEnd(value.element, value.result);
}
  • 順序保證:通過ID自增保證處理順序
  • 狀態清理:處理完成後立即刪除記錄
  • 結果傳遞:通過引用參數返回處理結果

四、併發控制

1. 鎖粒度

C++雙端隊列_c++ 雙端隊列_#linux_04

2. 優化點

  1. 批量處理:DoEvents()循環處理多個任務
  2. 內存預分配:list_使用連續內存存儲
  3. 快速失敗:skip_集合快速跳過無效任務

五、應用場景

C++雙端隊列_c++ 雙端隊列_#開發語言_05


六、源代碼

#pragma once 

// 包含預編譯頭文件(提高編譯效率)
#include <chtrader/stdafx.h>

// 包含自旋鎖實現(用於線程同步)
#include <chtrader/threading/SpinLock.h>

namespace chtrader 
{
   namespace collections 
   {
        // 併發雙階段隊列模板類
        // TElement: 隊列元素類型
        // TResult: 處理結果類型
        // SynchronizeObject: 同步機制類型(默認使用自旋鎖)
        template <typename TElement, typename TResult, class SynchronizeObject = chtrader::threading::SpinLock>
        class ConcurrentTwoStepQueue
        {        
            // 用於存儲待處理元素及其處理結果的結構體
            struct ConcurrencyEndValue
            {
                TElement                                             element;  // 原始元素
                TResult                                              result;    // 處理結果
            };

            // 鎖作用域類型別名(簡化鎖管理)
            using SynchronizeObjectScope = std::lock_guard<SynchronizeObject>;

        public:
            // 默認構造函數
            ConcurrentTwoStepQueue() noexcept { }

            // 析構函數(確保所有事件被處理)
            virtual                                                     ~ConcurrentTwoStepQueue() noexcept
            {
                Clear();
            }

            // 入隊操作(線程安全)
            void                                                        Enqueue(TElement&& element) noexcept
            {
                SynchronizeObjectScope scope(syncobj_);  // 加鎖
                size_++;
                list_.emplace_back(std::move(element));  // 移動元素到隊列尾部
            }

            // 開始處理元素(嘗試獲取下一個元素)   
            bool                                                        TryBegin() noexcept
            {
                TElement element;
                int sequence;

                // 循環嘗試獲取元素
                for (SynchronizeObjectScope scope(syncobj_);;)
                {
                    auto tail = list_.begin();  // 獲取隊列頭部迭代器
                    auto endl = list_.end();    // 獲取隊列尾部迭代器

                    if (tail == endl)  // 隊列為空時返回失敗
                    {
                        return false;
                    }
                   
                    sequence = ++ends_inc_;  // 生成唯一ID
                    element = std::move(*tail);  // 移動元素值

                    list_.erase(tail);  // 從隊列中移除元素
                    break;  // 退出循環
                }

                // 存儲待處理元素及其結果
                ConcurrencyEndValue value;
                value.element = std::move(element);

                // 執行前置處理邏輯(需子類實現)
                bool initiate = OnBegin(value.element, value.result);
                if (unlikely(!initiate))
                {
                    SynchronizeObjectScope scope(syncobj_);
                    auto r = skip_.emplace(sequence);
                    assert(r.second);
                    return r.second;
                }

                // 存儲到處理中的隊列
                for (SynchronizeObjectScope scope(syncobj_);;)
                {
                    auto r = ends__.emplace(std::make_pair(sequence, std::move(value)));
                    assert(r.second);
                    return r.second;
                }
            }

            // 結束處理元素(嘗試獲取下一個待處理結果) 
            bool                                                        TryEnd() noexcept
            {
                ConcurrencyEndValue value;
                for (SynchronizeObjectScope scope(syncobj_);;)
                {
                    auto ack = ends_ack_.load(std::memory_order_acquire);
                    auto ends_tail = ends__.find(ack);  // 查找對應ID的元素
                    if (ends_tail == ends__.end())  // 未找到時檢查跳過集合
                    {
                        auto skip_tail = skip_.find(ack);
                        if (skip_tail != skip_.end())  // 存在跳過記錄則移除
                        {
                            skip_.erase(skip_tail);
                            
                            size_.fetch_sub(1, std::memory_order_release);
                            ends_ack_.fetch_add(1, std::memory_order_release);

                            return true;  // 表示跳過成功
                        }

                        return false;  // 隊列空且無跳過項時返回失敗
                    }

                    value = std::move(ends_tail->second);  // 獲取元素值
                    ends__.erase(ends_tail);  // 從處理隊列中移除
                    
                    size_.fetch_sub(1, std::memory_order_release);
                    ends_ack_.fetch_add(1, std::memory_order_release);
                    
                    // 執行後置處理邏輯(需子類實現)
                    OnEnd(value.element, value.result);
                    return true;
                }
            }

            // 處理所有可用事件(批量處理)
            int                                                         DoEvents() noexcept 
            {
                for (int events = 0;;) 
                {
                    TryBegin();
                    if (TryEnd())
                    {
                        events++;
                    }
                    else 
                    {
                        return events;  // 返回處理的事件總數
                    }
                }
            }

            // 清空隊列
            void                                                        Clear() noexcept 
            {
                chtrader::unordered_map<int, ConcurrencyEndValue> ends;
                chtrader::unordered_set<int> skip;
                chtrader::list<TElement> list;

                for (SynchronizeObjectScope scope(syncobj_);;)
                {
                    list = std::move(list_);
                    skip = std::move(skip_);
                    ends = std::move(ends__);

                    list_.clear();
                    skip_.clear();
                    ends__.clear();
                    break;
                }
            }

            int                                                         Count() noexcept { return size_.load(std::memory_order_acquire); }

        protected:   
            // 前置處理回調(需子類實現具體邏輯)    
            virtual bool                                                OnBegin(const TElement& element, TResult& output) noexcept = 0;

            // 後置處理回調(需子類實現具體邏輯)                    
            virtual void                                                OnEnd(const TElement& element, const TResult& result) noexcept = 0;
           
        private: 
            SynchronizeObject                                           syncobj_;               // 同步鎖對象
            chtrader::list<TElement>                                    list_;                  // 底層元素隊列
            chtrader::unordered_set<int>                                skip_;                  // 跳過處理的ID集合

            std::atomic<int>                                            size_        = 0;       // 數量
            std::atomic<int>                                            ends_ack_    = 1;       // 當前處理的ID
            std::atomic<int>                                            ends_inc_    = 0;       // ID遞增計數器
            chtrader::unordered_map<int, ConcurrencyEndValue>           ends__;                 // 處理中的元素映射
       };
   }
}