動態

詳情 返回 返回

Redis 是單線程模型?|得物技術 - 動態 詳情

一、背景

使用過Redis的同學肯定都瞭解過一個説法,説Redis是單線程模型,那麼實際情況是怎樣的呢?

其實,我們常説Redis是單線程模型,是指Redis採用單線程的事件驅動模型,只有並且只會在一個主線程中執行Redis命令操作,這意味着它在處理請求時不使用複雜的上下文切換或鎖機制。儘管只是單線程的架構,但Redis通過非阻塞的I/O操作和高效的事件循環來處理大量的併發連接,性能仍然非常高。

然而在Redis4.0開始也引入了一些後台線程執行異步淘汰、異步刪除過期key、異步執行大key刪除等任務,然後,在Redis6.0中引入了多線程IO特性,將Redis單節點訪問請求從10W提升到20W。

而在去年Valkey社區發佈的Valkey8.0版本,在I/O線程系統上進行了重大升級,特別是異步I/O線程的引入,使主線程和I/O線程能夠並行工作,可實現最大化服務吞吐量並減少瓶頸,使得Valkey單節點訪問請求可以提升到100W。

那麼在Redis6.0和Valkey8.0中多線程IO是怎麼回事呢?是否改變了Redis原有單線程模型?

  • 2024年,Redis商業支持公司Redis Labs**宣佈Redis核心代碼的許可證從BSD變更為RSALv2,明確禁止雲廠商提供Redis託管服務,這一決定直接導致社區分裂。
  • 為維護開源自由,Linux基金會聯合多家科技公司(包括AWS、Google、Cloud、Oracle等)宣佈支持Valkey,作為Redis的替代分支。
  • Valkey8.0系Valkey社區發佈的首個主要大版本。
  • 最新消息,在Redis項目創始人antirez今年加入Redis商業公司5個月後,Redis宣傳從Redis8開始,Redis項目重新開源。

本篇文章主要介紹Redis6.0多線程IO特性。

二、Redis6.0 多線程 IO 概述

Redis6.0引入多線程IO,但多線程部分只是用來處理網絡數據的讀寫和協議解析,執行命令仍然是單線程。默認是不開啓的,需要進程啓動前開啓配置,並且在運行期間無法通過 config set 命令動態修改。

參數與配置

多線程IO涉及下面兩個配置參數:

# io-threads 4  IO 線程數量# io-threads-do-reads no  讀數據及數據解析是否也用 IO 線程
  •  io-threads 表示IO線程數量, io-threads 設置為1時(代碼中默認值),表示只使用主線程,不開啓多線程IO。因此,若要配置開啓多線程IO,需要設置 io-threads 大於1,但不可以超過最大值128。
  • 但在默認情況下,Redis只將多線程IO用於向客户端寫數據,因為作者認為通常使用多線程執行讀數據的操作幫助不是很大。如果需要使用多線程用於讀數據和解析數據,則需要將參數 io-threads-do-reads 設置為 yes 。
  • 此兩項配置參數在Redis運行期間無法通過 config set 命令修改,並且開啓SSL時,不支持多線程IO特性。
  • 若機器CPU將至少超過4核時,則建議開啓,並且至少保留一個備用CPU核,使用超過8個線程可能並不會有多少幫助。

執行流程概述

Redis6.0引入多線程IO後,讀寫數據執行流程如下所示:

image.png

流程簡述

  1. 主線程負責接收建立連接請求,獲取socket放入全局等待讀處理隊列。
  2. 主線程處理完讀事件之後,通過RR(Round Robin)將這些連接分配給這些IO線程,也會分配給主線程自己。
  3. 主線程先讀取分配給自己的客户端數據,然後阻塞等待其他IO線程讀取socket完畢。
  4. IO線程將請求數據讀取並解析完成(這裏只是讀數據和解析、並不執行)。
  5. 主線程通過單線程的方式執行請求命令。
  6. 主線程通過RR(Round Robin)將回寫客户端事件分配給這些IO線程,也會分配給主線程自己。
  7. 主線程同樣執行部分寫數據到客户端,然後阻塞等待IO線程將數據回寫socket完畢。

