動態

詳情 返回 返回

基於 IOCP 的協程調度器——零基礎深入淺出 C++20 協程 - 動態 詳情

前言

上一篇《基於 epoll 的協程調度器》談到如何基於 epoll 構建一個事件驅動的協程調度器,沒有使用三方庫的原因主要是為了避免引入額外複雜度,不過只演示 Linux 未免對非 Unix 平台的小夥伴有所不公,為此本文基於 Windows 的完成端口 (IO Completion Port:IOCP) 構建相同能力的 demo。

文章仍然遵守之前的創作原則:

* 選取合適的 demo 是頭等大事

* 以協程為目標,涉及到的新語法會簡單説明,不涉及的不旁徵博引

* 若語法的原理非常簡單,也會簡單展開講講,有利於透過現象看本質,用起來更得心應手

上一篇文章裏不光引入了基於事件的調度器,還説明了如何開啓多文件並行、await_suspend 與試讀的關係、singalfd 用於完美退出等話題,如果沒有這些內容鋪墊,看本文時會有很多地方難以理解,還沒看過的小夥伴,牆裂建議先看那篇。

工具還是之前介紹過的 Compile Explorer,這裏不再用到 C++ Insights ,主要是它不支持 Windows 平台,其實 Compiler Explorer 也只是編譯,運行的話還是不太行,因為它的環境不支持像文件、網絡之類的異步 IO,需要用户自行搭建開發環境。

基於完成端口的 IO 多路複用

上文中提到了 Unix 系統中多路複用接口的發展歷程:分別經歷了 select -> poll -> epoll/kqueue,Windows 則通過完成端口一統江山,其實它倆調用方式差不太多:

  epoll IOCP
初始化 epoll_create
CreateIoCompletionPort
關聯句柄 epoll_ctl
CreateIoCompletionPort
等待並獲取下一個事件 epoll_wait
GetQueuedCompletionStatus
投遞事件 n/a (self pipe trick) PostQueuedCompletionStatus
銷燬 close CloseHandle

而在可等待對象上,IOCP 則豐富的多:

* 文件 I/O 事件​​
* 文件系統變更
* 套接字(Socket)事件​​
* 命名管道(Named Pipe)事件​​
* 設備 I/O 事件​​
* 定時器事件(結合 Waitable Timer)​​

這方面能與它相提並論的恐怕只有 kqueue 了。有了上面的鋪墊再參考之前 epoll 的實現,直接上 demo 源碼:

#include <coroutine>
#include <unordered_map>
#include <windows.h>
#include <vector>
#include <stdexcept>
#include <iostream>
#include <sstream>
#include <memory>

struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

class IocpScheduler {
private:
    HANDLE iocp_handle;
    std::unordered_map<HANDLE, std::coroutine_handle<>> io_handles;

public:
    IocpScheduler() {
        iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        if (iocp_handle == NULL) {
            throw std::runtime_error("CreateIoCompletionPort failed");
        }
    }

    ~IocpScheduler() {
        CloseHandle(iocp_handle);
    }

    void register_io(HANDLE file_handle, std::coroutine_handle<> handle) {
        if (io_handles.find(file_handle) == io_handles.end()) {
            io_handles[file_handle] = handle;

            if (CreateIoCompletionPort(file_handle, iocp_handle, (ULONG_PTR)file_handle, 0) == NULL) {
                throw std::runtime_error("CreateIoCompletionPort failed to associate file handle");
            }
        }
    }

    void run() {
        while (true) {
            DWORD bytes_transferred = 0;
            ULONG_PTR completion_key = 0;
            LPOVERLAPPED overlapped = nullptr;

            BOOL success = GetQueuedCompletionStatus(
                iocp_handle,
                &bytes_transferred,
                &completion_key,
                &overlapped,
                INFINITE);

            if (completion_key != 0) {
                HANDLE ready_handle = (HANDLE)completion_key;
                if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {
                    it->second.resume();
                }
            }
        }
    }
};

