成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一篇帶你了解io_uring和Node.js

開發 前端
io_uring是大神Jens Axboe開發的異步IO框架,在Linux內核5.1引入。本文介紹什么是異步框架和io_uring的一些基礎內容,最后介紹Node.js(Libuv)中,之前有人提但至今還沒有合并的一個關于io_uring的pr。

[[409009]]

前言:io_uring是大神Jens Axboe開發的異步IO框架,在Linux內核5.1引入。本文介紹什么是異步框架和io_uring的一些基礎內容,最后介紹Node.js(Libuv)中,之前有人提但至今還沒有合并的一個關于io_uring的pr。

1 io_uring介紹

在io_uring之前,Linux沒有成熟的異步IO能力,什么是異步IO呢?回想我們讀取資源的過程,我們可以以阻塞或非阻塞的模式調用read、readv,也可以通過epoll監聽文件描述符和事件的方式,在回調里調用read系列函數進行讀取,這些API有個共同的地方是,不管是主動探還是被動探測資源是否可讀,當可讀的時候,都需要進程自己去執行讀操作。而io_uring強大的地方是,進程不需要再自己主動執行讀操作,而是內核讀完后通知進程,相比epoll,io_uring又進了一步,類似的能力是windows的IOCP。

2 io_uring基本使用

2.1 初始化

io_uring和epoll一樣,API不多,但是io_uring比epoll復雜得多。我們首先需要調用io_uring_setup初始化io_uring,拿到一個fd。

  1. int ring_fd; 
  2.  
  3. unsigned *sring_tail, *sring_mask, *sring_array, *cring_head, *cring_tail, *cring_mask; 
  4.  
  5. struct io_uring_sqe *sqes; 
  6.  
  7. struct io_uring_cqe *cqes; 
  8.  
  9. char buff[BLOCK_SZ]; 
  10.  
  11. off_t offset; 
  12.  
  13. struct io_uring_params p; 
  14.  
  15. void *sq_ptr, *cq_ptr; 
  16.  
  17. memset(&p, 0, sizeof(p)); 
  18.  
  19. // 拿到io_uring對應的fd 
  20.  
  21. int ring_fd = io_uring_setup(QUEUE_DEPTH, &p); 
  22.  
  23. int sring_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned); 
  24.  
  25. int cring_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe); 
  26.  
  27. // 映射ring_fd到mmap返回的地址,我們可以以操作返回地址的方式操作ring_fd,達到用戶和內核共享數據的目的 
  28.  
  29. cq_ptr = sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE, 
  30.               MAP_SHARED | MAP_POPULATE, 
  31.               ring_fd, IORING_OFF_SQ_RING); 
  32.  
  33. sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe), 
  34.                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, 
  35.                ring_fd, IORING_OFF_SQES); 
  36.  
  37.  
  38.  
  39. // 保存任務隊列和完成隊列的地址,后續提交任務和獲取完成任務節點時需要用 
  40.  
  41. sring_tail = sq_ptr + p.sq_off.tail; 
  42. sring_mask = sq_ptr + p.sq_off.ring_mask; 
  43. sring_array = sq_ptr + p.sq_off.array; 
  44. cring_head = cq_ptr + p.cq_off.head; 
  45. cring_tail = cq_ptr + p.cq_off.tail; 
  46. cring_mask = cq_ptr + p.cq_off.ring_mask; 
  47. cqes = cq_ptr + p.cq_off.cqes; 

io_uring不僅實現非常復雜,就連使用也非常復雜,但是目前只需要大致了解原理就好了。上面的代碼主要目的有以下幾個。

1 出生后io_uring并拿到一個io_uring實例對應的fd。

2 通過mmap映射io_uring對應的fd到一個內存地址,后續我們就可以通過操作內存地址的方式和內核通信。

3 保存任務隊列和完成隊列的地址信息,后續需要用到。

2.2 提交任務

我們看到io_uring底層維護了任務隊列(sq)和完成隊列兩個隊列(cq)。對應的節點叫sqe和cqe。當我們需要操作一個資源的時候,就可以獲取一個seq,并且填充字段,然后提交給內核,我們看一下sqe的核心字段。

  1. struct io_uring_sqe { 
  2.         __u8    opcode; /* 操作類型,比如讀、寫 */ 
  3.         __s32   fd; /* 資源對應的fd */ 
  4.         __u64   off; /* 資源的偏移(操作的起點) */ 
  5.         __u64   addr; /* 保存數據的內存首地址 */ 
  6.         __u32   len; /* 數據長度 */ 
  7.         __u64   user_data;  /* 用戶定義的字段,通常用于關聯請求和響應 */ 
  8.         __u8    flags; /* 標記 */ 
  9.         ... 
  10.  
  11. }; 

