动态

详情 返回 返回

C++高併發異步定時器的實現 - 动态 详情

各位開發者好,久違的Workflow架構系列追更了~

在C++高併發場景,定時功能的實現有三大難題:高效、精準、原子性

除了定時任務隨時可能到期、而進程隨時可能要退出之外,最近Workflow甚至為定時任務增加了取消功能,導致任務可能被框架調起之前被用户取消,或者創建之後不想執行直接刪除等情況,而這些情況大部分來説都是由不同線程執行的,因此其中的併發處理可謂教科書級別!

那麼就和大家一起看看Workflow在定時器的設計上做了哪些考慮,深扒細節,體驗併發架構之美~

https://github.com/sogou/workflow

1. 高效的數據結構與timerfd

舉個例子:實現一個server,收到請求之後,隔1s再回復給用户。

聰明的讀者肯定知道,在server的執行函數中用sleep(1)是不行的,sleep()這個系統調用是會阻塞當前線程的,而異步編程裏阻塞線程是高效的大忌!

所以我們可以使用timerfd,顧名思義就是用特定的fd來通知定時事件,把定時事件響應和網絡事件響應都一起處理,用epoll管理就是一把梭。

現在離高效還差一點。回到例子,我們不可能每次收到一個請求都創建一個timerfd,因為高併發場景下一個server通常要抗上百萬的QPS。

目前Workflow的超時算法做法是:一個poller有一個timerfd,內部利用了鏈表+紅黑樹的數據結構,時間複雜度在O(1)和O(logn)之間,其中n為poller線程的fd數量。

Workflow定時器管理:高效的數據結構與poller_add_timer()

2. 精準的響應

這樣的數據結構設計有什麼好處呢?

  • 寫得快(放入一個新節點)
  • 讀得快(響應已超時的節點)
  • 精度高(超時時間無精度損失)

Workflow源碼在kernel和factory目錄中都有對應的實現,kernel層是主要負責timerfd的地方,當前factory層還比較薄。我們重點看看上述數據結構。

寫:由用户發起異步任務,將這個任務加到上述的鏈表+紅黑樹的數據結構中,如果這個超時是當前最小的超時時間,還會更新一下timerfd。

讀:框架的網絡線程每次會從epoll拿出事件,如果響應到超時事件,會把數據結構中已經超時的全部節點都拿出來,並調用任務的handle。

以下是從epoll處理超時事件的關鍵函數:

/*** poller響應timerfd的到時事件,並處理所有到時的定時任務 ***/
static void __poller_handle_timeout(const struct __poller_node *time_node, poller_t *poller)                           
{                                                                               
    ...

    // 鎖裏,把list與rbtree上時間已到的節點都從數據結構裏刪除,臨時放到一個局部變量上                                        
    list_for_each_safe(pos, tmp, &poller->timeo_list)                           
    {
       ...
       node->removed = 1; // 標誌位:【removed】
       ...
    }

    if (poller->tree_first)                                                     
    { ... }  

    // 鎖外,設置state和error,並回調Task的handle()函數
    while (!list_empty(&timeo_list))                                            
    {                                                                           
        node = list_entry(timeo_list.next, struct __poller_node, list);         
        list_del(&node->list);                                                  

        node->error = ETIMEDOUT;                                                
        node->state = PR_ST_ERROR;                                              
        free(node->res);                                                        
        poller->cb((struct poller_result *)node, poller->ctx);                  
    }                                                                           
} 

由於timerfd使用的超時時間是所有節點中最早超時的時間,而所有節點都在rbtree和list上按序排好,我們從前到後找的都是已超時的節點。因此利用了timerfd的精準性可以非常準確地叫醒當前已經超時的全部節點,無精度損失。

由於實際使用中,用户使用的超時時長總是傾向於固定的,比如上述例子都是1s,因此超時的絕對時間一般來説都是遞增的,使用這個數據結構寫入會非常快,設計特別適合定時器的實際需求。

3. 原子性

上述有提到,用户的回調需要調且只調一次,Workflow可以保證在進程退出時立刻全部立刻到時結束。進程退出又是另一個話題,感興趣的讀者先自行去看代碼,回頭再細説~

4.允許取消(新功能)

看到這裏,已經可以感受到優雅的數據結構如何實現高效精準的定時器了~

但是當我們打開poller.h,感受一下它的接口,總覺得差了點什麼:

孤獨的api:poller_add_timer( )

是的!一個timer可以add,但是卻不可以delete!

Workflow中許多結構的實現都是非常完備和對稱的,因此取消一個定時任務這件事在Workflow開源的第一天就一直想要實現。