設計特點

  1. IO線程要麼同時在讀socket,要麼同時在寫,不會同時讀和寫。
  2. IO線程只負責讀寫socket解析命令,不負責命令執行。
  3. 主線程也會參與數據的讀寫。

三、源碼分析

多線程IO相關源代碼都在源文件networking.c中最下面。

初始化

主線程在main函數中調用InitServerLast函數,InitServerLast函數中調用initThreadedIO函數,在initThreadedIO函數中根據配置文件中的線程數量,創建對應數量的IO工作線程數量。

/* Initialize the data structures needed for threaded I/O. */void initThreadedIO(void) {    io_threads_active = 0; /* We start with threads not active. */        /* Don't spawn any thread if the user selected a single thread:     * we'll handle I/O directly from the main thread. */    if (server.io_threads_num == 1) return;        if (server.io_threads_num > IO_THREADS_MAX_NUM) {        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "                             "The maximum number is %d.", IO_THREADS_MAX_NUM);        exit(1);    }       /* Spawn and initialize the I/O threads. */    for (int i = 0; i < server.io_threads_num; i++) {        /* Things we do for all the threads including the main thread. */        io_threads_list[i] = listCreate();        if (i == 0) continue; /* Thread 0 is the main thread. */               /* Things we do only for the additional threads. */        pthread_t tid;        pthread_mutex_init(&io_threads_mutex[i],NULL);        io_threads_pending[i] = 0;        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");            exit(1);        }        io_threads[i] = tid;    }}
  • 如果 io_threads_num 的數量為1,則只運行主線程, io_threads_num 的IO線程數量不允許超過 128。
  • 序號為0的線程是主線程,因此實際的工作線程數目是io-threads - 1。

初始化流程

  • 為包括主線程在內的每個線程分配list列表,用於後續保存待處理的客户端。
  • 為主線程以外的其他IO線程初始化互斥對象mutex,但是立即調用pthread_mutex_lock佔有互斥量**,將io_threads_pending[i]設置為0,接着創建對應的IO工作線程。
  • 佔用互斥量是為了創建IO工作線程後,可暫時等待後續啓動IO線程的工作,因為IOThreadMain函數在io_threads_pending[id] == 0時也調用了獲取mutex,所以此時無法繼續向下運行,等待啓動。
  • 在startThreadedIO函數中會釋放mutex來啓動IO線程工作。何時調用startThreadedIO打開多線程IO,具體見下文的「多線程IO動態暫停與開啓」。

IO 線程主函數

IO線程主函數代碼如下所示:

void *IOThreadMain(void *myid) {    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is     * used by the thread to just manipulate a single sub-array of clients. */    long id = (unsigned long)myid;    char thdname[16];       snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);    redis_set_thread_title(thdname);    redisSetCpuAffinity(server.server_cpulist);       while(1) {        /* Wait for start */        for (int j = 0; j < 1000000; j++) {            if (io_threads_pending[id] != 0) break;        }               /* Give the main thread a chance to stop this thread. */        if (io_threads_pending[id] == 0) {            pthread_mutex_lock(&io_threads_mutex[id]);            pthread_mutex_unlock(&io_threads_mutex[id]);            continue;        }               serverAssert(io_threads_pending[id] != 0);                if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));                /* Process: note that the main thread will never touch our list         * before we drop the pending count to 0. */        listIter li;        listNode *ln;        listRewind(io_threads_list[id],&li);        while((ln = listNext(&li))) {            client *c = listNodeValue(ln);            if (io_threads_op == IO_THREADS_OP_WRITE) {                writeToClient(c,0);            } else if (io_threads_op == IO_THREADS_OP_READ) {                readQueryFromClient(c->conn);            } else {                serverPanic("io_threads_op value is unknown");            }        }        listEmpty(io_threads_list[id]);        io_threads_pending[id] = 0;               if (tio_debug) printf("[%ld] Done\n", id);    }}

