博客 / 詳情

返回

從源碼角度解析C++20新特性如何簡化線程超時取消

C++20中增加了很多重量級新特性,它不僅帶來了ranges、concept和協程,也為多線程編程帶來了jthread和stop_source這些強力輔助。利用這些新特性,我們可以更高效地編寫併發程序。

今天要説的就是利用jthread和stop_source來簡化線程超時控制的實現,最終我們可以實現一個簡單高效、可維護性不輸給號稱“天生支持併發”的Go語言的版本。

為什麼需要超時控制

超時控制是很常見的需求,最普遍的場景是為了防止程序卡住或者長時間佔用資源,程序會主動取消掉一些超過允許運行時間的或者無響應的線程,比如一些耗時很長的網絡連接處理線程等。當然用户等得不耐煩了手動點擊取消任務執行也勉強可以算在內。

通常超時發生或者用户點擊取消之後,我們都期待線程能迅速終止執行並讓整個程序保持一個完整且安全的狀態。然而現實是複雜的,想實現上述功能對於線程來説是一件難事,尤其在Linux系統上。

第一個難點是如何讓線程知道自己要退出。對於進程來説這不是難點,因為不管進程在做什麼,我們都可以靠向其發送信號來立即中斷進程的執行(前提是線程沒有屏蔽這個信號),這樣進程的停止請求可以被立即感知到,進程從而可以儘快完成善後工作退出執行。同樣的招數對多線程程序來説就沒那麼好用了——信號默認是發給整個進程的,為了能讓每個線程獨立地接收信號,我們需要保存線程的標識符並在每個線程中設置接收和屏蔽信號的mask,這大大增加了程序的複雜性;其次信號處理函數是整個進程內所有線程共享的,我們需要額外的手段來保證併發安全,同時還得兼顧信號處理函數需要可重入、快速執行的最佳實踐,這會提高程序的開發難度。

第二個難點在於如何保證線程一定會退出執行。前面説到信號可以打斷進程的執行,但這只是通知,實際上進程完全可以在信號處理函數返回後無視這個通知繼續運行,或者有一種更普遍的場景——程序正好卡在某個系統調用上,而程序又設置了系統調用被信號中斷後自動重啓,這樣即使我們有效通知了進程,進程也會在收完通知之後再次進入系統調用從而無法響應停止請求。所以作為保底手段,Linux可以發送SIGKILL這個信號強制終止進程,這個信號無法捕獲也無法屏蔽,是我們貨真價實的“底牌”。

上述的情況在多線程中同樣存在,而且我們沒有“底牌”可用——因為不管給哪個線程發送SIGKILL,都會殺死整個進程而不是單獨接收到信號的那個線程。另外即使有辦法強制終止線程(比如早期的JVM),我們還會遇到資源釋放的問題。進程退出執行之後,內核會盡可能釋放進程持有的所有資源,打開的文件會被關閉,緩衝區的內容會被刷新,文件鎖之類的同步機制也會正常解鎖;但線程並沒有這種自動清理機制,清理工作完全需要手動執行,一旦進程沒有釋放自己持有的資源就退出,系統就會遇到各種數據損壞和死鎖等併發問題,排查和修復會極其困難。

為了克服上述難點並安全高效地實現終止超時線程的執行,我們需要一些額外的控制手段。這也一直都是開發者中的熱門話題。

在介紹C++20如何簡化超時控制之前,我們先來看看前人的智慧成果。

Golang實現超時控制

Golang是天生支持併發的語言,這一點可謂名副其實,尤其是在超時控制上。

我們直接看個例子,例子裏有主線程和工作線程,工作線程超時時間為5秒,如果超過這個時間還有線程沒完成工作,就取消所有線程的執行。Golang裏沒有系統級的線程,但我們可以用goroutine模擬。

在工作線程中我們用sleep代替耗時的工作,這樣便於測試:

func Work(ctx context.Context, id int) error {
	for range 10 {
		select {
		case <-ctx.Done():
			fmt.Printf("worker %d: canceled\n", id)
			return ctx.Err()
		default:
		}
		if rand.IntN(2) == 0 {
			time.Sleep(500 * time.Millisecond)
		} else {
			time.Sleep(time.Second)
		}
	}
	fmt.Printf("worker %d: done\n", id)
	return nil
}