但是多加一個取消cancel會有很大的問題:如果一個timer已經timeout,用户再去cancel的生命週期是沒有辦法保證的,它可能已經析構了!

最近終於找到了一個非常好的解決辦法:使用命名timer,交給全局map管,cancel的時候帶名字去操作,就可以解決生命週期問題

我們增加了帶名字的Timer :__WFNamedTimerTask,通過全局的map可以找到它,從而進行刪除。刪除實際上就是從poller中刪除一個timer。

所以從底向上,為孤獨的poller_add_time增加一個小夥伴:poller_del_timer。

/*** 取消一個定時任務時,從poller刪除它 ***/
int poller_del_timer(void *timer, poller_t *poller)
{
    ...
    // 鎖內:如果這個標誌位還是0,表示stop還沒把它拿走,這裏就可以去刪除這個timer
    if (!node->removed)
    {
        node->removed = 1; // 可以讓cancel和stop互斥,保證只調一次

        if (node->in_rbtree) // 從定時器的數據結構中刪掉
            __poller_tree_erase(node, poller);
        else
            list_del(&node->list);
        node->error = 0;                                                        
        node->state = PR_ST_DELETED;                                                
        stopped = poller->stopped;                                                  
        if (!stopped) // 標誌位【stopped】,如果當時沒有進程退出,把timer事件交出去處理
            write(poller->pipe_wr, &node, sizeof (void *));
    }
    ...

    // 鎖外:標誌位【stopped】如果進程要退出了,立刻處理timer事件的handle
    if (stopped)
        poller->cb((struct poller_result *)node, poller->ctx);

    return -!node;
}

剛才講述過的timeout(時間到)、stop(進程退出)、cancel(用户取消)三者可能由三個線程分別發起,於是我們看到的併發狀態,簡單來説是這樣的:

定時器到期(timeout)、進程退出(stop)、任務取消(cancel)三者隨時可能發生!

5. 精妙的併發狀態分析

cancel和另外兩個行為有着本質上的不同:

  • timeout和stop的觸發順序是先poller層、再到Task層,最後到用户的callbback;
  • cancel的觸發先Task層,Task層的命名map中先刪掉它觸發、再到poller層,最後到用户的callback;

因此先討論第一類情況。

我們以timeout為例:

  1. poller層面的__poller_handle_timeout()會把上述的removed標誌位用上,與poller_del_timer()互斥:誰第一個搶到removed標誌位並置為1,就代表了timer結束於哪個狀態。如果是timeout,那麼用户拿到的state為SS_STATE_COMPLETE;
  2. 互斥鎖poller->mutex保證從poller的數據結構中刪掉這個節點並調Task層回調,從而可以保證stop的時候無需重複處理它;
  3. timeout調用的Task層回調,實際上是__WFNamedTimerTask::handle():
    (1) 它會置一個標誌位node_.task,表示此任務已經處理過;
    (2) 並且把這個節點從全局map中刪除:這樣就保證了用户自頂向下cancel就不會刪到它了;
/*** 處理定時器到期,由poller調用 ***/
void __WFNamedTimerTask::handle(int state, int error)    
{    
    if (node_.task) // 由於不想先加鎖再處理,所以先判斷任務沒有被cancel處理過
    {
        std::lock_guard<std::mutex> lock(__timer_map.mutex_);    
        if (node_.task) // 鎖內再檢查一下,入門技能
        {
            timers_->del(&node_, &__timer_map.root_); // 從map中刪除
            node_.task = NULL; // 標誌位:表示任務生命週期結束了
        }    
    }

    mutex_.lock();   // 這裏是為了等待dispatch流程保證exchange已經結束,
    mutex_.unlock(); // 否則資源被釋放就不能訪問成員變量了。也是非常常用的技巧!
    this->__WFTimerTask::handle(state, error); // 裏邊會釋放資源,併發起任務流下一個任務
}

第二類情況,如果用户調用cancel先發生呢?

  1. 自頂向下先由factory層找到這個節點,再調到poller的poller_del_timer()。期間需要記錄一些狀態,因為我們需要通常有多個poller。然後內部會先置上removed,並從定時器數據結構中刪掉,以保證和timeout流程互斥;
  2. 在Task層面,需要由cancel流程負責調用用户的callback(),同時回收timer的資源;