struct AsyncReadAwaiter {
    IocpScheduler& sched;
    HANDLE file_handle;
    std::unique_ptr<char[]> buffer;
    DWORD buffer_size;
    OVERLAPPED overlapped;
    DWORD bytes_read;

    AsyncReadAwaiter(IocpScheduler& s, HANDLE file, DWORD size)
        : sched(s), file_handle(file), buffer_size(size), bytes_read(0) {
        buffer = std::make_unique<char[]>(size);
        ZeroMemory(&overlapped, sizeof(OVERLAPPED));
    }

    bool await_ready() const {
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) {
        sched.register_io(file_handle, h);
        
        if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
            DWORD error = GetLastError();
            if (error != ERROR_IO_PENDING) {
                std::stringstream ss;
                ss << "ReadFile failed, error " << error;
                throw std::runtime_error(ss.str());
            }
        }
    }

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();
            std::stringstream ss;
            ss << "GetOverlappedResult failed, error " << error;
            throw std::runtime_error(ss.str());
        }

        return std::string(buffer.get(), bytes_transferred);
    }
};

Task async_read_file(IocpScheduler& sched, const char* path) {
    HANDLE file_handle = CreateFileA(
        path,
        GENERIC_READ,
        FILE_SHARE_READ,
        NULL,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);

    if (file_handle == INVALID_HANDLE_VALUE) {
        std::stringstream ss;
        ss << "CreateFile failed, error " << GetLastError();
        throw std::runtime_error(ss.str());
    }

    while (true) {
        auto data = co_await AsyncReadAwaiter(sched, file_handle, 4096);
        std::cout << "Read " << data.size() << " bytes\n";
        if (data.size() == 0) {
            break;
        }
    }

    CloseHandle(file_handle);
}

int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cout << "Usage: sample file_path" << std::endl;
        return 1;
    }

    IocpScheduler scheduler;
    async_read_file(scheduler, argv[1]);
    scheduler.run();
    return 0;
}

先看編譯:

image

Compile Explorer 中指定最新的 msvc 編譯器和 C++20 選項可以編譯通過,注意在 Windows 中選項指定的語法與 Unix 大相徑庭,別弄錯了。

一點一點降低版本嘗試,發現能編譯這段代碼的最低版本是 msvc19.29,對應 vs16.11,如果你需要在本地安裝測試環境的話,穩妥起見安裝 msvc19.30、對應 vs17.0 也就是  VS2022 比較好,如果本地只有 VS2019,需要升級到第五個也就是最後一個發行版才可以。

image

接下來創建一個簡單的控制枱應用包含上面的源文件,需要配置一下 C++ 語言標準:

image

就可以編譯生成可執行文件了,在同目錄準備一個文本文件 (test.txt) 進行測試:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
...
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 409
PS D:\code\iocp_coroutine\Debug>

居然死循環了。

指定偏移量

同樣的代碼邏輯,Unix 上沒問題 Windows 上卻死循環,主要原因是:前者底層使用的是管道,與 socket 之類相似是一個流 (stream),因此沒有讀寫偏移量的説法,每次從開頭獲取就可以了;後者使用的是文件,如果不指定偏移量,每次都會從位置 0 讀取,有的讀者可能問了,為何不能使用當前文件的讀取位置呢?這是因為 Windows 上的多路複用底層是徹徹底底的異步架構,必需每次為 ReadFile 指定一個偏移量,而不能夠使用當前文件的偏移量。

修復的方法很簡單,為 ReadFile 的 overlapped 參數的 Offset & OffsetHigh 字段指定要讀取數據的偏移量即可:

...
    struct AsyncReadAwaiter {
    IocpScheduler& sched;
    HANDLE file_handle;
    std::unique_ptr<char[]> buffer;
    DWORD buffer_size;

增加一個引用成員用來記錄當前請求的偏移值,LARGE_INTEGER 可以理解為 uint64 的結構化表達

    LARGE_INTEGER &offset; 
    OVERLAPPED overlapped;
    DWORD bytes_read;

    AsyncReadAwaiter(IocpScheduler& s, HANDLE file, LARGE_INTEGER &off, DWORD size)

在構造函數中初始化新成員,這個值需要從外部傳入,讀取成功後更新之,以便跨等待對象使用

        : sched(s), file_handle(file), buffer_size(size), offset(off), bytes_read(0) {
        buffer = std::make_unique<char[]>(size);
        ZeroMemory(&overlapped, sizeof(OVERLAPPED));
    }

    bool await_ready() const {
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) {
        sched.register_io(file_handle, h);

每次請求前設置 overlapped 的偏移字段,並增加調試日誌輸出以便觀察

        overlapped.Offset = offset.LowPart; 
        overlapped.OffsetHigh = offset.HighPart; 
        std::cout << "ReadFile from " << offset.QuadPart << std::endl;
        if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
            DWORD error = GetLastError();
            if (error != ERROR_IO_PENDING) {
                std::stringstream ss;
                ss << "ReadFile failed, error " << error;
                throw std::runtime_error(ss.str());
            }
        }
    }

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();
            std::stringstream ss;
            ss << "GetOverlappedResult failed, error " << error;
            throw std::runtime_error(ss.str());
        }

讀取成功後,遞增相應的偏移量

        offset.QuadPart += bytes_transferred; 
        return std::string(buffer.get(), bytes_transferred);
    }
};

Task async_read_file(IocpScheduler& sched, const char* path) {
    HANDLE file_handle = CreateFileA(
        path,
        GENERIC_READ,
        FILE_SHARE_READ,
        NULL,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);

    if (file_handle == INVALID_HANDLE_VALUE) {
        std::stringstream ss;
        ss << "CreateFile failed, error " << GetLastError();
        throw std::runtime_error(ss.str());
    }

在外層循環中保存這個偏移量,以便可以持久化使用,初始值為 0

    LARGE_INTEGER offset = { 0 }; 
    while (true) {
        auto data = co_await AsyncReadAwaiter(sched, file_handle, offset, 4096);
        std::cout << "Read " << data.size() << " bytes\n";
        if (data.size() == 0) {
            break;
        }
    }

    CloseHandle(file_handle);
}
...

再次運行程序,可以輸出讀取的內容了:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552

但是額外的,也收到了一個崩潰提示:

image

處理文件 EOF

記得之前講到協程體整個是包在編譯的 try...catch 代碼塊中的,這裏直接崩潰難道是 msvc 的異常處理沒起作用?掛上調試器看看崩潰堆棧:

image

看起來是命中 promise 對象的 unhandle_exception,這裏調用的 terminate 導致崩潰框彈出,而 unhandled_exception 恰恰是編譯器捕獲了 throw 拋出的異常,與直覺剛好相反。經過排查,唯一可能拋出異常的位置是這裏:

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();
            std::stringstream ss;
            ss << "GetOverlappedResult failed, error " << error;

這裏加打一行日誌

            std::cerr << ss.str() << std::endl;
            throw std::runtime_error(ss.str());
        }
        
        offset.QuadPart += bytes_transferred; 
        return std::string(buffer.get(), bytes_transferred);
    }

新的輸出果然提示這裏返回了錯誤:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
GetOverlappedResult failed, error 38

錯誤碼 38 對應的是 ERROR_HANDLE_EOF表示文件已到末尾,相比 epoll 管道不關心數據結尾的問題,IOCP 讀文件還需要額外增加一些處理,另外在拋異常時,msvc 相比 clang 的顯示不太友好,需要在拋出異常前補上 stderr 的打印,修復後的代碼如下:

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();