從IO線程主函數邏輯可以看到:

  • 如果IO線程等待處理任務數量為0,則IO線程一直在空循環,因此後面主線程給IO線程分發任務後,需要設置IO線程待處理任務數 io_threads_pending[id] ,才會觸發IO線程工作。
  • 如果IO線程等待處理任務數量為0,並且未獲取到mutex鎖,則會等待獲取鎖,暫停運行,由於主線程在創建IO線程之前先獲取了鎖,因此IO線程剛啓動時是暫停運行狀態,需要等待主線程釋放鎖,啓動IO線程。
  • IO線程待處理任務數為0時,獲取到鎖並再次釋放鎖,是為了讓主線程可以暫停IO線程。
  • 只有io_threads_pending[id]不為0時,則繼續向下執行操作,根據io_threads_op決定是讀客户端還是寫客户端,從這裏也可以看出IO線程要麼同時讀,要麼同時寫。

讀數據流程

主線程將待讀數據客户端加入隊列

當客户端連接有讀事件時,會觸發調用readQueryFromClient函數,在該函數中會調用postponeClientRead。

void readQueryFromClient(connection *conn) {    client *c = connGetPrivateData(conn);    int nread, readlen;    size_t qblen;        /* Check if we want to read from the client later when exiting from     * the event loop. This is the case if threaded I/O is enabled. */    if (postponeClientRead(c)) return;    ......以下省略}
/* Return 1 if we want to handle the client read later using threaded I/O. * This is called by the readable handler of the event loop. * As a side effect of calling this function the client is put in the * pending read clients and flagged as such. */int postponeClientRead(client *c) {    if (io_threads_active &&        server.io_threads_do_reads &&        !ProcessingEventsWhileBlocked &&        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))    {        c->flags |= CLIENT_PENDING_READ;        listAddNodeHead(server.clients_pending_read,c);        return 1;    } else {        return 0;    }}

如果開啓多線程,並且開啓多線程讀(io_threads_do_reads 為 yes),則將客户端標記為CLIENT_PENDING_READ,並且加入clients_pending_read列表。

然後readQueryFromClient函數中就立即返回,主線程沒有執行從客户端連接中讀取的數據相關邏輯,讀取了客户端數據行為等待後續各個IO線程執行。

主線程分發並阻塞等待

主線程在beforeSleep函數中會調用handleClientsWithPendingReadsUsingThreads函數。

/* When threaded I/O is also enabled for the reading + parsing side, the * readable handler will just put normal clients into a queue of clients to * process (instead of serving them synchronously). This function runs * the queue using the I/O threads, and process them in order to accumulate * the reads in the buffers, and also parse the first command available * rendering it in the client structures. */int handleClientsWithPendingReadsUsingThreads(void) {    if (!io_threads_active || !server.io_threads_do_reads) return 0;    int processed = listLength(server.clients_pending_read);    if (processed == 0) return 0;       if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);       /* Distribute the clients across N different lists. */    listIter li;    listNode *ln;    listRewind(server.clients_pending_read,&li);    int item_id = 0;    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        int target_id = item_id % server.io_threads_num;        listAddNodeTail(io_threads_list[target_id],c);        item_id++;    }        /* Give the start condition to the waiting threads, by setting the     * start condition atomic var. */    io_threads_op = IO_THREADS_OP_READ;    for (int j = 1; j < server.io_threads_num; j++) {        int count = listLength(io_threads_list[j]);        io_threads_pending[j] = count;    }       /* Also use the main thread to process a slice of clients. */    listRewind(io_threads_list[0],&li);    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        readQueryFromClient(c->conn);    }    listEmpty(io_threads_list[0]);        /* Wait for all the other threads to end their work. */    while(1) {        unsigned long pending = 0;        for (int j = 1; j < server.io_threads_num; j++)            pending += io_threads_pending[j];        if (pending == 0) break;    }    if (tio_debug) printf("I/O READ All threads finshed\n");       /* Run the list of clients again to process the new buffers. */    while(listLength(server.clients_pending_read)) {        ln = listFirst(server.clients_pending_read);        client *c = listNodeValue(ln);        c->flags &= ~CLIENT_PENDING_READ;        listDelNode(server.clients_pending_read,ln);                if (c->flags & CLIENT_PENDING_COMMAND) {            c->flags &= ~CLIENT_PENDING_COMMAND;            if (processCommandAndResetClient(c) == C_ERR) {                /* If the client is no longer valid, we avoid                 * processing the client later. So we just go                 * to the next. */                continue;            }        }        processInputBuffer(c);    }    return processed;}
  • 先檢查是否開啓多線程,以及是否開啓多線程讀數據(io_threads_do_reads),未開啓直接返回。
  • 檢查隊列clients_pending_read長度,為0直接返回,説明沒有待讀事件。
  • 遍歷clients_pending_read隊列,通過RR算法,將隊列中的客户端循環分配給各個IO線程,包括主線程本身。
  • 設置io_threads_op = IO_THREADS_OP_READ,並且將io_threads_pending數組中各個位置值設置為對應各個IO線程分配到的客户端數量,如上面介紹,目的是為了使IO線程工作。
  • 主線程開始讀取客户端數據,因為主線程也分配了任務。
  • 主線程阻塞等待,直到所有的IO線程都完成讀數據工作。
  • 主線程執行命令。