void __NamedTimerMap::cancel(const std::string& name, size_t max)               
{
    ...
    // 鎖內,拿出命名為name的timer隊列
    timers = __get_object_list<TimerList>(name, &root_, false);                 
    if (timers)
    {
        do
        {
            if (max == 0) // 從map中刪除最多max個
                return;

            // 從該名字對應的隊列中刪除該timer
            node = list_entry(timers->head.next, struct __timer_node, list);    
            list_del(&node->list);

            // 標識位:exchange。如果是第二次exchange,會調到task自身的cancel()從poller中刪掉它
            if (node->task->flag_.exchange(true))
                node->task->cancel();

            // 標誌位:表示生命週期正確結束,資源已經被回收,否則timeout流程或析構函數需要做回收
            node->task = NULL;                                                  
            max--;                                                              
        } while (!timers->empty());                                             

        rb_erase(&timers->rb, &root_);
        delete timers;
    }
}

6. 異步任務的發起時機是個謎

上面那張圖,我們假設的是任務先創建好,再被髮起。那如果任務還沒有被髮起,甚至我們不想發起呢?

我們假設的是任務先創建好,再被髮起。那如果任務還沒有被髮起,甚至我們不想發起呢?

1、任務可以在被髮起前取消

實際上我們把一個timer放到一個任務流圖中,我們並不能確定它被髮起的準確的時機,但我們依然允許先cancel它。

這時候我們就需要上述的標誌位exchange來做互斥了。exchange是個std::atomic<tool>,初始化為false,用户已經手動cancel過之後,任務可能在任務流中才會被髮起dispatch
因此即使先取消,沒關係,但必須保證dispatch過才能釋放這個timer的資源:

/*** 由任務流發起、或者用户手動start起來 ***/
void __WFNamedTimerTask::dispatch()
{    
    int ret;    

    mutex_.lock();    
    ret = this->scheduler->sleep(this); // 先把定時任務交給poller
    if (ret >= 0 && flag_.exchange(true)) // exchange一下。如果是第二個調用exchange的人,會拿到true
        this->cancel(); // 説明發起之前已經有人cancel過了,立刻從poller中刪除即可

    mutex_.unlock();    
    if (ret < 0)    
        this->handle(SS_STATE_ERROR, errno);
}

這裏有兩個要注意的點:

  • 通過sleep()交給poller之後,用户的callback可以在網絡線程中處理,而不是當前線程立刻處理,這樣可以遞歸爆棧問題;
  • 如果過期時間非常短,sleep()之後是隨時有可能到期並回到__WFNamedTimerTask::handle()的!因此前面對mutex_.lock()mutex_.unlock()等的就是這裏的flag_.exchange(true)執行結束;

2. 任務甚至可以不發起

而如果創建完之後不想發起,Workflow統一的接口是需要調用一下task->dismiss(),以釋放task資源,調用析構函數等。

/*** 命名定時任務的析構函數,異步任務需要注意處理各種情況的資源回收 ***/
virtual ~__WFNamedTimerTask()
{
    if (node_.task) // 標誌位:如果沒有置空,説明任務沒有發起過。需要從全局map中刪掉這個timer
    {
         std::lock_guard<std::mutex> lock(__timer_map.mutex_);
         if (node_.task) // 鎖內再檢查一次
             timers_->del(&node_, &__timer_map.root_);
    }
}

7. 總結

最後貼一段代碼看看,一個高併發1s定時返回的server代碼,用Workflow實現可以多簡單:

int main()
{
    WFHttpServer server([](WFHttpTask * task)
    {
        task->get_resp()->append_output_body("<html>will response after 1s</html>");
        auto timer = WFTaskFactory::create_timer_task(1, 0, nullptr);
        series_of(task)->push_back(timer);
    }); 

    if (server.start(1412) == 0)
    {   
        getchar();
        server.stop();
    }
    return 0;
}

本篇介紹瞭如何優雅地處理異步任務的:創建、發起、回調、取消、進程退出多種併發行為,其中包含了許多常用的技巧!不記得的小夥伴自行翻回去再看一遍._.

當然這在併發的世界中還只是冰山一角,因為很有可能寫下某一句話的時機不對,任務一結束,程序就GG了。

歡迎小夥伴提出寶貴的建議^0^/

最後!

先前諮詢過如何參與到Workflow開發中的小夥伴,你們的任務來了!!!

可取消的定時器功能,目前還不支持Windows版本,歡迎熟悉iocp的小夥伴參與開發~

不好意思上來就是個大boss。以及下次一定為新手村帶來更多任務協助同學們打怪升級~感恩!

user avatar chengdumeiyouni 头像 hlinleanring 头像 jkkang 头像 artificer 头像 240cgxo4 头像 aixiaodewulongcha_ehoerm 头像 nocobase 头像 feixianghelanren 头像 iceblue 头像 duwenlong 头像 pcworld 头像
点赞 11 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.