判斷錯誤類型,如果是文件 EOF,直接返回空數據,上層會進行判斷,從而退出讀取循環

            if (error != ERROR_HANDLE_EOF) {
                std::stringstream ss;
                ss << "GetOverlappedResult failed, error " << error;
                std::cerr << ss.str() << std::endl;
                throw std::runtime_error(ss.str());
            }
            else {
                return ""; 
            }
        }

        offset.QuadPart += bytes_transferred; 
        return std::string(buffer.get(), bytes_transferred);
    }

下面是新的輸出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
Read 0 bytes

不再報錯了。

多文件並行

上面的例子雖然通過多次讀取展示了協程多次喚醒的過程,但沒有展示多個 IO 句柄併發的能力,下面稍加改造,同時讀取多個文件:

Task async_read_file(IocpScheduler& sched, const char* path) {
    HANDLE file_handle = CreateFileA(
        path,
        GENERIC_READ,
        FILE_SHARE_READ,
        NULL,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);

    if (file_handle == INVALID_HANDLE_VALUE) {
        std::stringstream ss;
        ss << "CreateFile failed, error " << GetLastError();
        std::cerr << ss.str() << std::endl; 
        throw std::runtime_error(ss.str());
    }

    LARGE_INTEGER offset = { 0 };
    while (true) {
        auto data = co_await AsyncReadAwaiter(sched, file_handle, offset, 4096);

輸出文件句柄以區別從不同文件讀取的數據

        std::cout << "Read [" << file_handle << "] " << data.size() << " bytes\n";
        if (data.size() == 0) {
            break;
        }
    }

    CloseHandle(file_handle);
}

int main(int argc, char* argv[]) {
    if (argc < 3) {
        std::cout << "Usage: sample file1 file2" << std::endl;
        return 1;
    }

    IocpScheduler scheduler;
    async_read_file(scheduler, argv[1]);

多個文件只需要多次調用協程即可,從這裏可以感受到協程強大的拓展性

    async_read_file(scheduler, argv[2]);
    scheduler.run();
    return 0;
}

下面是新的輸出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt test2.txt
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 456 bytes
Read [0000010C] 456 bytes
Read [00000108] 0 bytes
Read [0000010C] 0 bytes

為了便於對比,這裏將讀取 buffer 的默認尺寸改為 1024,並去掉了調試日誌。可以看出在 IOCP 裏兩個文件基本是輪着讀的,公平性還是不錯的。

await_suspend & 試讀

上文中提到,通過在 await_ready 或 await_suspend 中增加一些代碼,就可以支持數據試讀,從而在某些場景下提升數據吞吐能力。下面看看 IOCP 是如何實現的,這裏只演示 await_suspend 方式:

    bool await_suspend(std::coroutine_handle<> h) {
        sched.register_io(file_handle, h);

        overlapped.Offset = offset.LowPart;
        overlapped.OffsetHigh = offset.HighPart;
        //std::cout << "ReadFile from " << offset.QuadPart << std::endl;
        if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
            DWORD error = GetLastError();
            if (error != ERROR_IO_PENDING) {
                std::stringstream ss;
                ss << "ReadFile failed, error " << error;
                std::cerr << ss.str() << std::endl; 
                throw std::runtime_error(ss.str());
            }
        }

ReadFile 本身具有試讀能力,當任務可以立即完成時,它將返回 TRUE,bytes_read 參數將返回讀取的數據長度;這裏加入判斷,若立即讀取成功,則返回 false 不掛起協程

        else {
            // if immediately success, not hangup
            std::cout << "immediately success, read = " << bytes_read << std::endl; 
        }
        return bytes_read > 0 ? false : true;
    }

    std::string await_resume() {
        DWORD bytes_transferred = 0;

resume 時先判斷是否為試讀場景,是的話直接返回數據就可以了

        if (bytes_read > 0) {
            bytes_transferred = bytes_read;
        }
        else {
            if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
                DWORD error = GetLastError();
                if (error != ERROR_HANDLE_EOF) {
                    std::stringstream ss;
                    ss << "GetOverlappedResult failed, error " << error;
                    std::cerr << ss.str() << std::endl;
                    throw std::runtime_error(ss.str());
                }
                else {
                    return "";
                }
            }
        }

        offset.QuadPart += bytes_transferred; 
        return std::string(buffer.get(), bytes_transferred);
    }