超時控制是ctx參數實現的,每次循環處理前我們都會主動檢查線程是否需要退出,這種協作式的“請求-檢查-響應”是各種語言中取消線程執行的常見做法。

這個工作函數執行時間在5秒到10秒之間,取值的步長在0.5秒,加上go標準庫默認隨機數是均勻分佈的,所以整體執行時間的概率是正態分佈的,在7.5秒左右我們很容易看到超時和正常運行結束兩種情況。所以我們把超時時間分別設為4秒、7.5秒、11秒,來進行模擬運行實驗:

func main() {
    // 從命令行獲取超時時間,單位毫秒
    timeout, err := strconv.Atoi(os.Args[1])
    if err != nil {
        panic(err)
    }
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond)
	now := time.Now()
	defer cancel()
	g := &errgroup.Group{}
	for i := range 3 {
		g.Go(func() error {
			return Work(ctx, i)
		})
	}
	err = g.Wait()
	fmt.Printf("run time: %s\n", time.Since(now))
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("Tasks canceled")
			return
		}
		panic(err)
	}
	fmt.Println("All work done!")
}

代碼很簡單,關鍵在這行:ctx, cancel := context.WithTimeout(context.Background(), 7500*time.Millisecond),只要我們設定的時間到了,<-ctx.Done()就會從阻塞變為非阻塞,循環開始處的檢查會發現這個變化,然後會退出線程的執行。代碼中使用了errgroup,但這不是必須的,實際上有很多辦法可以通知主線程,這裏我選擇了一種最通用的,代價是代碼會稍微複雜一些。

運行代碼,會看到下面這樣的輸出,結果有很大的隨機成分,下面只是無數種可能中的一種:

$ go build -o test

$ ./test 4000
worker 1: canceled
worker 0: canceled
worker 2: canceled
run time: 4.00431275s
Tasks canceled

$ ./test 7500
worker 0: done
worker 2: done
worker 1: canceled
run time: 7.507776458s
Tasks canceled

$ ./test 11000
worker 1: done
worker 2: done
worker 0: done
run time: 8.509193125s
All work done!

可以看到超時控制發揮了作用,儘管內置的time計時有一些誤差,但程序的總體的運行時間是小於等於超時時間的。

Golang的超時控制可以通過context簡單實現,但需要工作線程主動檢查主動配合,前文我們也提到了強制終止工作線程很可能會造成併發問題,因此所有的線程超時控制中都是採用的這種協作式退出機制,即使天生併發的語言也不能免俗。作為代價,我們需要謹慎編碼以免工作線程無法響應退出請求,同時還需要付出一點在循環裏檢查是否需要退出執行的性能損失。

C++中的典型超時控制實現

c++沒有方便好用的context,想要實現協作式退出得自己造輪子。

Golang好用是因為標準庫和運行時調度器隱藏了實現的細節:WithTimeout實際上會創建一個定時器,到時間後調度器會執行定時器的回調函數主動關閉ctx內部的channel,這樣<-ctx.Done()就會從阻塞變成非阻塞,協程就能檢查到這一變化從而退出執行。

核心只在於兩點,以合適的方法標記線程已被取消和異步地在超時後設置取消標記。

第一點很容易解決,使用原子變量即可。第二點的異步通知有些棘手,但我們還是有幾種選擇:

  1. 使用alarm和信號: alarm會註冊一個定時器,到時間後給進程發送SIGALRM信號,雖説多線程程序裏不推薦用信號,但在這個場景下在信號處理函數裏設置原子變量是合適的,另外使用alarm(0)可以取消之前註冊的定時器。
  2. 使用多線程:我們可以另外創建一個線程,並在其中等待到超時時間過去之後設置標誌,這樣主線程也不會阻塞。

當然兩個方案各有缺點:

  1. alarm是整個進程共享的,且同時只能設置一個定時器,最後它只能設置秒級精度的超時時間;使用setitimer可以解決上面這些問題,但會出現不知道信號是哪個超時的定時器發送的問題。
  2. 多線程方案問題比較少,集中在變量生命週期和任務正常完成如何取消超時控制線程這兩點上。