IO 線程讀數據

在IO線程主函數中,如果 io_threads_op == IO_THREADS_OP_READ ,則調用readQueryFromClient從網絡中讀取數據。

IO 線程讀取數據後,不會執行命令。

在readQueryFromClient函數中,最後會執行processInputBuffer函數,在processInputBuffe函數中,如IO線程檢查到客户端設置了CLIENT_PENDING_READ標誌,則不執行命令,直接返回。

            ......省略/* If we are in the context of an I/O thread, we can't really             * execute the command here. All we can do is to flag the client             * as one that needs to process the command. */            if (c->flags & CLIENT_PENDING_READ) {                c->flags |= CLIENT_PENDING_COMMAND;                break;            }            ...... 省略

寫數據流程

命令處理完成後,依次調用:

addReply-->prepareClientToWrite-->clientInstallWriteHandler,將待寫客户端加入隊列clients_pending_write。

void clientInstallWriteHandler(client *c) {    /* Schedule the client to write the output buffers to the socket only     * if not already done and, for slaves, if the slave can actually receive     * writes at this stage. */    if (!(c->flags & CLIENT_PENDING_WRITE) &&        (c->replstate == REPL_STATE_NONE ||         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))    {        /* Here instead of installing the write handler, we just flag the         * client and put it into a list of clients that have something         * to write to the socket. This way before re-entering the event         * loop, we can try to directly write to the client sockets avoiding         * a system call. We'll only really install the write handler if         * we'll not be able to write the whole reply at once. */        c->flags |= CLIENT_PENDING_WRITE;        listAddNodeHead(server.clients_pending_write,c);    }}

在beforeSleep函數中調用handleClientsWithPendingWritesUsingThreads。