io_uring_sqe的核心字段都比較好理解,構造了一個請求后,就插入到內核的請求任務隊列。接著調用io_uring_enter通知內核,有需要處理的任務,我們可以在調用io_uring_enter的時候設置等待多少個請求完成后再返回。另外內核處理poll的模式,這時候內核會開啟內核線程去檢測任務是否完成,不需要進程調用io_uring_enter。下面是我們發送一個讀取請求的邏輯。

  1. unsigned index, tail; 
  2. tail = *sring_tail; 
  3.  
  4. // 拿到請求隊列的一個空閑位置,是一個環,需要做回環處理 
  5.  
  6. index = tail & *sring_mask; 
  7.  
  8. struct io_uring_sqe *sqe = &sqes[index]; 
  9.  
  10. // 初始化請求結構體 
  11.  
  12. sqe->opcode = op; 
  13.  
  14. // 讀取的fd 
  15.  
  16. sqe->fd = fd; 
  17.  
  18. // 讀取的數據保存到buff 
  19.  
  20. // 可以通過關聯buff,等到響應的時候能找到對應的請求上下文 
  21.  
  22. sqe->addr = (unsigned long) buff; 
  23. sqe->user_data = (unsigned long long) buff; 
  24.  
  25. memset(buff, 0, sizeof(buff)); 
  26.  
  27. sqe->len = BLOCK_SZ; 
  28. sqe->off = offset; 
  29.  
  30. // 插入請求隊列 
  31.  
  32. sring_array[index] = index
  33.  
  34. // 更新索引 
  35.  
  36. tail++; 
  37.  
  38. // 通知內核有任務需要處理,并等待有一個任務完成后再返回 
  39.  
  40. io_uring_smp_store_release(sring_tail, tail); 
  41.  
  42. int ret =  io_uring_enter(ring_fd, 1,1, IORING_ENTER_GETEVENTS); 

2.3 任務完成

當任務完成的時候,io_uring_enter就會返回。但是這里有個問題,請求任務和響應不是對應的,內核不保證任務完成的順序,內核只是告訴我們哪些任務完成了,我們可以通過user_data關聯請求和響應,類似rpc通信里的seq一樣。user_data字段在請求里設置,響應里會返回,從而請求方知道這個響應對應的是哪個請求。響應對應的結構體比較簡單。

  1. struct io_uring_cqe { 
  2.  
  3.     /* 用戶定義字段,通常用于關聯請求和響應 */ 
  4.  
  5.     __u64   user_data;   
  6.  
  7.     /* 系統調用的返回碼,比如read */ 
  8.  
  9.     __s32   res;         
  10.  
  11.     // 暫時沒用到 
  12.  
  13.     __u32   flags;   
  14.  
  15. }; 

我們這里假設請求和響應是串行的,所以不需要用到user_data字段關聯請求和響應。從前面代碼我們可以看到,我們把數據讀取到buff變量里。我們看看內核返回后我們的處理邏輯。

  1. struct io_uring_cqe *cqe; 
  2.  
  3. unsigned head, reaped = 0; 
  4.  
  5. // 拿到完成隊列隊頭節點,可消費buff里面存儲的數據 
  6.  
  7. head = io_uring_smp_load_acquire(cring_head); 
  8.  
  9. cqe = &cqes[head & (*cring_mask)]; 
  10.  
  11. // 更新頭索引 
  12.  
  13. head++; 
  14. io_uring_smp_store_release(cring_head, head); 

這就是io_uring一個讀取操作的大致過程,我們看到用戶層面的邏輯還是挺復雜的,作者也想到了,所以又封裝了Liburing庫簡化使用。

3 Liburing的使用