綜合來看使用多線程方案才能真正解決問題,跨平台性也更強。知道原理後我們就可以寫實驗代碼了。

轉換後的工作函數是這樣的:

namespace {
    std::atomic<int> canceled_flag{0};
    std::atomic<int> is_canceled{0};
}

void Work(int id)
{
    std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> dist(0, 1);

    for (int i = 0; i < 10; ++i) {
        if (canceled_flag.load(std::memory_order_acquire) == 1) {
            std::osyncstream{std::cout} << "worker: " << id << " canceled\n";
            is_canceled.store(1, std::memory_order_release);
            return;
        }
        if (dist(rng) == 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        } else {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }

    std::osyncstream{std::cout} << "worker: " << id << " done\n";
}

代碼和go版本的沒有太大差異,唯一的區別是我們不靠返回值而是is_canceled標誌來區分線程是否因為被取消而退出。只使用canceled_flag會導致競態條件並導致誤判,你可以想想是為什麼,這算是課後練習。另外我在這還使用了memory_order,這不是必須的,但默認的cst內存序多少有些殺雞用牛刀。

下面是主線程和超時控制線程的邏輯:

int main(int argc, const char *argv[])
{
    if (argc != 2) {
        std::cerr << "wrong arg\n";
        return 1;
    }
    auto timeout = std::stoi(argv[1]);

    std::vector<std::thread> workers;
    constexpr int worker_num = 3;
    workers.reserve(worker_num);
    for (int i = 0; i < worker_num; ++i) {
        workers.emplace_back(work, i);
    }

    std::atomic<int> timeout_cancel_flag{0};
    std::thread{
        // 超時控制線程
        [&timeout_cancel_flag](auto timeout){
            std::this_thread::sleep_for(timeout);
            if (timeout_cancel_flag.load(std::memory_order_acquire) == 1) { // 危險
                return;
            }
            canceled_flag.store(1, std::memory_order_release);
        }, std::chrono::milliseconds(timeout)
    }.detach();

    for (auto &worker: workers) {
        worker.join();
    }
    timeout_cancel_flag.store(1, std::memory_order_release);

    if (is_canceled.load(std::memory_order_acquire) != 1) {
        std::osyncstream{std::cout} << "All works done!\n";
    } else {
        std::osyncstream{std::cout} << "Tasks canceled\n";
    }
}

整體上沒什麼難懂的地方,基本可以看做Golang版本的轉譯,如果線程都退出之後不管是否超時我們都要取消超時控制線程。整體上只有一點不一樣:超時控制線程是detach的,因為我們不能讓主線程阻塞。

然而這段代碼有很致命的生命週期問題,想象一下如果worker都在timeout之前完成工作,且函數在timeout之前退出,但超時控制線程仍然需要睡眠到timeout為止,這時候它醒來訪問到的timeout_cancel_flag將會是一個無效值。

問題出在兩個地方,第一個是我們用了sleep,這不可中斷,線程必須要等滿timeout時間才能退出,這會造成線程泄漏;第二是因為sleep不可中斷,導致我們的超時控制線程生命週期長於主線程和工作線程,在其中引用的主線程的局部變量很可能會失效。

解決方案當然也很多,最簡單的就是用std::shared_ptr包裹我們需要跨線程訪問的資源,這和在rust中使用Arc是一樣的。但這種方案治標不治本,我們的超時控制線程仍然會有比其他線程長的生命週期。

第二種則是使用一種在超時等待中可以被中斷的機制,c++20前我們有std::timed_mutex和條件變量可用。

改進後的代碼:

auto timeout_cancel_flag = std::make_shared<std::condition_variable>();
std::thread{
    [timeout_cancel_flag](auto timeout){
        std::mutex timeout_lock;
        std::unique_lock u{timeout_lock};
        if (timeout_cancel_flag->wait_for(u, timeout) == std::cv_status::timeout) {
            std::osyncstream{std::cout} << "cancel all threads\n";
            canceled_flag.store(1, std::memory_order_release);
        } else {
            std::osyncstream{std::cout} << "self was canceled by main thread\n";
        }
    }, std::chrono::milliseconds(timeout)
}.detach();

for (auto &worker: workers) {
    worker.join();
}

timeout_cancel_flag->notify_all(); // 取消超時控制線程

我們可以使用條件變量的wait_for方法,它可以讓當前線程阻塞到指定的時間,或者中途被notify喚醒。這完美實現了我們既要超時等待又要中途可被打斷的需求。並且條件變量本身用std::shared_ptr包裹,不會有任何生命週期問題。

然而沒有了生命週期問題,我們還有時序問題,如果主線程中的notify_all()早於控制線程中的wait_for執行(概率比較小但不為0),那麼這次notify超時控制線程是收不到的,wait會一直阻塞到超過timeout,這時候再設置取消標誌就沒有意義了。想要解決這種“喚醒丟失”問題,我們需要藉助wait重載的第三個參數,讓它告訴我們超時控制線程本身是否被取消:

struct TimeoutContext {
    std::atomic<int> canceled{0};
    std::mutex lock;
    std::condition_variable cv;
};

auto timeout_ctx = std::make_shared<TimeoutContext>();
std::thread{
    [timeout_ctx](auto timeout){
        // 必須使用ctx裏的鎖才能有效避免競態條件
        std::unique_lock u{timeout_ctx->lock};
        if (!timeout_cancel_flag->wait_for(u, timeout, [&](){ return timeout_ctx->canceled.load(std::memory_order_acquire) == 1; })) {
            // wait_for 返回 false,canceled是值還是0,説明是超時導致的返回
            std::osyncstream{std::cout} << "cancel all threads\n";
            canceled_flag.store(1, std::memory_order_release);
        } else {
            // wait_for 返回 true,canceled被設置為1,説明主線程通知了取消
            std::osyncstream{std::cout} << "self was canceled by main thread\n";
        }
    }, std::chrono::milliseconds(timeout)
}.detach();

for (auto &worker: workers) {
    worker.join();
}

// 取消超時控制線程
{
    // 獲取同一把鎖,修改狀態時要麼超時控制線程還沒運行,要麼已經在wait了
    std::lock_guard lk(timeout_ctx->lock);
    timeout_ctx->canceled.store(1, std::memory_order_release);
}
// 解鎖後才能通知
timeout_ctx->cv.notify_all();

因為有鎖存在,所以不管怎麼樣運行順序只有兩種:

  1. 超時線程先運行,一直到wait方法裏解鎖,我們可以保證wait一定在notify之前運行
  2. 主線程設置超時線程取消標誌的代碼先運行,這時wait是晚於notify執行的,但我們設置取消標誌是先於wait的,而wait在休眠前會先檢查謂詞條件,所以條件變量會馬上退出不會進行等待。
  3. 會不會存在wait中條件變量解除了鎖,在即將進入休眠前主線程完成了執行?答案是不會的,標準有明文要求wait和它的兄弟函數裏unlock+wait加在一起是原子的(實際上分為三部分,解鎖+休眠、被喚醒、重新加鎖,它們各自都是原子的)且和notify之間是全序關係——要麼notify在前他們在後或者反過來,不可能同時執行。簡單説,如果超時控制線程正在執行unlock+wait,這説明主線程沒有拿到鎖,此時主線程要麼還沒運行到notify(這種情況不會丟失喚醒),要麼已經設置了標誌並釋放了鎖,謂詞會檢測到標誌被設置(謂詞檢測在鎖的保護中),條件變量不進入休眠;如果notify在之後運行,則notify會看到超時控制線程已經進入wait,會喚醒它。所以不存在中間可以被打斷的場景。

現在不再有時序問題了。

總體來説這個實現還不錯,能正常工作性能也尚可,很多框架也選擇了類似的方案來實現線程的超時取消,比如Qt。然而它有幾個顯著的缺點:

  1. 代碼依賴很多全局狀態
  2. 我們需要用join等待所有線程退出,這是因為標準庫的thread不join就析構會導致程序崩潰,然而這是不必要的,我們只關心工作是否完成,剩下的資源釋放和線程退出無需去等待
  3. 線程之間暴露了過多的實現細節,比如flag具體的值,再比如std::condition_variable用來通知超時控制線程
  4. 代碼真的很複雜,想正確實現整體邏輯會比較麻煩,比如杜絕喚醒丟失

儘管有這些缺點,但在新標準之前我們只能使用這個方案。當然如果放棄跨平台的話,Linux上更安全更簡單的做法其實是eventfd+epoll_wait,它不需要太多的外部狀態,且邏輯簡單易懂,比用條件變量還要預防喚醒丟失強太多了,使用得當的話它的性能也不會比純內存操作的條件變量差多少。

C++20帶來的簡化

C++20為併發編程體驗帶來了不少提升。

第一個就是std::jthread,它會在析構的時候自動join,從而避免了std::thread手動操作的麻煩。不僅如此,每個jthread中還包含一個std::stop_token,這可以簡單實現線程的協作式取消執行。僅僅這一個新特性就已經解決了上一節説的缺點中的前兩條。

第二個有用的新特性是std::stop_sourcestd::stop_token。一個std::stop_source可以產生多個std::stop_token,當一個source被取消的時候,每個從它那派生出來的token都會收到通知,這可以上位替代我們之前使用的canceled_flag

第三個是std::latch,你可以把它當成go的sync.WaitGroup,它可以讓我們在工作線程完成工作之後立即通知主線程,而不用調用join等待線程完全退出。

最後一個則是std::condition_variable_any新增了可以用stop_token中斷等待的方法,無需我們自己手動notify,這樣可以儘量隱藏實現細節。

我們首先改造工作函數:

bool work(std::stop_token stoken, int id)
{
    std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> dist(0, 1);

    for (int i = 0; i < 10; ++i) {
        if (stoken.stop_requested()) {
            std::osyncstream{std::cout} << "worker: " << id << " canceled\n";
            return false;
        }
        if (dist(rng) == 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        } else {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }

    std::osyncstream{std::cout} << "worker: " << id << " done\n";
    return true;
}

現在我們不依賴全局變量通知線程退出了。並且函數的返回值改成了bool,以表示線程是否是因為被取消而退出的。我們通過stoken.stop_requested()判斷線程是否被取消。

接下來我們利用C++20改造主線程和超時控制:

int main(int argc, const char *argv[])
{
    if (argc != 2) {
        std::cerr << "wrong arg\n";
        return 1;
    }
    auto timeout = std::stoi(argv[1]);

    std::atomic<int> is_canceled{0};
    std::stop_source source;
    std::vector<std::jthread> workers;
    constexpr int worker_num = 3;
    std::latch wait_group{worker_num};
    workers.reserve(worker_num);
    for (int i = 0; i < worker_num; ++i) {
        workers.emplace_back([&](int id){
            if (!work(source.get_token(), id)) {
                is_canceled.store(1, std::memory_order_release);
            }
            wait_group.count_down(); // 三個線程都調用過之後,wait會解除阻塞
        }, i);
    }

    // 現在不需要專門detach了,效果是一樣的
    std::jthread timeout_control{
        [&source](std::stop_token stoken, std::chrono::milliseconds timeout){
            std::mutex timeout_lock;
            std::unique_lock u{timeout_lock};
            std::condition_variable_any timeout_cancel;
            if (!timeout_cancel.wait_for(u, stoken, timeout, [&stoken] { return stoken.stop_requested(); })) {
                std::osyncstream{std::cout} << "cancel all threads\n";
                source.request_stop();
            } else {
                std::osyncstream{std::cout} << "self was canceled by main thread\n";
            }
        }, std::chrono::milliseconds(timeout)
    };

    wait_group.wait(); // 等工作線程完成
    timeout_control.request_stop(); // 工作線程退出後立即取消超時控制線程,即使線程已經執行完成也能安全調用這個方法

    if (is_canceled.load(std::memory_order_acquire) != 1) {
        std::osyncstream{std::cout} << "All works done!\n";
    } else {
        std::osyncstream{std::cout} << "Tasks canceled\n";
    }
}

可以看到我們現在完全不使用全局變量了,而且線程的協作式取消是request_stop()stop_requested()共同完成的,不會暴露太多實現細節。

需要注意的是!timeout_cancel.wait_for(u, stoken, timeout, [&stoken] { return stoken.stop_requested(); })這行代碼。這行代碼和普通條件變量的wait_for一樣,只不過多了一個參數stoken,這個函數會阻塞住當前線程,直到超時、被notify或者stoken被取消。最後一個參數lambda的返回值會作為wait_for的返回值,這個lambda會在阻塞解除後立即調用,因此我們在這個函數裏檢查stoken是否被取消。如果stoken沒有被取消,説明是因為超時導致的返回,因為我們在其他地方調用notify,因此在這時我們需要取消其他工作線程。

新代碼不僅更簡潔,而且沒有上一節那樣的時序問題:

  1. 如果stoken的取消發生在整個wait之前。wait裏的謂詞檢測會發現wait被取消
  2. stoken的取消發生在謂詞檢測之後、unlock+wait之前。condition_variable_any裏有內部鎖,調用notify_all(取消stoken會自動調用)和unlock+wait之前都會先嚐試獲取這個鎖,unlock+wait在獲取鎖之後會檢查stoken是否被取消,所以如果notify先執行,那麼wait會檢測到stoken已取消;如果unlock+wait先執行,那麼notify一定是在wait之後執行的,不存在丟失
  3. 取消發生在wait之後,這是最安全的情況,不會有喚醒丟失。

condition_variable_any內部持有的鎖正好對應我們上一節的TimeoutContext::lock,而stop_token對應TimeoutContext::canceledrequest_stop()則對應了加鎖設置標誌並調用notify。現在所有操作all in one,代碼在保證安全的同時將複雜的細節全部隱藏。

儘管仍有一定複雜性,但利用condition_variable_any已經是比較理想的超時控制解決方案了。

condition_variable_any是如何配合stop_token的

最後還有一個小插曲,condition_variable_any是如何配合stop_token的。

C++20只給condition_variable_any添加了支持stop_token的接口,這是因為普通的條件變量和其他的鎖幾乎都是系統接口的簡單包裝,這些系統接口本身不支持中斷,因此也沒法為這些包裝接口添加支持,而condition_variable_any為了支持所有種類的鎖,幾乎所有標準庫的實現都在自己內部創建了一個普通的條件變量和內部鎖,並基於這個條件變量重新實現了所有接口,因此給支持stop_token留下了餘地。

我們以libcxx的實現為例:

// notify全都要先加內部鎖
inline void condition_variable_any::notify_one() _NOEXCEPT {
  { lock_guard<mutex> __lx(*__mut_); }
  // notify時最好要解鎖,否則wait線程被喚醒後發現加不了鎖又會再次休眠,效率很低
  // 鎖只是為了讓notify的調用和unlock+wait互斥
  __cv_.notify_one();
}

inline void condition_variable_any::notify_all() _NOEXCEPT {
  { lock_guard<mutex> __lx(*__mut_); }
  __cv_.notify_all();
}

// wait_for最終調用的這個方法
template <class _Lock, class _Clock, class _Duration, class _Predicate>
bool condition_variable_any::wait_until(
    _Lock& __user_lock,
    stop_token __stoken,
    const chrono::time_point<_Clock, _Duration>& __abs_time,
    _Predicate __pred) {
    // 先檢查一次是否被取消,因為後面的操作都很重量級會浪費性能
  if (__stoken.stop_requested())
    return __pred();

  shared_ptr<mutex> __mut = __mut_;
  // 這行是關鍵,讓底層的普通條件變量可以從wait中甦醒
  stop_callback __cb(__stoken, [this] { notify_all(); });

  while (true) {
    // 從wait中因為notify醒來會回到這裏,調用最後的lambda來檢查是否應該退出等待
    // 初次進入循環也會檢查,以免進行不必要的等待
    // 檢查其實在臨界區之外,所以其實最好我們得持有傳入的外部鎖才能完全避免靜態條件
    // 只不過恰好我們的謂詞檢查只檢查了stoken是否被取消,和下面臨界區內的一樣,所以這地方的時序沒有影響。我們在主線程中也無需加外部鎖
    if (__pred())
      return true;

    // 先加內部鎖,以免競態條件出現,代碼本身的註釋已經解釋清楚了,無需多言
    // We need to take the internal lock before checking stop_requested,
    // so that the notification cannot come in between the stop_requested
    // check and entering the wait.
    // Note that the stop_callback takes the same internal lock before notifying
    unique_lock<mutex> __internal_lock(*__mut);
    // 檢查stop_token是否被取消
    // notify拿到鎖先運行的話循環就會在這裏退出
    if (__stoken.stop_requested())
      break;

    // 解鎖用户傳入的外部鎖,和普通的條件變量一樣
    __unlock_guard<_Lock> __unlock(__user_lock);
    unique_lock<mutex> __internal_lock2(
        std::move(__internal_lock)); // switch unlock order between __internal_lock and __user_lock

    // 內部鎖的unlock+wait
    if (__cv_.wait_until(__internal_lock2, __abs_time) == cv_status::timeout)
      // 超時之後直接退出循環,否則回到循環開始處
      break;
  } // __internal_lock2.unlock(), __user_lock.lock()
  // 還會調用謂詞,因此返回值總是能反應stoken是否被取消
  return __pred();
}

template <class _Lock>
struct __unlock_guard {
  _Lock& __lock_;

  // 對象創建的時候解鎖,析構的時候加鎖
  _LIBCPP_HIDE_FROM_ABI __unlock_guard(_Lock& __lock) : __lock_(__lock) { __lock_.unlock(); }

  _LIBCPP_HIDE_FROM_ABI ~__unlock_guard() _NOEXCEPT // turns exception to std::terminate
  {
    __lock_.lock();
  }

  __unlock_guard(const __unlock_guard&)            = delete;
  __unlock_guard& operator=(const __unlock_guard&) = delete;
};

核心在於stop_callback __cb(__stoken, [this] { notify_all(); });這一行,std::stop_callback可以把回調函數註冊給stop_token關聯的stop_source,當source被要求停止時,註冊的回調會在調用request_stop()的那個線程執行,回調可以註冊多個,調用順序是不確定的。

所以這行代碼等於在我們要求取消stoken的時候,順手調用notify_all()把底層的條件變量喚醒了,喚醒之後循環會檢查lambda和stoken,然後發現自己被取消從而從wait中退出。這就是wait可以被立即中斷的秘密。

不過如果token已經取消,stop_callback是不生效的,但這也沒關係,因為在進入真正的等待支持,我們至少檢查了兩次stoken是否被取消,且第二次檢查是被鎖保護的,notify的callback不生效或者喚醒丟失了我們也能檢查到stoken已經被取消,不會進入無意義的等待。

上面的代碼也展現了condition_variable_any的問題,相比直接轉發到系統接口的其他標準庫功能,它的性能通常要差一些,對於性能要求較高的場景必須謹慎使用。

總結

為了保證併發安全,上面的邏輯是有些繞的,但總結起來也就幾句話:

  1. 設置取消標誌需要和unlock+wait互斥,這樣超時控制線程才有機會檢測自己是否被取消。
  2. 超時控制線程中的取消標誌檢查(C++20前的謂詞匿名函數、C++20裏的stoken.stop_requested)需要和unlock+wait在同一塊臨界區裏,這樣才能保證進入wait之前取消標誌被正確檢測到或者notify在wait之後才被調用。
  3. notify通常在設置完取消標誌之後執行,但不需要在臨界區裏,即使喚醒錯過了我們也有保底措施。

只要想明白這三點你就掌握了這種模式,以後遇到超時取消或者防止喚醒丟失的場景時不至於兩眼一黑了。

這就是為什麼要用現代C++,新特性有時候真的可以飛躍式提升開發效率,雖説這種程度和Golang相比還稱不上優雅,但心智負擔減輕了很多加班也沒那麼累了。

但話説回來,同樣的需求如果不是追求極致性能/只能用C++,我更樂意用Golang去實現。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.