從這裏也可以看出,Windows 對直接成功的支持是比較好的,不必像 Unix 那樣來回倒騰數據,下面是新版本輸出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 456 bytes
Read [000000FC] 0 bytes

運行了多次沒有試出來,可能 Windows 只對網絡等真正異步的場景才會有立即成功的情況吧。

PostQueuedCompletionStatus & 完美退出

上面的 demo 如果遇到大文件目前只能通過 Ctrl C 強制殺死,畢竟調度器的 run 是個死循環沒法退出,下面對它進行一番改造,看看能否實現完美退出

IocpScheduler g_scheduler;

由於需要在信號響應函數中通知調度器退出,這裏將它做為一個全局變量,工程化一點的話可以改成單例,這裏偷個懶

int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cout << "Usage: sample file" << std::endl;
        return 1;
    }

初始化時捕獲 SiGINT 以便響應 Ctrl C

    signal(SIGINT, on_user_exit); 
    async_read_file(g_scheduler, argv[1]);
    g_scheduler.run();
    return 0;
}

在信號響應函數中調用調度器退出接口實現完美退出

void on_user_exit(int signo) {
    g_scheduler.exit(signo); 
}

class IocpScheduler {
    ...

調度器中新增的退出接口,無腦給 IOCP 隊列投遞通知,注意 key 參數給的是 0,以區別於一般的文件讀取事件

    void exit(int signo) {
        std::cout << "caught signal " << signo << ", prepare to quit!" << std::endl; 
        PostQueuedCompletionStatus(iocp_handle, 0, (ULONG_PTR)0, NULL);
    }
    
    void run() {
        while (true) {
            DWORD bytes_transferred = 0;
            ULONG_PTR completion_key = 0;
            LPOVERLAPPED overlapped = nullptr;

            BOOL success = GetQueuedCompletionStatus(
                iocp_handle,
                &bytes_transferred,
                &completion_key,
                &overlapped,
                INFINITE);

收到事件後,優先檢測是否為退出事件,命中的話直接 break while 循環退出 main

            if (completion_key == 0) {
                std::cout << "IOCP ready to quit" << std::endl; 
                break; 
            }
            else {
                HANDLE ready_handle = (HANDLE)completion_key;
                if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {
                    it->second.resume();
                }
            }
        }
    }

    ~IocpScheduler() {

調度器析構中增加協程的銷燬,防止內存、句柄泄漏

        for(auto handle : io_handles) {
            std::cout << "coroutine destroy" << std::endl;
            handle.second.destroy();
        }
        CloseHandle(iocp_handle);
    }
};

下面是新版輸出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 456 bytes
Read [00000110] 0 bytes
caught signal 2, prepare to quit!
IOCP ready to quit
coroutine destroy

用户按下 Ctrl C 後,可以實現完美退出啦! 

結語

本文介紹了一種基於 IOCP 多路複用的協程調度器,除此之外還説明了如何妥善處理文件偏移、文件 EOF、await_suspend 與試讀寫、PostQueuedCompletionStatus 與進程完美退出等,可用於構建工業級強度的代碼。

最後,由於本文中 demo 經歷多次迭代,想要複製最終版進行驗證的小夥伴,可以 follow 這個開源 git 庫獲取:cpp20coroutine。

下一篇來看下,如何將現有的基於回調的異步庫與 C++20 協程無縫糅合。

參考 

[1]. 如果異步完成,ReadFile()是否總是返回FALSE?

[2]. 系統錯誤代碼 (0-499)

Add a new 評論

Some HTML is okay.