Stories

Detail Return Return

C++ 從 0 到 1 完成一個支持 future/promise 的 Windows 異步串口通信庫 - Stories Detail

由於我在工作環境中不斷由於需要為了不同硬件設備寫新的串口通信庫,所以我寫了一個集成了 future/promise 的異步串口通信庫,並且已經用在了每天有數十萬人使用的生產環境設備上,下面分享一下如何從零開始構建一個集成 C++11 的 future/promise 機制的實用的異步串口通信庫。

1. 異步串口通信庫的設計思路

設計思路:

  1. 首先創建一個串口通信工具庫,支持同步方式的串口消息收發;
  2. 然後我們創建一個生產者消費者隊列,需要請求數據的時候往這個隊列裏存 promise 並立即給使用者返回一個 future,串口庫獲得回覆或者超時的時候給這個 promise 回覆信息,這樣使用者就可以通過 future 獲得返回的結果;
  3. 生產者消費者隊列需要滿足:

    1. 在生產消息的時候通過條件變量通知消費者線程取隊列中的消息來消費,這個消息在成功取到結果或超時的時候出隊;
    2. 如果生產消息時在隊列滿則進行等待;
  4. 在使用者實例化這個串口通信庫的時候,將串口解析協議(ValidateFrame 一個解析函數,返回 int 表示解析到的針頭地址)傳入,以可以適配不同硬件設備的不同串口通信協議;

用户的使用流程:

  1. 用户發起異步請求,立即獲得 future 對象;
  2. 其請求被封裝成任務並投遞到隊列中;
  3. 之後消費者線程從隊列中取出任務並執行串口通信;
  4. 通信完成或超時後,通過 promise 設置結果;
  5. 用户通過 future 獲取最終結果;

2. Windows 串口通信 API 封裝

底層串口通信基於 Windows 提供的原生 API 實現,主要包括:

  • CreateFile:打開串口設備;
  • SetCommState:配置串口參數(波特率、數據位等);
  • WriteFile:發送數據到串口;
  • ReadFile:從串口讀取數據;
  • PurgeComm:清空串口緩衝區;

為了適配不同硬件設備的通信協議,庫提供了可插拔的協議解析接口 ValidateFrame,在消費消息的時候不斷檢索緩衝區,在超時之前緩衝區中是否有滿足用户需要的報文體,如果有就成功返回,否則繼續檢索。

// 用户自定義的協議解析函數
// 返回值:0-繼續等待,-1-無效幀,>0-有效幀長度
using ValidFrameCallback = std::function<int(const unsigned char *, int)>;

完整實現涉及較多細節,具體可以到 Github 倉庫裏查閲代碼。

3. 生產者/消費者消息隊列實現

首先,創建一個生產者消費者的有界消息隊列 BoundedQueue 用來管理待處理消息,這個隊列是一個模板類,傳入的模板 MSG_BASE 是消息體,如果消息被成功返回,使用者可以從這個消息體中獲取返回的消息碼和報文體,隊列在實例化的時候需這個隊列的容量,內部則通過條件變量來協調消費線程對消息的消費:

template <typename MSG_BASE>
class BoundedQueue {
 public:
  explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}

  bool push(std::unique_ptr<MSG_BASE> message) {
    std::unique_lock lock(mutex_);
    not_full_.wait(lock, [this]() { return queue_.size() < capacity_; });

    queue_.push(std::move(message));
    lock.unlock();
    not_empty_.notify_one(); // 通知消費者有新消息
    return true;
  }

  std::unique_ptr<MSG_BASE> pop() {
    std::unique_lock lock(mutex_);
    not_empty_.wait(lock, [this]() { return !queue_.empty(); });

    auto message = std::move(queue_.front());
    queue_.pop();
    lock.unlock();
    not_full_.notify_one();
    return message;
  }

  bool empty() const {
    std::lock_guard lock(mutex_);
    return queue_.empty();
  }

  size_t size() const {
    std::lock_guard lock(mutex_);
    return queue_.size();
  }

 private:
  const size_t capacity_;
  std::queue<std::unique_ptr<MSG_BASE>> queue_;
  mutable std::mutex mutex_;

  std::condition_variable not_empty_;
  std::condition_variable not_full_;
};

注意對隊列的操作需要加鎖,以保證線程安全。

3. 消費者管理類和消息相關類設計

消費者管理類並持有之前的生產者消費者隊列實例,做的事比較簡單,就是負責創建和管理消費線程,消費線程會不斷從隊列中取消息出來消費:

template <typename MSG_BASE>
class ConsumerManager {
 public:
  ConsumerManager(BoundedQueue<MSG_BASE>& queue, size_t thread_count = 1)
      : queue_(queue), running_(true) {
    for (size_t i = 0; i < thread_count; ++i) {
      threads_.emplace_back(&ConsumerManager::worker, this);
    }
  }

  ~ConsumerManager() { stop(); }

  void stop() {
    running_ = false;
    for (auto& thread : threads_) {
      if (thread.joinable()) {
        thread.join();
      }
    }
  }

 private:
  void worker() const {
    while (running_) {
      if (const auto taskItem = queue_.pop()) {
        taskItem->execute();
      }
    }
  }

  BoundedQueue<MSG_BASE>& queue_;
  std::atomic<bool> running_;
  std::vector<std::thread> threads_;
};

消息相關的模板類 Message 作為通信的消息容器,繼承自 MessageBase,通過模板支持多種返回類型,並提供 execute 方法作為消費線程的入口,使用者可以傳遞一個返回值類型給 Message 類,消費者管理類會在消費消息之後通過 Message 傳遞返回值給使用者:

template <typename T>
struct is_future : std::false_type {};

template <typename T>
struct is_future<std::future<T>> : std::true_type {};

template <typename T>
inline constexpr bool is_future_v = is_future<T>::value;

class MessageBase {
 public:
  virtual ~MessageBase() = default;
  virtual std::unique_ptr<void> execute() = 0;

  template <typename RetType>
  std::future<RetType> getFuture() {
    promise_ = std::make_shared<std::promise<std::unique_ptr<void>>>();
    return std::async(std::launch::deferred, [promise = promise_]() -> RetType {
      auto result = promise->get_future().get();
      if constexpr (is_future_v<RetType>) {
        auto* future_ptr = static_cast<RetType*>(result.get());
        return std::move(*future_ptr);
      } else {
        return *static_cast<RetType*>(result.get());
      }
    });
  }

 protected:
  std::shared_ptr<std::promise<std::unique_ptr<void>>> promise_;
};

template <typename RetType>
class Message : public MessageBase {
 public:
  using ExecutorType = std::function<RetType()>;

  explicit Message(ExecutorType executor) : executor_(std::move(executor)) {}

  std::unique_ptr<void> execute() override {
    auto result = executor_();
    return std::make_unique<RetType>(std::move(result));
  }

 private:
  ExecutorType executor_;
};

5. 完整使用示例

#include <iostream>
#include <random>
#include <thread>
#include <chrono>

// 生成測試數據
std::vector<unsigned char> generateRandomData() {
    static std::random_device rd;
    static std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(0, 0xFF);

    return {0xAA, static_cast<unsigned char>(dis(gen)),
            static_cast<unsigned char>(dis(gen)),
            static_cast<unsigned char>(dis(gen)), 0xBB};
}

// 示例協議解析函數
// 協議格式:0xAA [數據] 0xBB
int validateFrame(const unsigned char* buffer, int length) {
    if (length < 4) return 0;  // 數據不足,繼續等待

    if (buffer[0] == 0xAA) {
        // 查找結束標誌
        for (int i = 3; i < length; i++) {
            if (buffer[i] == 0xBB) {
                return i + 1;  // 返回完整幀長度
            }
        }
    }

    return -1;  // 無效幀
}

// 串口任務封裝類
class SerialTask {
public:
    explicit SerialTask(std::shared_ptr<SerialComm> serial) : serial_(serial) {}

    std::future<SerialRet> sendAsync(const std::vector<unsigned char>& data,
                                   int timeout = 10000) const {
        std::cout << "發送數據: " << bytes2hexStr(data) << std::endl;
        return serial_->SendCommandAsync(data, validateFrame, timeout);
    }

private:
    std::shared_ptr<SerialComm> serial_;
};

int main() {
    // 初始化串口通信
    auto serial = std::make_shared<SerialComm>();
    constexpr int comPort = 1;
    constexpr int baudRate = 115200;

    if (int res = serial->OpenCom(comPort, baudRate); res != 0) {
        std::cout << "打開串口失敗,錯誤代碼: " << res << std::endl;
        return 1;
    }
    std::cout << "串口打開成功!" << std::endl;

    // 創建消息隊列和消費者管理器
    BoundedQueue<Message<SerialRet>> queue(10);
    ConsumerManager consumer(queue);

    // 創建串口任務封裝器
    SerialTask serialTask(serial);

    // 主循環:處理用户輸入
    std::string line;
    std::cout << "請按回車發送隨機數據,輸入 'quit' 退出..." << std::endl;
    
    while (std::getline(std::cin, line)) {
        if (line == "quit") break;

        // 創建異步任務
        auto task = std::make_unique<Message<SerialRet>>([&serialTask]() -> SerialRet {
            auto fut = serialTask.sendAsync(generateRandomData());
            auto [ret, response] = fut.get();
            
            if (ret > 0) {
                std::cout << "任務完成,收到響應: " << bytes2hexStr(response) << std::endl;
            } else {
                std::cout << "任務失敗,錯誤代碼: " << ret << std::endl;
            }
            
            return std::make_tuple(ret, response);
        });

        // 投遞到隊列
        queue.push(std::move(task));

        // 短暫等待以觀察結果
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    // 清理資源
    consumer.stop();
    serial->CloseCom();
    std::cout << "程序退出" << std::endl;

    return 0;
}

6. 總結

代碼已開源,項目地址:https://github.com/SHERlocked93/cpp_easy_serial

這個庫已在生產環境中驗證,支持高併發場景下的穩定運行。如果你在項目中需要進行串口通信,不妨試試這個庫,也可以在這個基礎上進行修改迭代,歡迎給提 pr 提 issue 一起討論啊!


網上的帖子大多深淺不一,甚至有些前後矛盾,在下的文章都是學習過程中的總結,如果發現錯誤,歡迎留言指出,如果本文幫助到了你,別忘了點贊支持一下,你的點贊是我更新的最大動力!~

PS:本文同步更新於在下的博客 Github - SHERlocked93/blog 系列文章中,歡迎大家關注我的公眾號 CPP下午茶,直接搜索即可添加,持續為大家推送 CPP 以及 CPP 周邊相關優質技術文,共同進步,一起加油~

user avatar ZhongQianwen Avatar Yzi321 Avatar samhou Avatar starrocks Avatar muzijun_68c14af5563a2 Avatar puxiaoke6 Avatar hedzr Avatar greyham Avatar iceblue Avatar OasisPioneer Avatar pcworld Avatar wei-boke Avatar
Favorites 25 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.