解密高性能異步I/O:io_uring的魔力與應用
它的高效性和創新性,使得開發者能夠構建出性能更卓越的應用程序,無論是在高性能網絡服務、數據庫系統,還是大規模文件處理等場景中,io_uring 都展現出了巨大的潛力。接下來,讓我們一起深入探索 io_uring 的實現原理與應用案例。
一、io_uring簡介
1.1io_uring概述
io_uring 是一個Linux內核提供的高性能異步 I/O 框架,最初在 Linux 5.1 版本中引入。它的設計目標是解決傳統的異步 I/O 模型(如epoll或者 POSIX AIO)在大規模 I/O 操作中效率不高的問題。
在傳統的 Linux I/O 操作中,存在一些性能瓶頸。例如,系統調用的開銷較大,同步 I/O 操作會導致線程在等待 I/O 完成時被阻塞,浪費了 CPU 資源。隨著對高性能、高并發服務器和應用程序的需求不斷增加,需要一種更高效的 I/O 處理機制。io_uring 應運而生,它是由 Jens Axboe 開發的,目的是為了解決這些傳統 I/O 機制的效率問題。
*過往IO接口的缺陷
(1)同步IO接口
最原始的文件IO系統調用就是read,write。read系統調用從文件描述符所指代的打開文件中讀取數據。write系統調用將數據寫入一個已打開的文件中。在文件特定偏移處的IO是pread,pwrite。調用時可以指定位置進行文件IO操作,而非始于文件的當前偏移處,且他們不會改變文件的當前偏移量。
分散輸入和集中輸出(Scatter-Gather IO)是readv, writev,調用并非只對單個緩沖區進行讀寫操作,而是一次即可傳輸多個緩沖區的數據,免除了多次系統調用的開銷,提高文件 I/O 的效率,特別是當需要讀寫多個連續或非連續的數據塊時。
該機制使用一個數組iov定義了一組用來傳輸數據的緩沖區,一個整形數iovcnt指定iov的成員個數,其中,iov中的每個成員都是如下形式的數據結構。
struct iovec {
void *iov_base; /* Starting address */
size_t iov_len; /* Number of bytes to transfer */
};
上述接口在讀寫IO時,系統調用會阻塞住等待,在數據讀取或寫入后才返回結果。同步導致的后果就是在阻塞的同時無法繼續執行其他的操作,只能等待IO結果返回。存儲場景中對性能的要求非常高,所以需要異步IO。
(2)異步IO接口:AIO
Linux 的異步 IO(AIO,Asynchronous I/O)是一種高級的文件 IO 模型,允許應用程序在發起 IO 操作后不必等待操作完成,而是可以繼續執行其他任務。這與傳統的同步 IO 模型不同,后者在 IO 操作完成之前會阻塞應用程序的執行。
1.2io_uring設計思路
(1)解決“系統調用開銷大”的問題?
針對這個問題,考慮是否每次都需要系統調用。如果能將多次系統調用中的邏輯放到有限次數中來,就能將消耗降為常數時間復雜度。
(2)解決“拷貝開銷大”的問題?
之所以在提交和完成事件中存在大量的內存拷貝,是因為應用程序和內核之間的通信需要拷貝數據,所以為了避免這個問題,需要重新考量應用與內核間的通信方式。我們發現,兩者通信,不是必須要拷貝,通過現有技術,可以讓應用與內核共享內存。
要實現核外與內核的零拷貝,最佳方式就是實現一塊內存映射區域,兩者共享一段內存,核外往這段內存寫數據,然后通知內核使用這段內存數據,或者內核填寫這段數據,核外使用這部分數據。因此,需要一對共享的ring buffer用于應用程序和內核之間的通信。
- 一塊用于核外傳遞數據給內核,一塊是內核傳遞數據給核外,一方只讀,一方只寫。
- 提交隊列SQ(submission queue)中,應用是IO提交的生產者,內核是消費者。
- 完成隊列CQ(completion queue)中,內核是IO完成的生產者,應用是消費者。
- 內核控制SQ ring的head和CQ ring的tail,應用程序控制SQ ring的tail和CQ ring的head
(3)解決“API不友好”的問題?
問題在于需要多個系統調用才能完成,考慮是否可以把多個系統調用合而為一。有時候,將多個類似的函數合并并通過參數區分不同的行為是更好的選擇,而有時候可能需要將復雜的函數分解為更簡單的部分來進行重構。
如果發現函數中的某一部分代碼可以獨立出來成為一個單獨的函數,可以先進行這樣的提煉,然后再考慮是否需要進一步使用參數化方法重構。
1.3與其他 I/O 模型的對比
阻塞 I/O:阻塞 I/O 是最基本的 I/O 模型,在這種模型下,當應用程序調用 I/O 操作(如 read、write)時,線程會被阻塞,直到 I/O 操作完成。例如,當讀取一個文件時,線程會一直等待,直到數據從磁盤讀取到內存中。這種模型的優點是簡單直觀,易于理解和實現,但缺點也很明顯,在 I/O 操作期間,線程無法執行其他任務,這在高并發場景下會導致大量線程被阻塞,嚴重降低系統的性能和響應速度。例如,在一個同時處理多個客戶端請求的服務器中,如果使用阻塞 I/O,每個請求都可能導致線程阻塞,當請求數量較多時,服務器將無法及時響應其他請求。
非阻塞 I/O:非阻塞 I/O 允許應用程序在 I/O 操作未完成時立即返回,線程不會被阻塞。應用程序可以通過輪詢的方式檢查 I/O 操作的狀態,以確定是否完成。雖然這種模型避免了線程的阻塞,但頻繁的輪詢會消耗大量的 CPU 資源,并且在 I/O 操作較多時,管理和協調這些操作會變得非常復雜。以網絡編程為例,在非阻塞 I/O 模式下,應用程序需要不斷地檢查 socket 是否有數據可讀或可寫,這會增加 CPU 的負擔,降低系統的整體性能。
epoll:epoll 是一種 I/O 多路復用技術,它允許應用程序同時監控多個文件描述符的事件(如可讀、可寫、異常等)。當有事件發生時,epoll 會通知應用程序進行處理。epoll 在一定程度上提高了 I/O 操作的效率,特別是在高并發場景下,它通過減少系統調用的次數,降低了 CPU 的開銷。然而,epoll 本質上還是同步 I/O,它只是提供了一種高效的事件通知機制,應用程序在處理 I/O 事件時,仍然需要進行實際的 I/O 操作,這可能會導致線程阻塞。比如,在一個使用 epoll 的網絡服務器中,當有新的連接請求或數據到達時,epoll 會通知應用程序,但應用程序在讀取或寫入數據時,仍然可能會因為 I/O 操作的延遲而阻塞線程。
傳統 AIO:傳統 AIO 雖然提供了異步 I/O 的功能,但存在諸多限制。如前文所述,它只能在 Direct I/O 模式下使用,無法利用頁緩存,這使得數據讀寫的效率受到影響。此外,傳統 AIO 在 I/O 提交時可能會出現阻塞,導致其異步性并不完全可靠。在實際應用中,由于這些限制,傳統 AIO 的使用場景相對較窄,難以滿足大多數應用程序對高效 I/O 的需求。
相比之下,io_uring 具有明顯的優勢。它通過用戶態和內核態共享提交隊列(Submission Queue)和完成隊列(Completion Queue) ,減少了系統調用的次數和上下文切換的開銷。在 io_uring 中,應用程序只需將 I/O 請求放入提交隊列,內核會在后臺處理這些請求,并將結果放入完成隊列,應用程序可以隨時從完成隊列中獲取結果,無需頻繁進行系統調用和輪詢。此外,io_uring 支持更多的異步系統調用,不僅適用于存儲文件的 I/O 操作,還能很好地應用于網絡套接字(network sockets)的 I/O 處理,具有更廣泛的適用性和更高的靈活性。
二、io_uring的實現原理
io_uring實現異步I/O的方式其實是一個生產者-消費者模型:
- 用戶進程生產I/O請求,放入提交隊列(Submission Queue,簡稱SQ)。
- 內核消費SQ中的I/O請求,完成后將結果放入完成隊列(Completion Queue,簡稱CQ)。
- 用戶進程從CQ中收割I/O結果。
SQ和CQ是內核初始化io_uring實例的時候創建的。為了減少系統調用和減少用戶進程與內核之間的數據拷貝,io_uring使用mmap的方式讓用戶進程和內核共享SQ和CQ的內存空間。
另外,由于先提交的I/O請求不一定先完成,SQ保存的其實是一個數組索引(數據類型 uint32),真正的SQE(Submission Queue Entry)保存在一個獨立的數組(SQ Array)。所以要提交一個I/O請求,得先在SQ Array中找到一個空閑的SQE,設置好之后,將其數組索引放到SQ中。
用戶進程、內核、SQ、CQ和SQ Array之間的基本關系如下:
圖片
2.1核心組件解析
提交隊列(SQ)與提交隊列項(SQE):提交隊列(Submission Queue,簡稱 SQ)是 io_uring 中用于存儲 I/O 請求的隊列,它是一個環形緩沖區,位于用戶態和內核態共享的內存區域。每個 I/O 請求在提交隊列中都以提交隊列項(Submission Queue Entry,簡稱 SQE)的形式存在。SQE 是一個結構體,它存儲了 I/O 請求的詳細信息,包括操作類型(如讀、寫、異步連接等)、目標文件描述符、緩沖區地址、操作長度、偏移量等關鍵信息。
例如,在進行文件讀取操作時,SQE 會記錄要讀取的文件描述符、讀取數據的緩沖區地址、讀取的字節數以及文件中的偏移量等信息。應用程序通過填充 SQE 結構體,并將其添加到 SQ 中,來向內核提交 I/O 請求。由于 SQ 是環形緩沖區,當隊列滿時,新的請求會覆蓋舊的請求,從而保證 I/O 請求的持續提交。
完成隊列(CQ)與完成隊列項(CQE):完成隊列(Completion Queue,簡稱 CQ)同樣是一個環形緩沖區,用于存儲 I/O 請求的完成結果。當內核完成一個 I/O 操作后,會將操作的結果封裝成一個完成隊列項(Completion Queue Entry,簡稱 CQE),并將其放入 CQ 中。CQE 結構體包含了 I/O 操作的返回值、狀態碼、用戶自定義數據等信息。
通過這些信息,應用程序可以判斷 I/O 操作是否成功,并獲取操作的相關結果。比如,在文件讀取操作完成后,CQE 中的返回值會表示實際讀取的字節數,狀態碼則用于指示操作是否成功,若操作失敗,狀態碼會包含具體的錯誤信息。應用程序可以通過輪詢 CQ 或者等待特定的事件通知,來獲取完成的 I/O 請求結果,從而進行后續的處理。
SQ Ring 與 CQ Ring:SQ Ring 和 CQ Ring 分別是提交隊列和完成隊列的環形緩沖區結構。它們包含了隊列本身(即 SQ 和 CQ)、頭部索引(head)、尾部索引(tail)以及隊列大小等關鍵信息。頭部索引(head)指向隊列中第一個待處理的元素,而尾部索引(tail)則指向隊列中下一個空閑的位置。當應用程序向 SQ 提交 I/O 請求時,它會將請求信息填充到 tail 指向的 SQE 中,然后將 tail 指針遞增,指向下一個空閑位置。
內核在處理 I/O 請求時,會從 head 指向的 SQE 中獲取請求信息,處理完成后,將結果放入 CQ 中。同樣,CQ Ring 通過 head 和 tail 指針來管理完成隊列,內核將完成的 I/O 結果放入 tail 指向的 CQE 中,并遞增 tail 指針,應用程序則從 head 指向的 CQE 中獲取結果。這種環形緩沖區結構以及基于 head 和 tail 指針的操作方式,實現了用戶態和內核態之間高效的數據交換,減少了鎖的使用和上下文切換的開銷,從而大大提高了 I/O 操作的效率。
2.2系統調用詳解
io_uring的實現僅僅使用了三個syscall:io_uring_setup, io_uring_enter和io_uring_register。
這幾個系統調用接口都在io_uring.c文件中:
⑴io_uring_setup
io_uring_setup 是用于初始化 io_uring 環境的系統調用。在使用 io_uring 進行異步 I/O 操作之前,首先需要調用 io_uring_setup 來創建一個 io_uring 實例。它接受兩個參數,第一個參數是期望的提交隊列(SQ)的大小,即隊列中可以容納的 I/O 請求數量;第二個參數是一個指向 io_uring_params 結構體的指針,該結構體用于返回 io_uring 實例的相關參數,如實際分配的 SQ 和完成隊列(CQ)的大小、隊列的偏移量等信息。
在調用 io_uring_setup 時,內核會為 io_uring 實例分配所需的內存空間,包括 SQ、CQ 以及相關的控制結構。同時,內核還會創建一些內部數據結構,用于管理和調度 I/O 請求。如果初始化成功,io_uring_setup 會返回一個文件描述符,這個文件描述符用于標識創建的 io_uring 實例,后續的 io_uring 系統調用(如 io_uring_enter、io_uring_register)將通過這個文件描述符來操作該 io_uring 實例。若初始化失敗,函數將返回一個負數,表示相應的錯誤代碼。
io_uring_setup():
SYSCALL_DEFINE2(io_uring_setup, u32, entries,
struct io_uring_params __user *, params)
{
return io_uring_setup(entries, params);
}
- 功能:用于初始化和配置 io_uring 。
- 應用用途:在使用 io_uring 之前,首先需要調用此接口初始化一個 io_uring 環,并設置其參數。
⑵io_uring_enter
io_uring_enter 是用于提交和等待 I/O 操作的系統調用。它的主要作用是將應用程序準備好的 I/O 請求提交給內核,并可以選擇等待這些操作完成。io_uring_enter 接受多個參數,其中包括 io_uring_setup 返回的文件描述符,用于指定要操作的 io_uring 實例;to_submit 參數表示要提交的 I/O 請求的數量,即從提交隊列(SQ)中取出并提交給內核的 SQE 的數量;min_complete 參數指定了內核在返回之前必須等待完成的 I/O 操作的最小數量;flags 參數則用于控制 io_uring_enter 的行為,例如可以設置是否等待 I/O 操作完成、是否獲取完成的 I/O 事件等。當調用 io_uring_enter 時,如果 to_submit 參數大于 0,內核會從 SQ 中取出相應數量的 SQE,并將這些 I/O 請求提交到內核中進行處理。
同時,如果設置了等待 I/O 操作完成的標志,內核會阻塞等待,直到至少有 min_complete 個 I/O 操作完成,然后將這些完成的操作結果放入完成隊列(CQ)中。應用程序可以通過檢查 CQ 來獲取這些完成的 I/O 請求的結果。通過 io_uring_enter,應用程序可以靈活地控制 I/O 請求的提交和等待策略,提高 I/O 操作的效率和靈活性。
io_uring_enter():
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
u32, min_complete, u32, flags, const void __user *, argp,
size_t, argsz)
- 功能:用于提交和處理異步 I/O 操作。
- 應用用途:在向 io_uring 環中提交 I/O 操作后,通過調用此接口觸發內核處理這些操作,并獲取完成的操作結果。
⑶io_uring_register
io_uring_register 用于注冊文件描述符或事件文件描述符到 io_uring 實例中,以便在后續的 I/O 操作中使用。它接受四個參數,第一個參數是 io_uring_setup 返回的文件描述符,用于指定要注冊到的 io_uring 實例;第二個參數 opcode 表示注冊的類型,例如可以是 IORING_REGISTER_FILES(注冊文件描述符集合)、IORING_REGISTER_BUFFERS(注冊內存緩沖區)、IORING_REGISTER_EVENTFD(注冊 eventfd 用于通知完成事件)等;
第三個參數 arg 是一個指針,根據 opcode 的類型不同,它指向不同的內容,如注冊文件描述符時,arg 指向一個包含文件描述符的數組;注冊緩沖區時,arg 指向一個描述緩沖區的結構體數組;第四個參數 nr_args 表示 arg 所指向的數組的長度。通過 io_uring_register 注冊文件描述符或緩沖區等資源后,內核在處理 I/O 請求時,可以直接訪問這些預先注冊的資源,而無需每次都重新設置相關信息,從而提高了 I/O 操作的效率。例如,在進行大量文件讀寫操作時,預先注冊文件描述符可以避免每次提交 I/O 請求時都進行文件描述符的查找和驗證,減少了系統開銷,提升了 I/O 性能。
io_uring_register():
SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
void __user *, arg, unsigned int, nr_args)
- 功能:用于注冊文件描述符、緩沖區、事件文件描述符等資源到 io_uring 環中。
- 應用用途:在進行 I/O 操作之前,需要將相關的資源注冊到 io_uring 環中,以便進行后續的異步 I/O 操作。
2.3工作流程深度剖析
①創建 io_uring 對象
使用 io_uring 進行異步 I/O 操作的第一步是創建 io_uring 對象。內核提供了io_uring_setup系統調用來初始化一個io_uring實例,創建SQ、CQ和SQ Array,entries參數表示的是SQ和SQArray的大小,CQ的大小默認是2 * entries。params參數既是輸入參數,也是輸出參數。
該函數返回一個file descriptor,并將io_uring支持的功能、以及各個數據結構在fd中的偏移量存入params。用戶根據偏移量將fd通過mmap內存映射得到一塊內核用戶共享的內存區域。這塊內存區域中,有io_uring的上下文信息:SQ信息、CQ信息和SQ Array信息。
int io_uring_setup(int entries, struct io_uring_params *params);
這通過調用 io_uring_setup 系統調用來完成。在調用 io_uring_setup 時,用戶需要指定提交隊列(SQ)的大小,即期望的 I/O 請求隊列長度。內核會根據這個請求,為 io_uring 對象分配必要的內存空間,包括提交隊列(SQ)、完成隊列(CQ)以及相關的控制結構。內核會創建一個 io_ring_ctx 結構體對象,用于管理 io_uring 的上下文信息。
同時,還會創建一個 io_urings 結構體對象,該對象包含了 SQ 和 CQ 的具體實現,如隊列的頭部索引(head)、尾部索引(tail)、隊列大小等信息。在創建過程中,內核會初始化 SQ 和 CQ 的所有隊列項(SQE 和 CQE),并設置好相關的指針和標志位。如果用戶在調用 io_uring_setup 時設置了 IORING_SETUP_SQPOLL 標志位,內核還會創建一個 SQ 線程,用于從 SQ 隊列中獲取 I/O 請求并提交給內核處理。
創建完成后,io_uring_setup 會返回一個文件描述符,這個文件描述符是后續操作 io_uring 對象的關鍵標識,通過它可以進行 I/O 請求的提交、注冊文件描述符等操作。
②準備 I/O 請求
在創建 io_uring 對象后,需要準備具體的 I/O 請求。這通常通過 io_uring_prep_XXX 系列函數來完成,這些函數用于準備不同類型的 I/O 請求,如 io_uring_prep_read 用于準備讀取操作,io_uring_prep_write 用于準備寫入操作,io_uring_prep_accept 用于準備異步接受連接操作等。
以 io_uring_prep_read 為例,它接受多個參數,包括指向提交隊列項(SQE)的指針、目標文件描述符、讀取數據的緩沖區地址、讀取的字節數以及文件中的偏移量等。函數會根據這些參數,將 I/O 請求的相關信息填充到 SQE 結構體中,包括設置操作類型(如 IORING_OP_READ)、目標文件描述符、緩沖區地址、數據長度、偏移量等字段。
除了基本的 I/O 操作參數外,還可以設置一些額外的標志位和選項,如 I/O 操作的優先級、是否使用直接 I/O 等,以滿足不同的應用需求。通過這些函數,應用程序可以靈活地構建各種類型的 I/O 請求,并將其準備好以便提交到內核中進行處理。
③提交 I/O 請求
當 I/O 請求準備好后,需要將其提交到內核中執行。這通過調用 io_uring_submit 函數(內部調用 io_uring_enter 系統調用)來實現。在提交 I/O 請求時,首先應用程序會將準備好的 SQE 添加到提交隊列(SQ)中。SQ 是一個環形緩沖區,應用程序通過操作 SQ Ring 中的 tail 指針來將 SQE 放入隊列。具體來說,應用程序會將 tail 指向的 SQE 填充為準備好的 I/O 請求信息,然后將 tail 指針遞增,指向下一個空閑的 SQE 位置。在填充 SQE 時,需要注意按照 SQE 結構體的定義,正確設置各項字段,確保 I/O 請求的信息準確無誤。
默認情況下,使用 io_uring 提交 I/O 請求需要:
- 從SQ Arrary中找到一個空閑的SQE;
- 根據具體的I/O請求設置該SQE;
- 將SQE的數組索引放到SQ中;
- 調用系統調用io_uring_enter提交SQ中的I/O請求。
圖片
當所有要提交的 I/O 請求都添加到 SQ 中后,調用 io_uring_submit 函數,該函數會觸發 io_uring_enter 系統調用,將 SQ 中的 I/O 請求提交給內核。內核接收到請求后,會從 SQ 中獲取 SQE,并根據 SQE 中的信息執行相應的 I/O 操作。在這個過程中,由于 SQ 是用戶態和內核態共享的內存區域,避免了數據的多次拷貝和額外的系統調用開銷,提高了 I/O 請求提交的效率。
④等待 IO 請求完成
提交 I/O 請求后,應用程序可以選擇等待請求完成。等待 I/O 請求完成有兩種主要方式。一種是使用 io_uring_wait_cqe 函數,該函數會阻塞調用線程,直到至少有一個 I/O 請求完成,并返回完成的完成隊列項(CQE)。當調用 io_uring_wait_cqe 時,它會檢查完成隊列(CQ)中是否有新完成的 I/O 請求。如果沒有,線程會進入阻塞狀態,直到內核將完成的 I/O 請求結果放入 CQ 中。一旦有新的 CQE 可用,io_uring_wait_cqe 會返回該 CQE,應用程序可以通過 CQE 獲取 I/O 操作的結果。
另一種方式是使用 io_uring_peek_batch_cqe 函數,它是非阻塞的,用于檢查 CQ 中是否有已經完成的 I/O 請求。如果有,它會返回已完成的 CQE 列表,應用程序可以根據返回的 CQE 進行相應的處理;如果沒有完成的請求,函數會立即返回,應用程序可以繼續執行其他任務,然后在適當的時候再次調用該函數檢查 CQ。這兩種方式為應用程序提供了靈活的等待策略,使其可以根據自身的業務需求和性能要求,選擇合適的方式來處理 I/O 請求的完成事件。
⑤獲取 IO 請求結果
當 I/O 請求完成后,應用程序需要從完成隊列(CQ)中獲取結果。這可以通過 io_uring_peek_cqe 函數來實現。io_uring_peek_cqe 函數用于從 CQ 中獲取一個完成的 CQE,而不將其從隊列中移除。應用程序獲取到 CQE 后,可以根據 CQE 中的信息來處理完成的 I/O 請求。CQE 中包含了豐富的信息,如 I/O 操作的返回值、狀態碼、用戶自定義數據等。例如,對于文件讀取操作,CQE 中的返回值表示實際讀取的字節數,狀態碼用于指示操作是否成功,若操作失敗,狀態碼會包含具體的錯誤信息。
應用程序可以根據這些信息進行相應的處理,如讀取數據并進行后續的業務邏輯處理,或者在操作失敗時進行錯誤處理,如記錄錯誤日志、重新嘗試 I/O 操作等。在獲取 CQE 后,應用程序通常會根據 I/O 操作的類型和結果,執行相應的業務邏輯,以實現應用程序的功能需求。
⑥釋放 IO 請求結果
在獲取并處理完 IO 請求結果后,需要釋放該結果,以便內核可以繼續使用完成隊列(CQ)。這通過調用 io_uring_cqe_seen 函數來實現。io_uring_cqe_seen 函數的作用是標記一個完成的 CQE 已經被處理,它會將 CQ Ring 中的 head 指針遞增,指向下一個未處理的 CQE。通過這種方式,內核可以知道哪些 CQE 已經被應用程序處理,從而可以繼續向 CQ 中放入新的完成結果。
在釋放 IO 請求結果時,需要注意確保已經完成了對 CQE 中信息的處理,避免在釋放后再次訪問已釋放的 CQE。同時,及時釋放 CQE 也有助于提高系統的性能和資源利用率,避免 CQ 隊列被占用過多而影響后續 I/O 請求結果的存儲和處理。通過正確地釋放 IO 請求結果,保證了 io_uring 的工作流程能夠持續高效地運行,為應用程序提供穩定的異步 I/O 服務。
三、io_uring案例分析
3.1簡單文件讀寫案例
⑴代碼實現
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <linux/io_uring.h>
int main() {
struct io_uring ring;
struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;
int fd, ret;
// 打開文件
fd = open("example.txt", O_RDONLY);
if (fd < 0) {
perror("Failed to open file");
return 1;
}
// 初始化io_uring
io_uring_queue_init(8, &ring, 0);
// 獲取一個提交隊列條目
sqe = io_uring_get_sqe(&ring);
if (!sqe) {
fprintf(stderr, "Could not get sqe\n");
return 1;
}
// 準備異步讀操作
char *buf = malloc(1024);
io_uring_prep_read(sqe, fd, buf, 1024, 0);
// 提交請求
io_uring_submit(&ring);
// 等待完成
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
perror("io_uring_wait_cqe");
return 1;
}
// 檢查結果
if (cqe->res < 0) {
fprintf(stderr, "Async read failed: %s\n", strerror(-cqe->res));
} else {
printf("Read %d bytes: %s\n", cqe->res, buf);
}
// 釋放資源
io_uring_cqe_seen(&ring, cqe);
io_uring_queue_exit(&ring);
close(fd);
free(buf);
return 0;
}
代碼解讀
文件打開:fd = open("example.txt", O_RDONLY); 這行代碼使用 open 函數打開名為 example.txt 的文件,以只讀模式(O_RDONLY)打開。如果打開失敗,open 函數會返回一個負數,并通過 perror 函數打印錯誤信息,然后程序返回錯誤代碼 1。
io_uring 初始化:io_uring_queue_init(8, &ring, 0); 這行代碼用于初始化 io_uring 實例。其中,第一個參數 8 表示提交隊列(SQ)和完成隊列(CQ)的大小,即隊列中可以容納的 I/O 請求數量;第二個參數 &ring 是指向 io_uring 結構體的指針,用于存儲初始化后的 io_uring 實例;第三個參數 0 表示使用默認的初始化標志。
獲取提交隊列條目:sqe = io_uring_get_sqe(&ring); 從 io_uring 的提交隊列中獲取一個提交隊列項(SQE)。如果獲取失敗,io_uring_get_sqe 函數會返回 NULL,程序會打印錯誤信息并返回錯誤代碼 1。
準備異步讀操作:
char *buf = malloc(1024); //分配 1024 字節的內存空間,用于存儲讀取的文件數據。
io_uring_prep_read(sqe, fd, buf, 1024, 0); 使用 io_uring_prep_read 函數準備一個異步讀操作。它接受五個參數,第一個參數 sqe 是之前獲取的提交隊列項;第二個參數 fd 是要讀取的文件描述符;第三個參數 buf 是用于存儲讀取數據的緩沖區;第四個參數 1024 表示要讀取的字節數;第五個參數 0 表示從文件的起始位置開始讀取。
提交請求:io_uring_submit(&ring); 將準備好的 I/O 請求提交到內核中執行。這個函數會觸發 io_uring_enter 系統調用,將提交隊列中的請求提交給內核。
等待完成:ret = io_uring_wait_cqe(&ring, &cqe); 等待 I/O 操作完成。這個函數會阻塞調用線程,直到至少有一個 I/O 請求完成,并返回完成的完成隊列項(CQE)。如果等待過程中出現錯誤,io_uring_wait_cqe 函數會返回一個負數,程序會通過 perror 函數打印錯誤信息并返回錯誤代碼 1。
檢查結果:
if (cqe->res < 0) 檢查 I/O 操作的結果。如果 cqe->res 小于 0,表示操作失敗,通過 fprintf 函數打印錯誤信息。
else 分支表示操作成功,打印實際讀取的字節數和讀取到的數據。
釋放資源:
io_uring_cqe_seen(&ring, cqe); /* 知內核已經處理完一個完成事件,
釋放相關資源。這通過將完成隊列的頭部指針遞增來實現,以便內核可以繼續使用完成隊列。*/
io_uring_queue_exit(&ring); 釋放 io_uring 實例所占用的資源,包括提交隊列和完成隊列等。
close(fd); 關閉之前打開的文件。
free(buf); 釋放之前分配的內存緩沖區。
3.2網絡編程案例(TCP 服務器)
⑴代碼實現
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <liburing.h>
#define ENTRIES_LENGTH 4096
#define MAX_CONNECTIONS 1024
#define BUFFER_LENGTH 1024
char buf_table[MAX_CONNECTIONS][BUFFER_LENGTH] = {0};
enum {
READ,
WRITE,
ACCEPT,
};
struct conninfo {
int connfd;
int type;
};
void set_read_event(struct io_uring *ring, int fd, void *buf, size_t len, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_recv(sqe, fd, buf, len, flags);
struct conninfo ci = {.connfd = fd,.type = READ};
memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}
void set_write_event(struct io_uring *ring, int fd, const void *buf, size_t len, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_send(sqe, fd, buf, len, flags);
struct conninfo ci = {.connfd = fd,.type = WRITE};
memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}
void set_accept_event(struct io_uring *ring, int fd, struct sockaddr *cliaddr, socklen_t *clilen, unsigned flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_accept(sqe, fd, cliaddr, clilen, flags);
struct conninfo ci = {.connfd = fd,.type = ACCEPT};
memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}
int main() {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) return -1;
struct sockaddr_in servaddr, clientaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9999);
if (-1 == bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) {
return -2;
}
listen(listenfd, 10);
struct io_uring_params params;
memset(?ms, 0, sizeof(params));
struct io_uring ring;
memset(&ring, 0, sizeof(ring));
/*初始化params 和 ring*/
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ?ms);
socklen_t clilen = sizeof(clientaddr);
set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0);
while (1) {
struct io_uring_cqe *cqe;
io_uring_submit(&ring);
int ret = io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[10];
int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
unsigned count = 0;
for (int i = 0; i < cqecount; i++) {
cqe = cqes[i];
count++;
struct conninfo ci;
memcpy(&ci, &cqe->user_data, sizeof(ci));
if (ci.type == ACCEPT) {
int connfd = cqe->res;
char *buffer = buf_table[connfd];
set_read_event(&ring, connfd, buffer, 1024, 0);
set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0);
} else if (ci.type == READ) {
int bytes_read = cqe->res;
if (bytes_read == 0) {
close(ci.connfd);
} else if (bytes_read < 0) {
close(ci.connfd);
printf("client %d disconnected!\n", ci.connfd);
} else {
char *buffer = buf_table[ci.connfd];
set_write_event(&ring, ci.connfd, buffer, bytes_read, 0);
}
} else if (ci.type == WRITE) {
char *buffer = buf_table[ci.connfd];
set_read_event(&ring, ci.connfd, buffer, 1024, 0);
}
}
io_uring_cq_advance(&ring, count);
}
return 0;
}
⑵代碼解讀
創建監聽套接字:int listenfd = socket(AF_INET, SOCK_STREAM, 0); 使用 socket 函數創建一個 TCP 套接字,AF_INET 表示使用 IPv4 協議,SOCK_STREAM 表示使用流式套接字(即 TCP 協議),0 表示默認協議。如果創建失敗,socket 函數會返回 -1,程序返回 -1。
綁定地址和端口:
填充服務器地址結構體 servaddr,包括地址族(AF_INET)、IP 地址(INADDR_ANY 表示綁定到所有可用的網絡接口)和端口號(9999)。
if (-1 == bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) 使用 bind 函數將創建的套接字綁定到指定的地址和端口。如果綁定失敗,bind 函數返回 -1,程序返回 -2。
監聽連接:listen(listenfd, 10); 使用 listen 函數開始監聽套接字,第二個參數 10 表示最大連接數,即允許同時存在的未處理連接請求的最大數量。
初始化 io_uring:
struct io_uring_params params; 和 struct io_uring ring; 分別定義了 io_uring 的參數結構體和實例結構體。 memset(¶ms, 0, sizeof(params)); 和 memset(&ring, 0, sizeof(ring)); 初始化這兩個結構體的內容為 0。 io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms); 使用 io_uring_queue_init_params 函數初始化 io_uring 實例,ENTRIES_LENGTH 表示提交隊列和完成隊列的大小,&ring 是指向 io_uring 實例的指針,¶ms 是指向參數結構體的指針。
設置接受連接事件:set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0); 調用 set_accept_event 函數設置一個接受連接的異步事件。在這個函數中,首先獲取一個提交隊列項(SQE),然后使用 io_uring_prep_accept 函數準備接受連接的請求,將相關信息(如監聽套接字、客戶端地址、地址長度等)填充到 SQE 中,并將自定義的連接信息結構體 conninfo 復制到 SQE 的用戶數據區域,用于標識該請求的類型和相關連接信息。
事件循環處理:
while (1) 進入一個無限循環,用于持續處理 I/O 事件。
io_uring_submit(&ring); 提交準備好的 I/O 請求到內核。
int ret = io_uring_wait_cqe(&ring, &cqe); 等待 I/O 操作完成,獲取完成的完成隊列項(CQE)。
struct io_uring_cqe *cqes[10]; 和 int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10); 使用 io_uring_peek_batch_cqe 函數嘗試批量獲取完成的 CQE,最多獲取 10 個。
遍歷獲取到的 CQE:
struct conninfo ci; 和 memcpy(&ci, &cqe->user_data, sizeof(ci)); 從 CQE 的用戶數據區域復制之前設置的連接信息結構體 conninfo。
根據連接信息中的類型(ci.type)進行不同的處理:
如果是 ACCEPT 類型,表示有新的連接請求被接受。獲取新的連接描述符 connfd,設置讀取事件,準備從新連接中讀取數據,并再次設置接受連接事件,以便繼續接受新的連接請求。 如果是 READ 類型,表示有數據可讀。根據讀取的字節數進行處理,如果讀取到的字節數為 0,表示客戶端斷開連接,關閉連接;如果讀取失?。ㄗ止潝敌∮?0),也關閉連接并打印斷開連接的信息;如果讀取成功,設置寫入事件,將讀取到的數據回顯給客戶端。 如果是 WRITE 類型,表示數據寫入完成,設置讀取事件,準備從客戶端讀取下一次的數據。 io_uring_cq_advance(&ring, count); 告知內核已經處理完 count 個完成事件,通過將完成隊列的頭部指針遞增 count 個位置,以便內核可以繼續使用完成隊列。
3.3性能對比測試
⑴測試環境與方法
測試環境搭建:在一臺配備 Intel (R) Xeon (R) CPU E5 - 2682 v4 @ 2.50GHz 處理器、16GB 內存、運行 Linux 5.10 內核的服務器上進行測試。使用的存儲設備為 NVMe SSD,以確保 I/O 性能不受磁盤性能的過多限制。測試機器的網絡配置為千兆以太網,以保證網絡傳輸的穩定性。
⑵測試方法設計
針對文件讀寫場景,使用 fio 工具進行測試。分別設置不同的 I/O 模式,包括阻塞 I/O、非阻塞 I/O、epoll 以及 io_uring。對于每種模式,進行多次測試,每次測試設置不同的文件大?。ㄈ?1MB、10MB、100MB)和 I/O 操作類型(如隨機讀、順序讀、隨機寫、順序寫)。在每次測試中,fio 工具會按照設定的參數進行 I/O 操作,并記錄操作的時間、吞吐量等性能指標。例如,在隨機讀測試中,fio 會隨機讀取文件中的數據塊,并統計單位時間內讀取的數據量。
在網絡編程場景下,搭建一個簡單的 echo 服務器模型,分別使用 epoll 和 io_uring 實現。客戶端通過多線程模擬大量并發連接,向服務器發送數據并接收服務器回顯的數據。在測試過程中,逐漸增加并發連接數,從 100 個連接開始,每次增加 100 個,直到達到 1000 個連接。使用 iperf 等工具測量不同并發連接數下的 QPS(每秒查詢率)、延遲等性能指標。iperf 工具會在客戶端和服務器之間建立 TCP 連接,發送一定量的數據,并記錄數據傳輸的速率、延遲等信息。
⑶測試結果分析
文件讀寫性能:在小文件(1MB)讀寫測試中,阻塞 I/O 由于線程阻塞等待 I/O 操作完成,導致其吞吐量最低,平均吞吐量約為 50MB/s。非阻塞 I/O 雖然避免了線程阻塞,但頻繁的輪詢使得 CPU 利用率較高,且由于 I/O 操作的碎片化,其吞吐量也不高,平均約為 80MB/s。epoll 在處理多個文件描述符的 I/O 事件時,通過高效的事件通知機制,提高了 I/O 操作的效率,平均吞吐量達到 120MB/s。
四、io_uring的應用場景及未來發展
4.1適用場景探討
數據庫系統:在數據庫系統中,大量的數據讀寫操作對 I/O 性能要求極高。io_uring 的高效異步 I/O 特性能夠顯著提升數據庫的性能。以關系型數據庫 MySQL 為例,在處理大量并發查詢和更新操作時,傳統的 I/O 模型會導致線程頻繁阻塞和上下文切換,從而降低系統的響應速度。而使用 io_uring,MySQL 可以將 I/O 請求異步提交到內核,內核在后臺處理這些請求,當請求完成時,通過完成隊列通知 MySQL。這樣,MySQL 的線程在 I/O 操作期間可以繼續執行其他任務,如查詢優化、事務處理等,大大提高了系統的并發處理能力。
同時,io_uring 支持直接 I/O 模式,這對于數據庫系統來說非常重要,因為數據庫通常需要直接訪問存儲設備以提高數據讀寫的效率,避免了操作系統頁緩存帶來的額外開銷。此外,io_uring 的批量提交和處理能力,使得數據庫在進行大規模數據導入、導出等操作時,能夠一次性提交多個 I/O 請求,減少系統調用次數,進一步提升了 I/O 性能。
網絡服務器:在網絡服務器領域,io_uring 同樣展現出了巨大的優勢。以 Nginx 服務器為例,傳統的基于 epoll 的 I/O 模型在處理高并發連接時,雖然通過事件驅動機制提高了 I/O 的效率,但在 I/O 操作過程中,仍然存在一定的上下文切換開銷。而 io_uring 通過用戶態和內核態共享的提交隊列和完成隊列,減少了系統調用和上下文切換的次數,使得 Nginx 在處理大量并發連接時,能夠更加高效地進行數據的讀寫操作。
例如,當有大量客戶端同時請求 Nginx 服務器時,Nginx 可以使用 io_uring 將這些請求的 I/O 操作異步提交到內核,內核在后臺處理這些請求,并將完成結果放入完成隊列。Nginx 可以隨時從完成隊列中獲取完成的 I/O 操作結果,進行相應的處理,如返回響應數據給客戶端。這種方式大大提高了 Nginx 的并發處理能力,降低了延遲,提升了服務器的性能和響應速度。同時,io_uring 支持網絡套接字的異步操作,使得 Nginx 在處理網絡連接的建立、斷開以及數據傳輸等操作時,能夠更加靈活和高效。
文件存儲系統:在文件存儲系統中,io_uring 的應用可以有效提升文件的讀寫性能和系統的整體效率。以 Ceph 分布式文件系統為例,它需要處理大量的文件讀寫請求,并且要保證數據的一致性和可靠性。使用 io_uring 后,Ceph 可以將文件讀寫請求異步提交到內核,利用內核的高效 I/O 處理能力來完成這些請求。
在文件讀取時,io_uring 可以提前將文件數據預讀到內存中,當應用程序請求數據時,能夠快速從內存中獲取,減少了磁盤 I/O 的等待時間。在文件寫入時,io_uring 可以將數據異步寫入磁盤,同時應用程序可以繼續執行其他任務,提高了系統的并發性能。此外,io_uring 的零拷貝特性在文件存儲系統中也具有重要意義,它減少了數據在內存中的拷貝次數,提高了數據傳輸的效率,降低了 CPU 的開銷。這對于大規模文件存儲系統來說,能夠顯著提升系統的性能和可擴展性,更好地滿足用戶對文件存儲和訪問的需求。
4.2未來發展趨勢展望
內核支持的增強:隨著 Linux 內核的不斷發展,對 io_uring 的支持有望進一步增強。未來的內核版本可能會優化 io_uring 的實現,減少其在高并發場景下的鎖競爭和資源爭用問題,從而進一步提升其性能。在多線程同時訪問提交隊列和完成隊列時,內核可能會采用更高效的無鎖數據結構或優化的鎖機制,以確保多個線程能夠高效地進行 I/O 請求的提交和結果的獲取。此外,內核可能會增加對更多設備和文件系統的支持,使 io_uring 能夠更好地應用于各種硬件平臺和存儲設備。
例如,對于新型的存儲設備,如基于 3D XPoint 技術的非易失性內存,內核可能會優化 io_uring 的驅動程序,充分發揮這些設備的高性能優勢。同時,對于不同的文件系統,如 ext4、XFS、Btrfs 等,內核可能會針對 io_uring 進行特定的優化,提高其在不同文件系統上的兼容性和性能表現。
應用領域的拓展:io_uring 在未來有望拓展到更多的應用領域。隨著物聯網(IoT)的快速發展,大量的物聯網設備需要進行高效的數據傳輸和處理。io_uring 可以應用于物聯網網關和邊緣計算設備中,提高這些設備在處理大量傳感器數據和設備通信時的 I/O 性能。在智能工廠中,物聯網網關需要實時采集和處理大量的生產設備數據,使用 io_uring 可以實現高效的異步 I/O 操作,確保數據的及時傳輸和處理,提高生產效率和設備的智能化管理水平。
此外,在大數據處理和人工智能領域,io_uring 也具有廣闊的應用前景。大數據處理框架如 Hadoop、Spark 等,在處理大規模數據集時,需要進行大量的文件讀寫和網絡傳輸操作,io_uring 可以提高這些框架的數據處理速度和效率。在人工智能訓練和推理過程中,需要頻繁地讀取和寫入模型數據和訓練樣本,io_uring 的高效 I/O 特性可以加速這些操作,提升人工智能系統的性能和響應速度。
五、io_uring代碼實踐
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2
struct conn_info
{
int fd;
int event;
};
int init_server(unsigned short port)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(struct sockaddr_in));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(port);
if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
{
perror("bind");
return -1;
}
listen(sockfd, 10);
return sockfd;
}
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
int set_event_recv(struct io_uring *ring, int sockfd,
void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_READ,
};
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int set_event_send(struct io_uring *ring, int sockfd,
void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_WRITE,
};
io_uring_prep_send(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
socklen_t *addrlen, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_ACCEPT,
};
io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)addr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int main(int argc, char *argv[])
{
unsigned short port = 9999;
int sockfd = init_server(port);
struct io_uring_params params;
memset(?ms, 0, sizeof(params));
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ?ms);
#if 0
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
#endif
char buffer[BUFFER_LENGTH] = {0};
while (1)
{
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_wait
int i = 0;
for (i = 0; i < nready; i++)
{
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if (result.event == EVENT_ACCEPT)
{
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
// printf("set_event_accept\n"); //
int connfd = entries->res;
set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
}
else if (result.event == EVENT_READ)
{ //
int ret = entries->res;
// printf("set_event_recv ret: %d, %s\n", ret, buffer); //
if (ret == 0)
{
close(result.fd);
}
else if (ret > 0)
{
set_event_send(&ring, result.fd, buffer, ret, 0);
}
}
else if (result.event == EVENT_WRITE)
{
//
int ret = entries->res;
// printf("set_event_send ret: %d, %s\n", ret, buffer);
set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
}
}
io_uring_cq_advance(&ring, nready);
}
}
5.1服務器初始化
int init_server(unsigned short port)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(struct sockaddr_in));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(port);
if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
{
perror("bind");
return -1;
}
listen(sockfd, 10);
return sockfd;
}
該函數初始化了一個 TCP 服務器套接字,用于監聽客戶端連接請求。socket、bind 和 listen 是常規的服務器初始化步驟,將服務器綁定到指定的端口,并使其開始監聽客戶端連接。
5.2io_uring 環境初始化
struct io_uring_params params;
memset(?ms, 0, sizeof(params));
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ?ms);
io_uring_queue_init_params 函數初始化了一個 io_uring 實例,這個實例將用于管理所有的異步 I/O 操作,ENTRIES_LENGTH 定義了提交隊列和完成隊列的大小,表示可以同時處理的最大 I/O 操作數量。
5.3設置 accept 事件
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
set_event_accept 函數將一個 accept 操作添加到 io_uring 的提交隊列中。這個操作用于接受客戶端連接請求。這一步是服務器啟動時的初始操作,它告訴 io_uring 開始監聽并處理客戶端連接。
5.4主循環:提交操作和處理完成事件
while (1)
{
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
- io_uring_submit:將之前添加到提交隊列中的所有操作提交給內核,由內核異步執行這些操作。
- io_uring_wait_cqe:等待至少一個操作完成,這是一個阻塞調用。
- io_uring_peek_batch_cqe:批量獲取已經完成的操作結果,nready 表示完成的操作數量。
5.5處理完成的事件
for (i = 0; i < nready; i++)
{
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if (result.event == EVENT_ACCEPT)
{
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
int connfd = entries->res;
set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
}
else if (result.event == EVENT_READ)
{
int ret = entries->res;
if (ret == 0)
{
close(result.fd);
}
else if (ret > 0)
{
set_event_send(&ring, result.fd, buffer, ret, 0);
}
}
else if (result.event == EVENT_WRITE)
{
int ret = entries->res;
set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
}
}
- EVENT_ACCEPT:處理 accept 事件。當一個新的客戶端連接到來時,io_uring 完成隊列會返回 EVENT_ACCEPT 事件,表示一個新的連接已經建立。此時,服務器會:重新設置 accept 事件,繼續監聽新的客戶端連接。獲取新連接的文件描述符 connfd,并設置一個 recv 事件來準備接收數據。
- EVENT_READ:處理 recv 事件。當從客戶端接收到數據時,io_uring 返回 EVENT_READ 事件。如果接收到的數據長度大于0,則會設置一個 send 事件來將數據發送回客戶端。如果 ret == 0,說明客戶端關閉了連接,則關閉文件描述符。
- EVENT_WRITE:處理 send 事件。當數據成功發送給客戶端后,io_uring 返回 EVENT_WRITE 事件。此時,服務器會再次設置一個 recv 事件,準備接收更多數據。
5.6完成隊列的推進
io_uring_cq_advance(&ring, nready);
這個函數通知 io_uring,你已經處理完了 nready 個完成隊列條目(CQE)。io_uring 可以釋放這些 CQE 供后續操作使用。
總結
io_uring 的作用:在這個示例中,io_uring 被用來高效地處理網絡 I/O 操作。通過異步提交和處理 accept、recv、send 操作,服務器能夠高效處理多個并發連接,而無需阻塞等待每個I/O操作完成。
異步模型:io_uring 提供了一種低延遲、高并發的異步 I/O 處理方式。操作在提交后由內核異步執行,完成后再由應用程序查詢并處理結果。這種方式大大減少了系統調用的開銷,提高了程序的并發處理能力。
關鍵點:
- 提交操作:使用 io_uring_prep_* 函數準備操作,并提交給內核處理。
- 等待完成:使用 io_uring_wait_cqe 等方法等待操作完成,并獲取結果。
- 處理結果:根據完成隊列中的事件類型(如 EVENT_ACCEPT、EVENT_READ、EVENT_WRITE)進行相應的處理和后續操作。