那么我們到底怎么使用它呢,我們回想epoll的使用。

  1. // 創建epoll 實例 
  2.  
  3. int epollfd = epoll_create(); 
  4.  
  5. // 封裝fd和訂閱事件 
  6.  
  7. struct epoll_event event;  
  8.  
  9. event.events = EPOLLIN;  
  10. event.data.fd = listenFd;  
  11. // 注冊到epoll 
  12.  
  13. epoll_ctl(epollfd, EPOLL_CTL_ADD, listenFd, &event);  
  14.  
  15. // 等待事件觸發 
  16.  
  17. int num = epoll_wait(epollfd, events, MAX_EVENTS, -1);  
  18.  
  19. for (i = 0; i < num; ++i) {  
  20.     // 處理事件,比如讀寫 
  21.  

接著我們看看基于Liburing的o_uring的使用。

  1. // 拿到一個請求結構體 
  2.  
  3. struct io_uring_sqe *sqe = io_uring_get_sqe(ring);  
  4.  
  5. // 設置fd和數據地址 
  6.  
  7. io_uring_prep_recv(sqe, fd, data, len, 0);  
  8.  
  9. // 通知內核有任務處理 
  10.  
  11. io_uring_submit(&ring);  
  12.  
  13. // 等待事件完成 
  14.  
  15. io_uring_submit_and_wait(&ring, 1);  
  16.  
  17. // 獲取完成的任務 
  18.  
  19. int nums = io_uring_peek_batch_cqe(&ring, cqes, sizeof(cqes) / sizeof(cqes[0]));     
  20.  
  21. for (i = 0; i < nums; ++i) {  
  22.     // 處理完成的任務 
  23.     struct io_uring_cqe *cqe = cqes[i];  

我們看到基于Liburing的使用簡單了很多,有點epoll的風格了。io_uring就介紹到這里,io_uring的細節比較多,實現也比較復雜,代碼量也達到了近1萬行(epoll是2500左右),關于io_uring網上有非常多講解得非常好的文章,大家可以自行閱讀。

4 Node.js中的io_uring

最后介紹一下之前看到的一個Node.js的pr(https://github.com/libuv/libuv/pull/2322),這個pr引入了io_uring。雖然不是取代epoll對Libuv的核心進行重構,但是依然值得探討。該pr涉及了150+文件,不過大部分是Liburing的代碼,我們只關注核心改動。首先Libuv初始化的時候做了一個處理。

  1. // loop里做了修改 
  2.  
  3. struct loop { 
  4.  
  5.   ... 
  6.   // int backend_fd; 改成下面的聯合體  
  7.   union {                                                                      
  8.     int fd;                                                                    
  9.     void* data;                                                                
  10.   } backend;  
  11.  
  12.  
  13.  
  14. // 定義一個使用io_uring時的結構體 
  15.  
  16. struct uv__backend_data_io_uring { 
  17.  
  18.   // io_uring的fd 
  19.   int fd; 
  20.   // 等待io_uring處理的任務個數 
  21.   int32_t pending; 
  22.   // io_uring相關結構體 
  23.   struct io_uring ring; 
  24.   // 用于epoll中監聽io_uring是否有事件觸發 
  25.   uv_poll_t poll_handle; 
  26.  
  27. }; 
  28.  
  29.  
  30.  
  31. // 分配一個uv__backend_data_io_uring結構體 
  32.  
  33. backend_data = uv__malloc(sizeof(*backend_data)); 
  34. ring = &backend_data->ring; 
  35.  
  36. // 初始化io_uring 
  37.  
  38. rc = io_uring_queue_init(IOURING_SQ_SIZE, ring, 0); 
  39.  
  40. // epoll的fd 
  41.  
  42. backend_data->fd = fd; 
  43.  
  44. // 初始化 
  45.  
  46. uv__handle_init(loop, &backend_data->poll_handle, UV_POLL); 
  47.  
  48. backend_data->poll_handle.flags |= UV_HANDLE_INTERNAL; 
  49.  
  50. // 初始化poll_handle的io觀察者,fd是io_uring的fd,回調是uv__io_uring_done。 
  51.  
  52. uv__io_init(&backend_data->poll_handle.io_watcher, 
  53.  
  54.             uv__io_uring_done, 
  55.             ring->ring_fd); 
  56.  
  57. loop->flags |= UV_LOOP_USE_IOURING; 
  58. loop->backend.data = backend_data; 

我們看到初始化時對io_uring進行了初始化并且初始化了一個io觀察者。接下來我們看在哪里使用。

  1. int uv_fs_read(uv_loop_t* loop, uv_fs_t* req, 
  2.                uv_file file, 
  3.                const uv_buf_t bufs[], 
  4.                unsigned int nbufs, 
  5.                int64_t off
  6.                uv_fs_cb cb) { 
  7.   int rc; 
  8.  
  9.   INIT(READ); 
  10.   req->file = file; 
  11.   req->nbufs = nbufs; 
  12.   req->bufs = req->bufsml; 
  13.   memcpy(req->bufs, bufs, nbufs * sizeof(*bufs)); 
  14.   req->off = off
  15.   /* 
  16.     優先調用uv__platform_fs_read,不支持則降級到原來線程池的方案 
  17.     static int uv__fs_retry_with_threadpool(int rc) { 
  18.       return rc == UV_ENOSYS || rc == UV_ENOTSUP || rc == UV_ENOMEM; 
  19.     } 
  20.   */ 
  21.   rc = uv__platform_fs_read(loop, req, file, bufs, nbufs, off, cb); 
  22.   if (!uv__fs_retry_with_threadpool(rc)) 
  23.     return rc; 
  24.  
  25.   // 走到這說明使用降級方案 
  26.   POST; 
  27.  

uv_fs_read函數是讀取文件內容時執行的函數,之前時候給線程池提交一個任務,修改后,加了個前置的邏輯uvplatform_fs_read。我們看看uvplatform_fs_read。

  1. int uv__platform_fs_read(uv_loop_t* loop, 
  2.                          uv_fs_t* req, 
  3.                          uv_os_fd_t file, 
  4.                          const uv_buf_t bufs[], 
  5.                          unsigned int nbufs, 
  6.                          int64_t off
  7.                          uv_fs_cb cb) { 
  8.   return uv__io_uring_fs_work(IORING_OP_READV, 
  9.                               loop, 
  10.                               req, 
  11.                               file, 
  12.                               bufs, 
  13.                               nbufs, 
  14.                               off
  15.                               cb);}int uv__io_uring_fs_work(uint8_t opcode, 
  16.                          uv_loop_t* loop, 
  17.                          uv_fs_t* req, 
  18.                          uv_os_fd_t file, 
  19.                          const uv_buf_t bufs[], 
  20.                          unsigned int nbufs, 
  21.                          int64_t off
  22.                          uv_fs_cb cb) { 
  23.   struct uv__backend_data_io_uring* backend_data; 
  24.   struct io_uring_sqe* sqe; 
  25.   int submitted; 
  26.   uint32_t incr_val; 
  27.   uv_poll_t* handle; 
  28.  
  29.   backend_data = loop->backend.data; 
  30.  
  31.   incr_val = (uint32_t)backend_data->pending + 1; 
  32.   // 獲取一個請求結構體 
  33.   sqe = io_uring_get_sqe(&backend_data->ring); 
  34.   // 初始化請求 
  35.   sqe->opcode = opcode; 
  36.   sqe->fd = file; 
  37.   sqe->off = off
  38.   sqe->addr = (uint64_t)req->bufs;  
  39.   sqe->len = nbufs; 
  40.   // 管理req上下文,任務完成時會用到 
  41.   sqe->user_data = (uint64_t)req; 
  42.   // 提交給內核,非阻塞式調用,返回提交任務的個數 
  43.   submitted = io_uring_submit(&backend_data->ring); 
  44.   // 提交成功 
  45.   if (submitted == 1) { 
  46.     req->priv.fs_req_engine |= UV__ENGINE_IOURING; 
  47.     // 提交的時是第一個任務,則注冊io觀察者的等待可讀事件 
  48.     if (backend_data->pending++ == 0) { 
  49.       handle = &backend_data->poll_handle; 
  50.       uv__io_start(loop, &handle->io_watcher, POLLIN); 
  51.       uv__handle_start(handle); 
  52.     } 
  53.     return 0; 
  54.   } 
  55.   return UV__ERR(errno); 
  56.  

我們看到上面的代碼會給內核提交一個任務,但是不會等待內核返回,并在提交第一個任務的時候給epoll注冊一個等待可讀事件。我們看看io_uring的poll接口的實現(epoll原理可參考之前的文章)。

  1. static __poll_t io_uring_poll(struct file *file, poll_table *wait){ 
  2.     struct io_ring_ctx *ctx = file->private_data; 
  3.     __poll_t mask = 0; 
  4.  
  5.     poll_wait(file, &ctx->cq_wait, wait); 
  6.     smp_rmb(); 
  7.     // 提交隊列沒滿則可寫 
  8.     if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head != 
  9.         ctx->rings->sq_ring_entries) 
  10.         mask |= EPOLLOUT | EPOLLWRNORM; 
  11.     // 完成隊列非空則可讀 
  12.     if (io_cqring_events(ctx, false)) 
  13.         mask |= EPOLLIN | EPOLLRDNORM; 
  14.  
  15.     return mask;}static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush){ 
  16.     struct io_rings *rings = ctx->rings; 
  17.     smp_rmb(); 
  18.     // 完成隊列非空則可讀 
  19.     return ctx->cached_cq_tail - READ_ONCE(rings->cq.head); 
  20.  

所以當io_uring有任務完成,即完成隊列非空的時候,就會在Libuv的poll io被檢測到,從而執行回調。

  1. void uv__io_uring_done(uv_loop_t* loop, uv__io_t* w, unsigned int events) { 
  2.   uv_poll_t* handle; 
  3.   struct io_uring* ring; 
  4.   struct uv__backend_data_io_uring* backend_data; 
  5.   struct io_uring_cqe* cqe; 
  6.   uv_fs_t* req; 
  7.   int finished1; 
  8.  
  9.   handle = container_of(w, uv_poll_t, io_watcher); 
  10.   backend_data = loop->backend.data; 
  11.   ring = &backend_data->ring; 
  12.  
  13.   finished1 = 0; 
  14.   while (1) {  
  15.     // 獲取完成節點 
  16.     io_uring_peek_cqe(ring, &cqe); 
  17.     // 全部任務完成則注銷事件 
  18.     if (--backend_data->pending == 0) 
  19.       uv_poll_stop(handle); 
  20.     // 獲取響應對應的請求上下文 
  21.     req = (void*) (uintptr_t) cqe->user_data; 
  22.  
  23.     if (req->result == 0) 
  24.       req->result = cqe->res; 
  25.     io_uring_cq_advance(ring, 1); 
  26.     // 執行回調 
  27.     req->cb(req); 
  28.   } 
  29.  

至此我們看到了這個pr的邏輯,主要是為文件io引入了io_uring,文件io因為兼容性問題,在Libuv中使用線程池實現的,而io_uring支持普通文件,自然可以用于在Linux新版本上替換掉線程池方案。

后記:io_uring既強大又復雜。一切都交給內核來處理,完成后通知我們,我們不僅不需要再手動執行read,同時也減少了系統調用的成本,尤其需要多次read的時候。看起來是一個很棒的事情,io_uring---Linux上真正的異步IO。但其中所蘊含的知識遠不止于此,有空再更。

 

責任編輯:姜華 來源: 編程雜技
相關推薦

2021-11-24 08:51:32

Node.js監聽函數

2023-05-12 07:31:58

NuxtVue.js

2023-02-07 19:46:35

NIOCQ內核

2023-04-12 18:36:20

IO框架內核

2021-07-07 23:38:05

內核IOLinux

2023-12-28 11:24:29

IO系統請求

2021-05-20 06:57:16

RabbitMQ開源消息

2023-10-20 06:26:51

Libuvio_uring

2021-07-11 23:25:29

Libuvepoll文件

2021-08-25 06:33:52

Node.jsVscode調試工具

2021-09-05 17:46:21

云計算No.jsio_uringJS

2025-06-27 01:44:00

Linux異步IO

2022-03-20 06:40:31

Node.jsperf_hooks性能數據

2023-05-12 08:19:12

Netty程序框架

2021-07-28 10:02:54

建造者模式代碼

2021-07-14 08:24:23

TCPIP 通信協議

2021-06-30 00:20:12

Hangfire.NET平臺

2021-08-11 07:02:21

npm包管理器工具

2021-08-02 06:34:55

Redis刪除策略開源

2021-11-08 08:42:44

CentOS Supervisor運維
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日日日日操 | 亚洲一二三区在线观看 | 亚洲一区二区在线播放 | 99精品国产一区二区三区 | 精品一二三区 | 日韩在线精品视频 | 精品国产一区二区三区久久 | 亚洲欧美激情四射 | 欧美日韩亚洲在线 | 一级片片 | 欧美区在线 | 日本大香伊一区二区三区 | 在线观看中文字幕视频 | 婷婷色国产偷v国产偷v小说 | 这里只有精品99re | 久久久久久久综合 | 波多野结衣一区二区三区 | 色视频在线播放 | 美女张开腿露出尿口 | 熟女毛片| 国产在线视频在线观看 | 成人在线电影网站 | 在线观看av网站 | 日韩精品在线播放 | 中文字幕av亚洲精品一部二部 | 国产精品视频在线免费观看 | 91在线观看| 中文在线一区二区 | 成人欧美一区二区三区白人 | 午夜影院在线观看免费 | 久久婷婷麻豆国产91天堂 | 日韩成人影院在线观看 | 中文字幕精品一区久久久久 | 精品国产91亚洲一区二区三区www | 成人午夜精品 | 影音先锋中文字幕在线观看 | 最新中文字幕在线播放 | 中文字幕一区在线观看视频 | 天天操网 | 一区二区在线免费播放 | 一区二区三区亚洲 |