int handleClientsWithPendingWritesUsingThreads(void) {    int processed = listLength(server.clients_pending_write);    if (processed == 0) return 0; /* Return ASAP if there are no clients. */        /* If I/O threads are disabled or we have few clients to serve, don't     * use I/O threads, but thejboring synchronous code. */    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {        return handleClientsWithPendingWrites();    }        /* Start threads if needed. */    if (!io_threads_active) startThreadedIO();       if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);        /* Distribute the clients across N different lists. */    listIter li;    listNode *ln;    listRewind(server.clients_pending_write,&li);    int item_id = 0;    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        c->flags &= ~CLIENT_PENDING_WRITE;        int target_id = item_id % server.io_threads_num;        listAddNodeTail(io_threads_list[target_id],c);        item_id++;    }       /* Give the start condition to the waiting threads, by setting the     * start condition atomic var. */    io_threads_op = IO_THREADS_OP_WRITE;    for (int j = 1; j < server.io_threads_num; j++) {        int count = listLength(io_threads_list[j]);        io_threads_pending[j] = count;    }        /* Also use the main thread to process a slice of clients. */    listRewind(io_threads_list[0],&li);    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        writeToClient(c,0);    }    listEmpty(io_threads_list[0]);       /* Wait for all the other threads to end their work. */    while(1) {        unsigned long pending = 0;        for (int j = 1; j < server.io_threads_num; j++)            pending += io_threads_pending[j];        if (pending == 0) break;    }    if (tio_debug) printf("I/O WRITE All threads finshed\n");        /* Run the list of clients again to install the write handler where     * needed. */    listRewind(server.clients_pending_write,&li);    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);               /* Install the write handler if there are pending writes in some         * of the clients. */        if (clientHasPendingReplies(c) &&                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)        {            freeClientAsync(c);        }    }    listEmpty(server.clients_pending_write);    return processed;}
  1. 判斷clients_pending_write隊列的長度,如果為0則直接返回。
  2. 判斷是否開啓了多線程,若只有很少的客户端需要寫,則不使用多線程IO,直接在主線程完成寫操作。
  3. 如果使用多線程IO來完成寫數據,則需要判斷是否先開啓多線程IO(因為會動態開啓與暫停)。
  4. 遍歷clients_pending_write隊列,通過RR算法,循環將所有客户端分配給各個IO線程,包括主線程自身。
  5. 設置io_threads_op = IO_THREADS_OP_WRITE,並且將io_threads_pending數組中各個位置值設置為對應的各個IO線程分配到的客户端數量,目的是為了使IO線程工作。
  6. 主線程開始寫客户端數據,因為主線程也分配了任務,寫完清空任務隊列。
  7. 阻塞等待,直到所有IO線程完成寫數據工作。
  8. 再次遍歷所有客户端,如果有需要,為客户端在事件循環上安裝寫句柄函數,等待事件回調。

多線程 IO 動態暫停與開啓

從上面的寫數據的流程中可以看到,在Redis運行過程中多線程IO是會動態暫停與開啓的。

在上面的寫數據流程中,先調用stopThreadedIOIfNeeded函數判斷是否需要暫停多線程IO,當等待寫的客户端數量低於線程數的2倍時,會暫停多線程IO, 否則就會打開多線程。

int stopThreadedIOIfNeeded(void) {    int pending = listLength(server.clients_pending_write);        /* Return ASAP if IO threads are disabled (single threaded mode). */    if (server.io_threads_num == 1) return 1;       if (pending < (server.io_threads_num*2)) {        if (io_threads_active) stopThreadedIO();        return 1;    } else {        return 0;    }}

在寫數據流程handleClientsWithPendingWritesUsingThreads函數中,stopThreadedIOIfNeeded返回0的話,就會執行下面的startThreadedIO函數,開啓多線程IO。

void startThreadedIO(void) {    serverAssert(server.io_threads_active == 0);    for (int j = 1; j < server.io_threads_num; j++)        pthread_mutex_unlock(&io_threads_mutex[j]);    server.io_threads_active = 1;}
void stopThreadedIO(void) {    /* We may have still clients with pending reads when this function     * is called: handle them before stopping the threads. */    handleClientsWithPendingReadsUsingThreads();    serverAssert(server.io_threads_active == 1);    for (int j = 1; j < server.io_threads_num; j++)        pthread_mutex_lock(&io_threads_mutex[j]);    server.io_threads_active = 0;}

從上面的代碼中可以看出:

  • 開啓多線程IO是通過釋放mutex鎖來讓IO線程開始執行讀數據或者寫數據動作。
  • 暫停多線程IO則是通過加鎖來讓IO線程暫時不執行讀數據或者寫數據動作,此處加鎖後,IO線程主函數由於無法獲取到鎖,因此會暫時阻塞。

四、性能對比

測試環境

兩台物理機配置:CentOS Linux release 7.3.1611(Core) ,12核CPU1.5GHz,256G內存(free 128G)。

Redis版本

