Redis 是單線程模型?
一、背景
二、Redis6.0多線程IO概述
1. 參數與配置
2. 執行流程概述
三、源碼分析
1. 初始化
2. 讀數據流程
3. 寫數據流程
4. 多線程IO動態暫停與開啟
四、性能對比
1. 測試環境
2. Redis版本
3. 壓測命令
4. 統計結果
5. 結論
五、6.0多線程IO不足
六、總結
一、背景
使用過Redis的同學肯定都了解過一個說法,說Redis是單線程模型,那么實際情況是怎樣的呢?
其實,我們常說Redis是單線程模型,是指Redis采用單線程的事件驅動模型,只有并且只會在一個主線程中執行Redis命令操作,這意味著它在處理請求時不使用復雜的上下文切換或鎖機制。盡管只是單線程的架構,但Redis通過非阻塞的I/O操作和高效的事件循環來處理大量的并發連接,性能仍然非常高。
然而在Redis4.0開始也引入了一些后臺線程執行異步淘汰、異步刪除過期key、異步執行大key刪除等任務,然后,在Redis6.0中引入了多線程IO特性,將Redis單節點訪問請求從10W提升到20W。
而在去年Valkey社區發布的Valkey8.0版本,在I/O線程系統上進行了重大升級,特別是異步I/O線程的引入,使主線程和I/O線程能夠并行工作,可實現最大化服務吞吐量并減少瓶頸,使得Valkey單節點訪問請求可以提升到100W。
那么在Redis6.0和Valkey8.0中多線程IO是怎么回事呢?是否改變了Redis原有單線程模型?
- 2024年,Redis商業支持公司Redis Labs宣布Redis核心代碼的許可證從BSD變更為RSALv2,明確禁止云廠商提供Redis托管服務,這一決定直接導致社區分裂。
- 為維護開源自由,Linux基金會聯合多家科技公司(包括AWS、Google、Cloud、Oracle等)宣布支持Valkey,作為Redis的替代分支。
- Valkey8.0系Valkey社區發布的首個主要大版本。
- 最新消息,在Redis項目創始人antirez今年加入Redis商業公司5個月后,Redis宣傳從Redis8開始,Redis項目重新開源。
本篇文章主要介紹Redis6.0多線程IO特性。
二、Redis6.0 多線程 IO 概述
Redis6.0引入多線程IO,但多線程部分只是用來處理網絡數據的讀寫和協議解析,執行命令仍然是單線程。默認是不開啟的,需要進程啟動前開啟配置,并且在運行期間無法通過 config set 命令動態修改。
參數與配置
多線程IO涉及下面兩個配置參數:
# io-threads 4 IO 線程數量
# io-threads-do-reads no 讀數據及數據解析是否也用 IO 線程
- io-threads 表示IO線程數量, io-threads 設置為1時(代碼中默認值),表示只使用主線程,不開啟多線程IO。因此,若要配置開啟多線程IO,需要設置 io-threads 大于1,但不可以超過最大值128。
- 但在默認情況下,Redis只將多線程IO用于向客戶端寫數據,因為作者認為通常使用多線程執行讀數據的操作幫助不是很大。如果需要使用多線程用于讀數據和解析數據,則需要將參數 io-threads-do-reads 設置為 yes 。
- 此兩項配置參數在Redis運行期間無法通過 config set 命令修改,并且開啟SSL時,不支持多線程IO特性。
- 若機器CPU將至少超過4核時,則建議開啟,并且至少保留一個備用CPU核,使用超過8個線程可能并不會有多少幫助。
執行流程概述
Redis6.0引入多線程IO后,讀寫數據執行流程如下所示:
圖片
流程簡述
- 主線程負責接收建立連接請求,獲取socket放入全局等待讀處理隊列。
- 主線程處理完讀事件之后,通過RR(Round Robin)將這些連接分配給這些IO線程,也會分配給主線程自己。
- 主線程先讀取分配給自己的客戶端數據,然后阻塞等待其他IO線程讀取socket完畢。
- IO線程將請求數據讀取并解析完成(這里只是讀數據和解析、并不執行)。
- 主線程通過單線程的方式執行請求命令。
- 主線程通過RR(Round Robin)將回寫客戶端事件分配給這些IO線程,也會分配給主線程自己。
- 主線程同樣執行部分寫數據到客戶端,然后阻塞等待IO線程將數據回寫socket完畢。
設計特點
- IO線程要么同時在讀socket,要么同時在寫,不會同時讀和寫。
- IO線程只負責讀寫socket解析命令,不負責命令執行。
- 主線程也會參與數據的讀寫。
三、源碼分析
多線程IO相關源代碼都在源文件networking.c中最下面。
初始化
主線程在main函數中調用InitServerLast函數,InitServerLast函數中調用initThreadedIO函數,在initThreadedIO函數中根據配置文件中的線程數量,創建對應數量的IO工作線程數量。
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
- 如果 io_threads_num 的數量為1,則只運行主線程, io_threads_num 的IO線程數量不允許超過 128。
- 序號為0的線程是主線程,因此實際的工作線程數目是io-threads - 1。
初始化流程
- 為包括主線程在內的每個線程分配list列表,用于后續保存待處理的客戶端。
- 為主線程以外的其他IO線程初始化互斥對象mutex,但是立即調用pthread_mutex_lock占有互斥量,將io_threads_pending[i]設置為0,接著創建對應的IO工作線程。
- 占用互斥量是為了創建IO工作線程后,可暫時等待后續啟動IO線程的工作,因為IOThreadMain函數在io_threads_pending[id] == 0時也調用了獲取mutex,所以此時無法繼續向下運行,等待啟動。
- 在startThreadedIO函數中會釋放mutex來啟動IO線程工作。何時調用startThreadedIO打開多線程IO,具體見下文的「多線程IO動態暫停與開啟」。
IO 線程主函數
IO線程主函數代碼如下所示:
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
從IO線程主函數邏輯可以看到:
- 如果IO線程等待處理任務數量為0,則IO線程一直在空循環,因此后面主線程給IO線程分發任務后,需要設置IO線程待處理任務數 io_threads_pending[id] ,才會觸發IO線程工作。
- 如果IO線程等待處理任務數量為0,并且未獲取到mutex鎖,則會等待獲取鎖,暫停運行,由于主線程在創建IO線程之前先獲取了鎖,因此IO線程剛啟動時是暫停運行狀態,需要等待主線程釋放鎖,啟動IO線程。
- IO線程待處理任務數為0時,獲取到鎖并再次釋放鎖,是為了讓主線程可以暫停IO線程。
- 只有io_threads_pending[id]不為0時,則繼續向下執行操作,根據io_threads_op決定是讀客戶端還是寫客戶端,從這里也可以看出IO線程要么同時讀,要么同時寫。
讀數據流程
主線程將待讀數據客戶端加入隊列
當客戶端連接有讀事件時,會觸發調用readQueryFromClient函數,在該函數中會調用postponeClientRead。
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
......以下省略
}
/* Return 1 if we want to handle the client read later using threaded I/O.
* This is called by the readable handler of the event loop.
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
如果開啟多線程,并且開啟多線程讀(io_threads_do_reads 為 yes),則將客戶端標記為CLIENT_PENDING_READ,并且加入clients_pending_read列表。
然后readQueryFromClient函數中就立即返回,主線程沒有執行從客戶端連接中讀取的數據相關邏輯,讀取了客戶端數據行為等待后續各個IO線程執行。
主線程分發并阻塞等待
主線程在beforeSleep函數中會調用handleClientsWithPendingReadsUsingThreads函數。
/* When threaded I/O is also enabled for the reading + parsing side, the
* readable handler will just put normal clients into a queue of clients to
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
}
processInputBuffer(c);
}
return processed;
}
- 先檢查是否開啟多線程,以及是否開啟多線程讀數據(io_threads_do_reads),未開啟直接返回。
- 檢查隊列clients_pending_read長度,為0直接返回,說明沒有待讀事件。
- 遍歷clients_pending_read隊列,通過RR算法,將隊列中的客戶端循環分配給各個IO線程,包括主線程本身。
- 設置io_threads_op = IO_THREADS_OP_READ,并且將io_threads_pending數組中各個位置值設置為對應各個IO線程分配到的客戶端數量,如上面介紹,目的是為了使IO線程工作。
- 主線程開始讀取客戶端數據,因為主線程也分配了任務。
- 主線程阻塞等待,直到所有的IO線程都完成讀數據工作。
- 主線程執行命令。
IO 線程讀數據
在IO線程主函數中,如果 io_threads_op == IO_THREADS_OP_READ ,則調用readQueryFromClient從網絡中讀取數據。
IO 線程讀取數據后,不會執行命令。
在readQueryFromClient函數中,最后會執行processInputBuffer函數,在processInputBuffe函數中,如IO線程檢查到客戶端設置了CLIENT_PENDING_READ標志,則不執行命令,直接返回。
......省略
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
...... 省略
寫數據流程
命令處理完成后,依次調用:
addReply-->prepareClientToWrite-->clientInstallWriteHandler,將待寫客戶端加入隊列clients_pending_write。
void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}
在beforeSleep函數中調用handleClientsWithPendingWritesUsingThreads。
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
- 判斷clients_pending_write隊列的長度,如果為0則直接返回。
- 判斷是否開啟了多線程,若只有很少的客戶端需要寫,則不使用多線程IO,直接在主線程完成寫操作。
- 如果使用多線程IO來完成寫數據,則需要判斷是否先開啟多線程IO(因為會動態開啟與暫停)。
- 遍歷clients_pending_write隊列,通過RR算法,循環將所有客戶端分配給各個IO線程,包括主線程自身。
- 設置io_threads_op = IO_THREADS_OP_WRITE,并且將io_threads_pending數組中各個位置值設置為對應的各個IO線程分配到的客戶端數量,目的是為了使IO線程工作。
- 主線程開始寫客戶端數據,因為主線程也分配了任務,寫完清空任務隊列。
- 阻塞等待,直到所有IO線程完成寫數據工作。
- 再次遍歷所有客戶端,如果有需要,為客戶端在事件循環上安裝寫句柄函數,等待事件回調。
多線程 IO 動態暫停與開啟
從上面的寫數據的流程中可以看到,在Redis運行過程中多線程IO是會動態暫停與開啟的。
在上面的寫數據流程中,先調用stopThreadedIOIfNeeded函數判斷是否需要暫停多線程IO,當等待寫的客戶端數量低于線程數的2倍時,會暫停多線程IO,否則就會打開多線程。
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
在寫數據流程handleClientsWithPendingWritesUsingThreads函數中,stopThreadedIOIfNeeded返回0的話,就會執行下面的startThreadedIO函數,開啟多線程IO。
void startThreadedIO(void) {
serverAssert(server.io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
server.io_threads_active = 1;
}
void stopThreadedIO(void) {
/* We may have still clients with pending reads when this function
* is called: handle them before stopping the threads. */
handleClientsWithPendingReadsUsingThreads();
serverAssert(server.io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
server.io_threads_active = 0;
}
從上面的代碼中可以看出:
- 開啟多線程IO是通過釋放mutex鎖來讓IO線程開始執行讀數據或者寫數據動作。
- 暫停多線程IO則是通過加鎖來讓IO線程暫時不執行讀數據或者寫數據動作,此處加鎖后,IO線程主函數由于無法獲取到鎖,因此會暫時阻塞。
四、性能對比
測試環境
兩臺物理機配置:CentOS Linux release 7.3.1611(Core) ,12核CPU1.5GHz,256G內存(free 128G)。
Redis版本
使用Redis6.0.6,多線程IO模式使用線程數量為4,即 io-threads 4 ,參數 io-threads-do-reads 分別設置為 no 和 yes ,進行對比測試。
壓測命令
redis-benchmark -h 172.xx.xx.xx -t set,get -n 1000000 -r 100000000 --threads ${threadsize} -d ${datasize} -c ${clientsize}
單線程 threadsize 為 1,多線程 threadsize 為 4
datasize為value 大小,分別設置為 128/512/1024
clientsize 為客戶端數量,分別設置為 256/2000
如:./redis-benchmark -h 172.xx.xx.xx -t set,get -n 1000000 -r 100000000 --threads 4 -d 1024 -c 256
統計結果
當 io-threads-do-reads 為 no 時,統計圖表如下所示(c 2000表示客戶端數量為2000)。
圖片
當 io-threads-do-reads 為 yes 時,統計圖表如下所示(c 256表示客戶端數量為256)。
圖片
結論
使用redis-benchmark做Redis6單線程和多線程簡單SET/GET命令性能測試:
- 從上面可以看到GET/SET命令在設置4個IO線程時,QPS相比于大部分情況下的單線程,性能幾乎是翻倍了。
- 連接數越多,多線程優勢越明顯。
- value值越小,多線程優勢越明顯。
- 使用多線程讀命令比寫命令優勢更加明顯,當value越大,寫命令越發沒有明顯的優勢。
- 參數 io-threads-do-reads 為yes,性能有微弱的優勢,不是很明顯。
- 總體來說,以上結果基本符合預期,結果僅作參考。
五、6.0 多線程 IO 不足
盡管引入多線程IO大幅提升了Redis性能,但是Redis6.0的多線程IO仍然存在一些不足:
- CPU核心利用率不足:當前主線程仍負責大部分的IO相關任務,并且當主線程處理客戶端的命令時,IO線程會空閑相當長的時間,同時值得注意的是,主線程在執行IO相關任務期間,性能受到最慢IO線程速度的限制。
- IO線程執行的任務有限:目前,由于主線程同步等待IO線程,線程僅執行讀取解析和寫入操作。如果線程可以異步工作,我們可以將更多工作卸載到IO線程上,從而減少主線程的負載。
- 不支持帶有TLS的IO線程。
最新的Valkey8.0版本中,通過引入異步IO線程,將更多的工作轉移到IO線程執行,同時通過批量預讀取內存數據減少內存訪問延遲,大幅提高Valkey單節點訪問QPS,單個實例每秒可處理100萬個請求。我們后續再詳細介紹Valkey8.0異步IO特性。
六、總結
Redis6.0引入多線程IO,但多線程部分只是用來處理網絡數據的讀寫和協議解析,執行命令仍然是單線程。通過開啟多線程IO,并設置合適的CPU數量,可以提升訪問請求一倍以上。
Redis6.0多線程IO仍然存在一些不足,沒有充分利用CPU核心,在最新的Valkey8.0版本中,引入異步IO將進一步大幅提升Valkey性能。