使用Redis6.0.6,多線程IO模式使用線程數量為4,即 io-threads 4 ,參數 io-threads-do-reads 分別設置為 no 和 yes ,進行對比測試。

壓測命令

redis-benchmark -h 172.xx.xx.xx -t set,get -n 1000000 -r 100000000 --threads ${threadsize} -d ${datasize} -c ${clientsize}
單線程 threadsize 為 1,多線程 threadsize 為 4datasize為value 大小,分別設置為 128/512/1024clientsize 為客户端數量,分別設置為 256/2000如:./redis-benchmark -h 172.xx.xx.xx -t set,get -n 1000000 -r 100000000 --threads 4 -d 1024 -c 256

統計結果

當 io-threads-do-reads 為 no 時,統計圖表如下所示(c 2000表示客户端數量為2000)。

image.png

當 io-threads-do-reads 為 yes 時,統計圖表如下所示(c 256表示客户端數量為256)。

image.png

結論

使用redis-benchmark做Redis6單線程和多線程簡單SET/GET命令性能測試:

  1. 從上面可以看到GET/SET命令在設置4個IO線程時,QPS相比於大部分情況下的單線程,性能幾乎是翻倍了。
  2. 連接數越多,多線程優勢越明顯。
  3. value值越小,多線程優勢越明顯。
  4. 使用多線程讀命令比寫命令優勢更加明顯,當value越大,寫命令越發沒有明顯的優勢。
  5. 參數 io-threads-do-reads 為yes,性能有微弱的優勢,不是很明顯。
  6. 總體來説,以上結果基本符合預期,結果僅作參考。

五、6.0 多線程 IO 不足

儘管引入多線程IO大幅提升了Redis性能,但是Redis6.0的多線程IO仍然存在一些不足:

  • CPU核心利用率不足:當前主線程仍負責大部分的IO相關任務,並且當主線程處理客户端的命令時,IO線程會空閒相當長的時間,同時值得注意的是,主線程在執行IO相關任務期間,性能受到最慢IO線程速度的限制。
  • IO線程執行的任務有限:目前,由於主線程同步等待IO線程,線程僅執行讀取解析和寫入操作。如果線程可以異步工作,我們可以將更多工作卸載到IO線程上,從而減少主線程的負載。
  • 不支持帶有TLS的IO線程。

最新的Valkey8.0版本中,通過引入異步IO線程,將更多的工作轉移到IO線程執行,同時通過批量預讀取內存數據減少內存訪問延遲,大幅提高Valkey單節點訪問QPS,單個實例每秒可處理100萬個請求。我們後續再詳細介紹Valkey8.0異步IO特性。

六、總結

Redis6.0引入多線程IO,但多線程部分只是用來處理網絡數據的讀寫和協議解析,執行命令仍然是單線程。通過開啓多線程IO,並設置合適的CPU數量,可以提升訪問請求一倍以上。

Redis6.0多線程IO仍然存在一些不足,沒有充分利用CPU核心,在最新的Valkey8.0版本中,引入異步IO將進一步大幅提升Valkey性能。

往期回顧

1.得物社區活動:組件化的演進與實踐

2.從CPU冒煙到絲滑體驗:算法SRE性能優化實戰全揭秘|得物技術

3.CSS闖關指南:從手寫地獄到“類”積木之旅|得物技術

4.以細節詮釋專業,用成長定義價值——對話@孟同學 |得物技術

5.大語言模型的訓練後量化算法綜述 | 得物技術

文 / 竹徑

關注得物技術,每週更新技術乾貨

要是覺得文章對你有幫助的話,歡迎評論轉發點贊~

未經得物技術許可嚴禁轉載,否則依法追究法律責任。

user avatar mannayang 頭像 u_15702012 頭像 imba97 頭像 ldh-blog 頭像 shixiaoyuanya 頭像 yizhidanshendetielian 頭像 xialeistudio 頭像 changqingdezi 頭像 sishuiliunian_58f891c129ab1 頭像 iwan_68b8da84d3d8b 頭像 innsane 頭像 haiyong 頭像
點贊 21 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.