成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一文搞懂epoll:高效I/O多路復用的核心技術

開發 前端
epoll 作為一種高效的 I/O 多路復用技術,與傳統的 select 和 poll 相比,具有許多獨特的優勢和強大的性能。它就像是一把專門為高并發場景打造的利器,能夠讓我們的程序在處理大量連接時更加高效、穩定。

在 Linux 系統中,處理 I/O 操作有多種方式,像我們熟知的 select 和 poll 等。在連接數較少的情況下,它們或許還能應付自如,但一旦面對大量的并發連接,它們的性能就會大打折扣,就像小馬拉大車一樣,顯得力不從心。然而,有一個技術卻能在這種高并發的場景下脫穎而出,它就是 epoll。

epoll 作為一種高效的 I/O 多路復用技術,與傳統的 select 和 poll 相比,具有許多獨特的優勢和強大的性能。它就像是一把專門為高并發場景打造的利器,能夠讓我們的程序在處理大量連接時更加高效、穩定。那么,epoll 到底是如何做到的呢?讓我們一起深入理解 epoll,探尋它的奧秘吧。

一、epoll簡介

epoll是Linux內核為處理大批量文件描述符而作了改進的poll,是Linux下多路復用IO接口select/poll的增強版本,它能顯著提高程序在大量并發連接中只有少量活躍的情況下的系統CPU利用率。另一點原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。

epoll除了提供select/poll那種IO事件的水平觸發(Level Triggered)外,還提供了邊緣觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提高應用程序效率。

1.1 epoll 初印象

epoll 可是 Linux 下多路復用 I/O 接口的 “超級增強版”,專為應對高并發而生。與傳統的 select 和 poll 相比,那優勢可不是一星半點。select 在處理大量并發連接時,就像個沒頭蒼蠅,每次都得把所有文件描述符集合一股腦從用戶態拷貝到內核態,開銷巨大,而且還得在內核里線性遍歷這些描述符,看看哪個 “有事”,效率低得感人,關鍵它還有個致命弱點,默認最多只能處理 1024 個文件描述符,稍微多點連接就應付不來。

poll 雖說在一些方面改進了 select,比如不需要計算最大文件描述符加一的大小,對大批文件描述符處理速度稍快,基于鏈表存儲沒了最大連接數限制,但本質上還是得遍歷所有描述符找就緒的,大量無謂的遍歷讓它在高并發下也力不從心。

epoll 就不一樣了,它采用全新的設計理念。當創建一個 epoll 實例后,在內核中有個精心構建的數據結構,像是用紅黑樹來高效管理所有要監聽的文件描述符,添加、刪除操作那叫一個快,時間復雜度僅 O (log n);還有個就緒列表,通常用雙向鏈表實現,專門存放已經就緒、有事件發生的文件描述符。

當調用 epoll_wait 時,壓根不用像 select、poll 那樣大海撈針般遍歷所有描述符,只需瞅瞅這個就緒列表就行,輕松定位到 “有事” 的連接,大大節省了 CPU 時間。就好比在一個大型倉庫里找幾件特定物品,select 和 poll 是逐個貨架、逐件貨物查看,epoll 則是有個智能清單,直接指引到目標貨物所在貨架,效率高下立判。這使得 epoll 在面對海量并發連接時,系統資源開銷小,響應迅速,成為眾多高性能網絡應用的堅實后盾。

epoll除了提供select/poll那種IO事件的電平觸發 (Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提高應用程序效率。Linux2.6內核中對/dev/epoll設備的訪問的封裝(system epoll)。這個使我們開發網絡應用程序更加簡單,并且更加高效。

1.2 為什么要使用epoll?

同樣,我們在linux系統下,影響效率的依然是I/O操作,linux提供給我們select/poll/epoll等多路復用I/O方式(kqueue暫時沒研究過),為什么我們對epoll情有獨鐘呢?原因如下:

⑴文件描述符數量的對比

epoll并沒有fd(文件描述符)的上限,它只跟系統內存有關,我的2G的ubuntu下查看是20480個,輕松支持20W個fd。可使用如下命令查看:

cat /proc/sys/fs/file-max

再來看select/poll,有一個限定的fd的數量,linux/posix_types.h頭文件中

#define __FD_SETSIZE    1024

⑵效率對比

當然了,你可以修改上述值,然后重新編譯內核,然后再次寫代碼,這也是沒問題的,不過我先說說select/poll的機制,估計你馬上會作廢上面修改枚舉值的想法。

select/poll會因為監聽fd的數量而導致效率低下,因為它是輪詢所有fd,有數據就處理,沒數據就跳過,所以fd的數量會降低效率;而epoll只處理就緒的fd,它有一個就緒設備的隊列,每次只輪詢該隊列的數據,然后進行處理。

⑶內存處理方式對比

不管是哪種I/O機制,都無法避免fd在操作過程中拷貝的問題,而epoll使用了mmap(是指文件/對象的內存映射,被映射到多個內存頁上),所以同一塊內存就可以避免這個問題。

btw:TCP/IP協議棧使用內存池管理sk_buff結構,你還可以通過修改內存池pool的大小,畢竟linux支持各種微調內核。

二、epoll核心原理

2.1 epoll的工作方式

epoll分為兩種工作方式LT和ET:

LT(level triggered) 是默認/缺省的工作方式,同時支持 block和no_block socket。這種工作方式下,內核會通知你一個fd是否就緒,然后才可以對這個就緒的fd進行I/O操作。就算你沒有任何操作,系統還是會繼續提示fd已經就緒,不過這種工作方式出錯會比較小,傳統的select/poll就是這種工作方式的代表。

ET(edge-triggered) 是高速工作方式,僅支持no_block socket,這種工作方式下,當fd從未就緒變為就緒時,內核會通知fd已經就緒,并且內核認為你知道該fd已經就緒,不會再次通知了,除非因為某些操作導致fd就緒狀態發生變化。如果一直不對這個fd進行I/O操作,導致fd變為未就緒時,內核同樣不會發送更多的通知,因為only once。所以這種方式下,出錯率比較高,需要增加一些檢測程序。

LT可以理解為水平觸發,只要有數據可以讀,不管怎樣都會通知。而ET為邊緣觸發,只有狀態發生變化時才會通知,可以理解為電平變化。

2.2 如何使用epoll?

使用epoll很簡單,只需要:

#include <sys/epoll.h>

有三個關鍵函數:

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_events* event);
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

當然了,不要忘記關閉函數。

epoll和select

epoll 和 select 的主要區別是:

epoll 監聽的 fd(file descriptor)集合是常駐內核的,它有 3 個系統調用 (epoll_create, epoll_wait, epoll_ctl),通過 epoll_wait

select 只有一個系統調用,每次要監聽都要將其從用戶態傳到內核,有事件時返回整個集合。

從性能上看,如果 fd 集合很大,用戶態和內核態之間數據復制的花銷是很大的,所以 select 一般限制 fd 集合最大1024。

從使用上看,epoll 返回的是可用的 fd 子集,select 返回的是全部,哪些可用需要用戶遍歷判斷。

盡管如此,epoll 的性能并不必然比 select 高,對于 fd 數量較少并且 fd IO 都非常繁忙的情況 select 在性能上有優勢。

2.3 epoll原理

⑴為什么要 I/O 多路復用

epoll 是一個優秀的 I/O 多路復用方式。所以,在講解 epoll 之前,我們先來看一下為什么需要 I/O 多路復用。

阻塞 OR 非阻塞

我們知道,對于 linux 來說,I/O 設備為特殊的文件,讀寫和文件是差不多的,但是 I/O 設備因為讀寫與內存讀寫相比,速度差距非常大。與 cpu 讀寫速度更是沒法比,所以相比于對內存的讀寫,I/O 操作總是拖后腿的那個。網絡 I/O 更是如此,我們很多時候不知道網絡 I/O 什么時候到來,就好比我們點了一份外賣,不知道外賣小哥們什么時候送過來,這個時候有兩個處理辦法:

  • 第一個是我們可以先去睡覺,外賣小哥送到樓下了自然會給我們打電話,這個時候我們在醒來取外賣就可以了。
  • 第二個是我們可以每隔一段時間就給外賣小哥打個電話,這樣就能實時掌握外賣的動態信息了。

第一種方式對應的就是阻塞的 I/O 處理方式,進程在進行 I/O 操作的時候,進入睡眠,如果有 I/O 時間到達,就喚醒這個進程。第二種方式對應的是非阻塞輪詢的方式,進程在進行 I/O 操作后,每隔一段時間向內核詢問是否有 I/O 事件到達,如果有就立刻處理。

線程池OR輪詢

在現實中,我們當然選擇第一種方式,但是在計算機中,情況就要復雜一些。我們知道,在 linux 中,不管是線程還是進程都會占用一定的資源,也就是說,系統總的線程和進程數是一定的。如果有許多的線程或者進程被掛起,無疑是白白消耗了系統的資源。而且,線程或者進程的切換也是需要一定的成本的,需要上下文切換,如果頻繁的進行上下文切換,系統會損失很大的性能。一個網絡服務器經常需要連接成千上萬個客戶端,而它能創建的線程可能之后幾百個,線程耗光就不能對外提供服務了。這些都是我們在選擇 I/O 機制的時候需要考慮的。這種阻塞的 I/O 模式下,一個線程只能處理一個流的 I/O 事件,這是問題的根源。

這個時候我們首先想到的是采用線程池的方式限制同時訪問的線程數,這樣就能夠解決線程不足的問題了。但是這又會有第二個問題了,多余的任務會通過隊列的方式存儲在內存只能夠,這樣很容易在客戶端過多的情況下出現內存不足的情況。

還有一種方式是采用輪詢的方式,我們只要不停的把所有流從頭到尾問一遍,又從頭開始。這樣就可以處理多個流了。

代理

采用輪詢的方式雖然能夠處理多個 I/O 事件,但是也有一個明顯的缺點,那就是會導致 CPU 空轉。試想一下,如果所有的流中都沒有數據,那么 CPU 時間就被白白的浪費了。

為了避免CPU空轉,可以引進了一個代理。這個代理比較厲害,可以同時觀察許多流的I/O事件,在空閑的時候,會把當前線程阻塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,于是我們的程序就會輪詢一遍所有的流。這就是 select 與 poll 所做的事情,可見,采用 I/O 復用極大的提高了系統的效率。

(2)核心原理大揭秘

紅黑樹 —— 精準管理的 “魔法樹”

紅黑樹在 epoll 里可是扮演著 “大管家” 的關鍵角色,它專門負責存儲和管理海量的文件描述符。這棵樹有著獨特的 “魔力”,它是一種自平衡的二叉搜索樹,意味著無論插入、刪除還是查找操作,時間復雜度都能穩穩地保持在 O (log n)。

想象一下,在高并發場景下,每秒有成千上萬個新連接涌入,每個連接對應一個文件描述符。要是沒有紅黑樹,查找一個特定的文件描述符就如同大海撈針,效率極其低下。而有了紅黑樹,就好比給每個文件描述符都安排了一個專屬的智能導航。當需要添加新連接(即新文件描述符)時,它能快速指引插入位置;要關閉某個連接刪除對應描述符,也能迅速定位并移除,絲毫不亂。舉個例子,在大型在線游戲服務器里,同時在線玩家眾多,網絡連接頻繁變動,紅黑樹就能高效管理這些連接,確保游戲運行順暢,玩家操作即時響應,不會因連接管理混亂而卡頓。

就緒鏈表 —— 即時響應的 “情報站”

就緒鏈表就像是 epoll 的 “情報收集站”。當某個被監聽的文件描述符狀態發生變化,比如有數據可讀、可寫,內核立馬知曉,并通過回調機制,閃電般地將這個就緒的文件描述符添加到就緒鏈表中。這個鏈表通常是用雙向鏈表實現,插入和刪除操作那叫一個快,時間復雜度僅 O (1)。

打個比方,這就好比快遞驛站收到了你的包裹(數據就緒),立馬把你的取件碼(文件描述符)放到一個專門的 “待取貨架”(就緒鏈表)上,你一來就能快速拿到包裹。對于應用程序而言,當調用 epoll_wait 時,根本不用費時費力去遍歷所有文件描述符,直接到這個 “情報站”—— 就緒鏈表瞅一眼,就能瞬間獲取所有已就緒的文件描述符,第一時間進行數據讀寫操作,大大提升了響應速度,讓數據處理快如閃電。

mmap—— 高效傳輸的 “隱形橋梁”

mmap 堪稱 epoll 實現高效的幕后英雄,它搭建起了內核空間與用戶空間的 “隱形橋梁”—— 共享內存。在傳統的 I/O 操作里,數據從內核緩沖區拷貝到用戶緩沖區,這個過程就像搬運工來回搬貨,費時費力,還增加系統開銷。

但有了 mmap 就不一樣了,它直接讓內核空間和用戶空間共享同一塊內存區域,數據來了,雙方都能直接訪問,減少了數據拷貝次數。這就好比圖書館有個公共書架,管理員(內核)和讀者(用戶程序)都能直接在上面取放書籍(數據),無需來回搬運。像是視頻流處理應用,大量視頻數據頻繁傳輸,mmap 使得數據能快速從內核流向用戶空間,減少傳輸延遲,讓視頻播放流暢無卡頓,極大提升了系統整體性能。

2.4 epoll優缺點

select 與 poll 的缺陷

上文中我們發現,實現一個代理來幫助我們處理 I/O 時間能夠極大的提高工作效率,select 與 poll 就是這樣的代理。但是它們也不是完美的,從上文中我們可以發現,我們能夠從 select 中知道是只是有 I/O 事件發生了。但是我們不知道那一個事件發生,每一個 I/O 事件發生的時候,都需要輪詢所有的流,這樣的時間復雜度 O(N)。但是很多情況下,發生 I/O 時間的只是少數的幾個。通過輪詢所有的找出少數的幾個發生 I/O 的流顯然效率非常低下,因此 select 和 epoll 通常只能處理幾千個并發連接。

epoll 的優勢

select的缺點之一就是在網絡IO流到來的時候,線程會輪詢監控文件數組,并且是線性掃描,還有最大值的限制。相比select,epoll則無需如此。服務器主線程創建了epoll對象,并且注冊socket和文件事件即可。當數據抵達的時候,也就是對于事件發生,則會調用此前注冊的那個io文件。

先看一個python的epoll例子,采用了網絡上一段著名的code:

import socket
import select

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

# 創建套接字對象并綁定監聽端口
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)

# 創建epoll對象,并注冊socket對象的 epoll可讀事件
epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)

try:
    connections = {}
    requests = {}
    responses = {}
    while True:
        # 主循環,epoll的系統調用,一旦有網絡IO事件發生,poll調用返回。這是和select系統調用的關鍵區別
        events = epoll.poll(1)
        # 通過事件通知獲得監聽的文件描述符,進而處理
        for fileno, event in events:
            # 注冊監聽的socket對象可讀,獲取連接,并注冊連接的可讀事件
            if fileno == serversocket.fileno():
                connection, address = serversocket.accept()
                connection.setblocking(0)
                epoll.register(connection.fileno(), select.EPOLLIN)
                connections[connection.fileno()] = connection
                requests[connection.fileno()] = b''
                responses[connection.fileno()] = response
            elif event & select.EPOLLIN:
                # 連接對象可讀,處理客戶端發生的信息,并注冊連接對象可寫
                requests[fileno] += connections[fileno].recv(1024)
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-' * 40 + '\n' + requests[fileno].decode()[:-2])
            elif event & select.EPOLLOUT:
                # 連接對象可寫事件發生,發送數據到客戶端
                byteswritten = connections[fileno].send(responses[fileno])
                responses[fileno] = responses[fileno][byteswritten:]
                if len(responses[fileno]) == 0:
                    epoll.modify(fileno, 0)
                    connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:
                epoll.unregister(fileno)
                connections[fileno].close()
                del connections[fileno]
finally:
    epoll.unregister(serversocket.fileno())
    epoll.close()
    serversocket.close()

可見epoll使用也很簡單,并沒有過多復雜的邏輯,當然主要是在系統層面封裝的好。至于Epoll的原理,也不是三言兩語可以解釋清楚,作為開發者,先學會如何使用API。

epoll與tornado

既然epoll是一種高性能的網絡io模型,很多web框架也采取epoll模型。大名鼎鼎tornado是python框架中一個高性能的異步框架,其底層也是來者epoll的IO模型。

當然,tornado是跨平臺的,因此他的網絡io,在linux下是epoll,unix下則是kqueue。幸好tornado都做了封裝,對于開發者及其友好,下面看一個tornado寫的回顯例子。

import errno
import functools
import tornado.ioloop
import socket


def handle_connection(connection, address):
    """ 處理請求,返回數據給客戶端 """
    data = connection.recv(2014)
    print data
    connection.send(data)


def connection_ready(sock, fd, events):
    """ 事件回調函數,主要用于socket可讀事件,用于獲取socket的鏈接 """
    while True:
        try:
            connection, address = sock.accept()
        except socket.error as e:
            if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            return
        connection.setblocking(0)
        handle_connection(connection, address)


if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", 5000))
    sock.listen(128)
    # 使用tornado封裝好的epoll接口,即IOLoop對象
    io_loop = tornado.ioloop.IOLoop.current()
    callback = functools.partial(connection_ready, sock)
    # io_loop對象注冊網絡io文件描述符和回調函數與io事件的綁定
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
    io_loop.start()

上面的代碼來者tornado的模塊IOLoop源碼的文檔,很簡明的介紹了在tornado中如何使用網絡IO。當然具體的封裝實現,可以參考tornado源碼獲知,在此不做介紹了。

說了這么多,總算引出了我們的主人公 epoll 了。不同于忙輪詢和無差別輪詢,epoll 會把哪個流發生了怎樣的 I/O 事件通知我們。此時我們對這些流的操作都是有意義的。(復雜度降低到了O(k),k為產生 I/O 事件的流的個數。

三、epoll實現原理

3.1 epoll 操作

epoll 在 linux 內核中申請了一個簡易的文件系統,把原先的一個 select 或者 poll 調用分為了三個部分:調用 epoll_create 建立一個 epoll 對象(在 epoll 文件系統中給這個句柄分配資源)、調用 epoll_ctl 向 epoll 對象中添加連接的套接字、調用 epoll_wait 收集發生事件的連接。

這樣只需要在進程啟動的時候建立一個 epoll 對象,并在需要的時候向它添加或者刪除連接就可以了,因此,在實際收集的時候,epoll_wait 的效率會非常高,因為調用的時候只是傳遞了發生 IO 事件的連接。

3.2 epoll 實現

我們以 linux 內核 2.6 為例,說明一下 epoll 是如何高效的處理事件的。當某一個進程調用 epoll_create 方法的時候,Linux 內核會創建一個 eventpoll 結構體,這個結構體中有兩個重要的成員。

  • 第一個是 rb_root rbr,這是紅黑樹的根節點,存儲著所有添加到 epoll 中的事件,也就是這個 epoll 監控的事件。
  • 第二個是 list_head rdllist 這是一個雙向鏈表,保存著將要通過 epoll_wait 返回給用戶的、滿足條件的事件。

每一個 epoll 對象都有一個獨立的 eventpoll 結構體,這個結構體會在內核空間中創造獨立的內存,用于存儲使用 epoll_ctl 方法向 epoll 對象中添加進來的事件。這些事件都會掛到 rbr 紅黑樹中,這樣就能夠高效的識別重復添加的節點。

所有添加到 epoll 中的事件都會與設備(如網卡等)驅動程序建立回調關系,也就是說,相應的事件發生時會調用這里的方法。這個回調方法在內核中叫做 ep_poll_callback,它把這樣的事件放到 rdllist 雙向鏈表中。在 epoll 中,對于每一個事件都會建立一個 epitem 結構體。

當調用 epoll_wait 檢查是否有發生事件的連接時,只需要檢查 eventpoll 對象中的 rdllist 雙向鏈表中是否有 epitem 元素,如果 rdllist 鏈表不為空,則把這里的事件復制到用戶態內存中的同時,將事件數量返回給用戶。通過這種方法,epoll_wait 的效率非常高。epoll-ctl 在向 epoll 對象中添加、修改、刪除事件時,從 rbr 紅黑樹中查找事件也非常快。這樣,epoll 就能夠輕易的處理百萬級的并發連接。

⑴pollable

首先,linux 的 file 有個 pollable 的概念,只有 pollable 的 file 才可以加入到 epoll 和 select 中。一個 file 是 pollable 的當且僅當其定義了 file->f_op->poll。file->f_op->poll 的形式如下:

__poll_t poll(struct file *fp, poll_table *wait)

不同類型的 file 實現不同,但做的事情都差不多:

  • 通過 fp 拿到其對應的 waitqueue
  • 通過 wait 拿到外部設置的 callback[[1]]
  • 執行 callback(fp, waitqueue, wait),在 callback 中會將另外一個 callback2[[2]] 注冊到 waitqueue[[3]]中,此后 fp 有觸發事件就會調用 callback2

waitqueue 是事件驅動的,與驅動程序密切相關,簡單來說 poll 函數在 file 的觸發隊列中注冊了個 callback, 有事件發生時就調用callback。感興趣可以根據文后 [[4]] 的提示看看 socket 的 poll 實現

了解了 pollable 我們看看 epoll 的三個系統調用 epoll_create,,epoll_ctl,,epoll_wait:

①epoll_create:開啟 epoll 之門

epoll_create 宛如一把神奇的鑰匙,用來開啟 epoll 的大門。它的使命是創建一個 epoll 實例,函數原型為 “int epoll_create (int size);”,這里的 size 參數在早期 Linux 內核版本里,像是給內核的一個 “小提示”,暗示預計要監聽的文件描述符數量,好讓內核提前規劃資源。不過在后續版本,特別是 Linux 2.6.8 及以后,內核變得更加智能,這個參數就沒那么關鍵了,只要傳入大于 0 的值就行,通常我們就簡單傳個 1。函數執行成功,會返回一個文件描述符(fd),這可是后續操作 epoll 實例的關鍵 “入口”,有了它,才能進行添加、刪除監聽事件等一系列操作,要是創建失敗,就會返回 -1,同時 errno 會被設置成相應錯誤碼,告訴你問題出在哪,就像出門找不到鑰匙,后續啥事都干不了,所以得小心檢查錯誤。

②epoll_ctl:掌控監控大權

epoll_ctl 則是掌控監控大權的 “指揮官”,負責向 epoll 實例添加、修改或刪除文件描述符,原型是 “int epoll_ctl (int epfd, int op, int fd, struct epoll_event *event);”。其中 epfd 就是前面 epoll_create 返回的 epoll 實例文件描述符,op 指明操作類型,有三個 “指令” 可選:EPOLL_CTL_ADD 如同招募新兵,把新的 fd 及其關注事件注冊到 epoll 實例中;EPOLL_CTL_MOD 類似給士兵換崗,修改已注冊 fd 的監聽事件;EPOLL_CTL_DEL 則是讓士兵退役,從 epoll 實例里移除指定 fd。fd 就是要操作的文件描述符,而 event 是個關鍵的結構體指針,結構體里的 events 成員能指定要監聽的事件類型,像 EPOLLIN(可讀)、EPOLLOUT(可寫)、EPOLLERR(出錯)等,data 成員可以存放些自定義數據,方便識別 fd,比如存個指向結構體的指針,里面包含 fd 相關的業務信息。操作成功返回 0,失敗返回 -1,還會設置 errno,讓你知曉 “指揮” 哪里出了岔子。

epoll_ctl 的主要操作在 ep_insert, 它做了以下事情:

  • 初始化一個 epitem,里面包含 fd,監聽的事件,就緒鏈表,關聯的 epoll_fd 等信息
  • 調用 ep_item_poll(epitem, ep_ptable_queue_proc[[1]])。ep_item_poll 會調用 vfs_poll, vfs_poll 會調用上面說的 file->f_op->poll 將 ep_poll_callback[[2]] 注冊到 waitqueue
  • 調用 ep_rbtree_insert(eventpoll, epitem) 將 epitem 插入 evenpoll 對象的紅黑樹,方便后續查找

ep_poll_callback

在了解 epoll_wait 之前我們還需要知道 ep_poll_callback 做了哪些操作

  • ep_poll_callback 被調用,說明 epoll 中某個 file 有了新事件
  • eventpoll 對象有一個 rdllist 字段,用鏈表存著當前就緒的所有 epitem
  • ep_poll_callback 被調用的時候將 file 對應的 epitem 加到 rdllist 里(不重復)
  • 如果當前用戶正在 epoll_wait 阻塞狀態 ep_poll_callback 還會通過 wake_up_locked 將 epoll_wait 喚醒

③epoll_wait:等待事件降臨

epoll_wait 就像個耐心的 “守望者”,阻塞等待文件描述符上的事件發生。函數定義為 “int epoll_wait (int epfd, struct epoll_event *events, int maxevents, int timeout);”,epfd 還是那個熟悉的 epoll 實例文件描述符,events 是個傳出參數,是個數組,用來存放就緒事件的詳細信息,就像個 “收件箱”,內核把發生的事件詳情投遞進來。maxevents 規定了這個 “收件箱” 的最大容量,也就是最多能接收多少個就緒事件,要注意不能超過創建 epoll 實例時的 size 值。timeout 是超時時間,單位毫秒,-1 表示永遠等待,直到有事件發生;0 則是急性子,立馬返回,不管有沒有事件;正數就設定個等待期限。當有事件就緒或者超時,函數就會返回就緒事件的數量,如果返回 -1,那就是遇到錯誤,errno 會記錄錯誤原因,等待結束后,就能從 events 數組里依次取出就緒事件,按業務需求處理,開啟數據的讀寫之旅。epoll_wait 主要做了以下操作:

  • 檢查 rdllist,如果不為空則去到 7,如果為空則去到 2
  • 設置 timeout
  • 開始無限循環
  • 設置線程狀態為 TASK_INTERRUPTIBLE [參看 Sleeping in the Kernal](Kernel Korner - Sleeping in the Kernel)
  • 檢查 rdllist 如果不為空去到 7, 否則去到 6
  • 調用 schedule_hrtimeout_range 睡到 timeout,中途有可能被 ep_poll_callback 喚醒回到 4,如果真的 timeout 則 break 去到 7
  • 設置線程狀態為 TASK_RUNNING,rdllist如果不為空時退出循環,否則繼續循環
  • 調用 ep_send_events 將 rdllist 返回給用戶態

epoll 的原理基本上就這些,還有很多細節如紅黑樹在哪里用,怎樣實現 level-triggered 和 edge-triggered... 我還沒看。

PS. 普通文件不是 pollable 的,詳情請看 epoll_does_not_work_with_file

3.3 epoll 工作模式

①水平觸發(LT):持續通知的 “貼心管家”

水平觸發(LT)可是 epoll 的默認工作模式,就像一位貼心管家,時刻關注著文件描述符的狀態。當某個文件描述符處于就緒狀態,比如有數據可讀或者可寫,內核就會通知應用程序。要是應用程序這次沒處理完數據,或者沒來得及處理,別擔心,下次調用 epoll_wait 時,內核依舊會不厭其煩地再次通知,直到數據被處理完或者緩沖區里沒數據可讀、可寫了為止。

舉個例子,在處理 HTTP 報文時,數據可能是一段段陸續到達的。使用 LT 模式,只要緩沖區還有沒讀完的報文片段,每次 epoll_wait 都會把對應的文件描述符事件返回,讓應用程序可以分次從容地解析報文,不用擔心錯過任何數據,大大降低了編程復雜度,對新手程序員那是相當友好,就像有個老師在旁邊,不停提醒你還有作業沒做完呢。

②邊緣觸發(ET):高效靈敏的 “情報員”

邊緣觸發(ET)模式則像一位高效靈敏的情報員,奉行 “只報新事” 原則。只有在文件描述符的狀態發生改變時,比如從無數據變為有數據可讀,或者從不可寫變為可寫,內核才會觸發事件通知應用程序。一旦通知了,它就默認你知曉此事,后續除非狀態再次改變,否則不會重復通知。這意味著應用程序得打起十二分精神,在收到通知后,必須立刻、馬上處理數據,而且要盡可能把當前就緒的數據一次性處理完。

比如說讀取大型文件,使用 ET 模式,一旦檢測到文件描述符可讀,就得趕緊用 while 循環一股腦把數據全讀完,不然下次 epoll_wait 可不會再提醒你還有剩余數據。要是讀數據時遇到 EAGAIN 或 EWOULDBLOCK 錯誤,那就說明這次數據真讀完了。這種模式雖然編程難度稍高,需要精細處理數據,但減少了不必要的喚醒次數,系統開銷小,在追求極致性能的場景下,那可是 “利器”,能讓數據如閃電般高效流轉。

當然,在 LT 模式下開發基于 epoll 的應用要簡單一些,不太容易出錯,而在 ET 模式下事件發生時,如果沒有徹底地將緩沖區的數據處理完,則會導致緩沖區的用戶請求得不到響應。注意,默認情況下 Nginx 采用 ET 模式使用 epoll 的。

四、epoll內核源碼詳解

網上很多博客說epoll使用了共享內存,這個是完全錯誤的 ,可以閱讀源碼,會發現完全沒有使用共享內存的任何api,而是 使用了copy_from_user跟__put_user進行內核跟用戶虛擬空間數據交互。

/*
 *  fs/eventpoll.c (Efficient event retrieval implementation)
 *  Copyright (C) 2001,...,2009	 Davide Libenzi
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  Davide Libenzi <davidel@xmailserver.org>
 *
 */
/*
 * 在深入了解epoll的實現之前, 先來了解內核的3個方面.
 * 1. 等待隊列 waitqueue
 * 我們簡單解釋一下等待隊列:
 * 隊列頭(wait_queue_head_t)往往是資源生產者,
 * 隊列成員(wait_queue_t)往往是資源消費者,
 * 當頭的資源ready后, 會逐個執行每個成員指定的回調函數,
 * 來通知它們資源已經ready了, 等待隊列大致就這個意思.
 * 2. 內核的poll機制
 * 被Poll的fd, 必須在實現上支持內核的Poll技術,
 * 比如fd是某個字符設備,或者是個socket, 它必須實現
 * file_operations中的poll操作, 給自己分配有一個等待隊列頭.
 * 主動poll fd的某個進程必須分配一個等待隊列成員, 添加到
 * fd的對待隊列里面去, 并指定資源ready時的回調函數.
 * 用socket做例子, 它必須有實現一個poll操作, 這個Poll是
 * 發起輪詢的代碼必須主動調用的, 該函數中必須調用poll_wait(),
 * poll_wait會將發起者作為等待隊列成員加入到socket的等待隊列中去.
 * 這樣socket發生狀態變化時可以通過隊列頭逐個通知所有關心它的進程.
 * 這一點必須很清楚的理解, 否則會想不明白epoll是如何
 * 得知fd的狀態發生變化的.
 * 3. epollfd本身也是個fd, 所以它本身也可以被epoll,
 * 可以猜測一下它是不是可以無限嵌套epoll下去... 
 *
 * epoll基本上就是使用了上面的1,2點來完成.
 * 可見epoll本身并沒有給內核引入什么特別復雜或者高深的技術,
 * 只不過是已有功能的重新組合, 達到了超過select的效果.
 */
/* 
 * 相關的其它內核知識:
 * 1. fd我們知道是文件描述符, 在內核態, 與之對應的是struct file結構,
 * 可以看作是內核態的文件描述符.
 * 2. spinlock, 自旋鎖, 必須要非常小心使用的鎖,
 * 尤其是調用spin_lock_irqsave()的時候, 中斷關閉, 不會發生進程調度,
 * 被保護的資源其它CPU也無法訪問. 這個鎖是很強力的, 所以只能鎖一些
 * 非常輕量級的操作.
 * 3. 引用計數在內核中是非常重要的概念,
 * 內核代碼里面經常有些release, free釋放資源的函數幾乎不加任何鎖,
 * 這是因為這些函數往往是在對象的引用計數變成0時被調用,
 * 既然沒有進程在使用在這些對象, 自然也不需要加鎖.
 * struct file 是持有引用計數的.
 */
/* --- epoll相關的數據結構 --- */
/*
 * This structure is stored inside the "private_data" member of the file
 * structure and rapresent the main data sructure for the eventpoll
 * interface.
 */
/* 每創建一個epollfd, 內核就會分配一個eventpoll與之對應, 可以說是
 * 內核態的epollfd. */
struct eventpoll {
	/* Protect the this structure access */
	spinlock_t lock;
	/*
	 * This mutex is used to ensure that files are not removed
	 * while epoll is using them. This is held during the event
	 * collection loop, the file cleanup path, the epoll file exit
	 * code and the ctl operations.
	 */
	/* 添加, 修改或者刪除監聽fd的時候, 以及epoll_wait返回, 向用戶空間
	 * 傳遞數據時都會持有這個互斥鎖, 所以在用戶空間可以放心的在多個線程
	 * 中同時執行epoll相關的操作, 內核級已經做了保護. */
	struct mutex mtx;
	/* Wait queue used by sys_epoll_wait() */
	/* 調用epoll_wait()時, 我們就是"睡"在了這個等待隊列上... */
	wait_queue_head_t wq;
	/* Wait queue used by file->poll() */
	/* 這個用于epollfd本事被poll的時候... */
	wait_queue_head_t poll_wait;
	/* List of ready file descriptors */
	/* 所有已經ready的epitem都在這個鏈表里面 */
	struct list_head rdllist;
	/* RB tree root used to store monitored fd structs */
	/* 所有要監聽的epitem都在這里 */
	struct rb_root rbr;
	/*
		這是一個單鏈表鏈接著所有的struct epitem當event轉移到用戶空間時
	 */
	 * This is a single linked list that chains all the "struct epitem" that
	 * happened while transfering ready events to userspace w/out
	 * holding ->lock.
	 */
	struct epitem *ovflist;
	/* The user that created the eventpoll descriptor */
	/* 這里保存了一些用戶變量, 比如fd監聽數量的最大值等等 */
	struct user_struct *user;
};
/*
 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the "rbr" RB tree.
 */
/* epitem 表示一個被監聽的fd */
struct epitem {
	/* RB tree node used to link this structure to the eventpoll RB tree */
	/* rb_node, 當使用epoll_ctl()將一批fds加入到某個epollfd時, 內核會分配
	 * 一批的epitem與fds們對應, 而且它們以rb_tree的形式組織起來, tree的root
	 * 保存在epollfd, 也就是struct eventpoll中. 
	 * 在這里使用rb_tree的原因我認為是提高查找,插入以及刪除的速度.
	 * rb_tree對以上3個操作都具有O(lgN)的時間復雜度 */
	struct rb_node rbn;
	/* List header used to link this structure to the eventpoll ready list */
	/* 鏈表節點, 所有已經ready的epitem都會被鏈到eventpoll的rdllist中 */
	struct list_head rdllink;
	/*
	 * Works together "struct eventpoll"->ovflist in keeping the
	 * single linked chain of items.
	 */
	/* 這個在代碼中再解釋... */
	struct epitem *next;
	/* The file descriptor information this item refers to */
	/* epitem對應的fd和struct file */
	struct epoll_filefd ffd;
	/* Number of active wait queue attached to poll operations */
	int nwait;
	/* List containing poll wait queues */
	struct list_head pwqlist;
	/* The "container" of this item */
	/* 當前epitem屬于哪個eventpoll */
	struct eventpoll *ep;
	/* List header used to link this item to the "struct file" items list */
	struct list_head fllink;
	/* The structure that describe the interested events and the source fd */
	/* 當前的epitem關系哪些events, 這個數據是調用epoll_ctl時從用戶態傳遞過來 */
	struct epoll_event event;
};
struct epoll_filefd {
	struct file *file;
	int fd;
};
/* poll所用到的鉤子Wait structure used by the poll hooks */
struct eppoll_entry {
	/* List header used to link this structure to the "struct epitem" */
	struct list_head llink;
	/* The "base" pointer is set to the container "struct epitem" */
	struct epitem *base;
	/*
	 * Wait queue item that will be linked to the target file wait
	 * queue head.
	 */
	wait_queue_t wait;
	/* The wait queue head that linked the "wait" wait queue item */
	wait_queue_head_t *whead;
};
/* Wrapper struct used by poll queueing */
struct ep_pqueue {
	poll_table pt;
	struct epitem *epi;
};
/* Used by the ep_send_events() function as callback private data */
struct ep_send_events_data {
	int maxevents;
	struct epoll_event __user *events;
};

/* --- 代碼注釋 --- */
/* 你沒看錯, 這就是epoll_create()的真身, 基本啥也不干直接調用epoll_create1了,
 * 另外你也可以發現, size這個參數其實是沒有任何用處的... */
SYSCALL_DEFINE1(epoll_create, int, size)
{
        if (size <= 0)
                return -EINVAL;
        return sys_epoll_create1(0);
}
/* 這才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
	int error;
	struct eventpoll *ep = NULL;//主描述符
	/* Check the EPOLL_* constant for consistency.  */
	/* 這句沒啥用處... */
	BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
	/* 對于epoll來講, 目前唯一有效的FLAG就是CLOEXEC */
	if (flags & ~EPOLL_CLOEXEC)
		return -EINVAL;
	/*
	 * Create the internal data structure ("struct eventpoll").
	 */
	/* 分配一個struct eventpoll, 分配和初始化細節我們隨后深聊~ */
	error = ep_alloc(&ep);
	if (error < 0)
		return error;
	/*
	 * Creates all the items needed to setup an eventpoll file. That is,
	 * a file structure and a free file descriptor.
	 */
	/* 這里是創建一個匿名fd, 說起來就話長了...長話短說:
	 * epollfd本身并不存在一個真正的文件與之對應, 所以內核需要創建一個
	 * "虛擬"的文件, 并為之分配真正的struct file結構, 而且有真正的fd.
	 * 這里2個參數比較關鍵:
	 * eventpoll_fops, fops就是file operations, 就是當你對這個文件(這里是虛擬的)進行操作(比如讀)時,
	 * fops里面的函數指針指向真正的操作實現, 類似C++里面虛函數和子類的概念.
	 * epoll只實現了poll和release(就是close)操作, 其它文件系統操作都有VFS全權處理了.
	 * ep, ep就是struct epollevent, 它會作為一個私有數據保存在struct file的private指針里面.
	 * 其實說白了, 就是為了能通過fd找到struct file, 通過struct file能找到eventpoll結構.
	 * 如果懂一點Linux下字符設備驅動開發, 這里應該是很好理解的,
	 * 推薦閱讀 <Linux device driver 3rd>
	 */
	error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
				 O_RDWR | (flags & O_CLOEXEC));
	if (error < 0)
		ep_free(ep);
	return error;
}
/* 
* 創建好epollfd后, 接下來我們要往里面添加fd咯
* 來看epoll_ctl
* epfd 就是epollfd
* op ADD,MOD,DEL
* fd 需要監聽的描述符
* event 我們關心的events
*/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	int error;
	struct file *file, *tfile;
	struct eventpoll *ep;
	struct epitem *epi;
	struct epoll_event epds;
	error = -EFAULT;
	/* 
	 * 錯誤處理以及從用戶空間將epoll_event結構copy到內核空間.
	 */
	if (ep_op_has_event(op) &&
	    copy_from_user(&epds, event, sizeof(struct epoll_event)))
		goto error_return;
	/* Get the "struct file *" for the eventpoll file */
	/* 取得struct file結構, epfd既然是真正的fd, 那么內核空間
	 * 就會有與之對于的一個struct file結構
	 * 這個結構在epoll_create1()中, 由函數anon_inode_getfd()分配 */
	error = -EBADF;
	file = fget(epfd);
	if (!file)
		goto error_return;
	/* Get the "struct file *" for the target file */
	/* 我們需要監聽的fd, 它當然也有個struct file結構, 上下2個不要搞混了哦 */
	tfile = fget(fd);
	if (!tfile)
		goto error_fput;
	/* The target file descriptor must support poll */
	error = -EPERM;
	/* 如果監聽的文件不支持poll, 那就沒轍了.
	 * 你知道什么情況下, 文件會不支持poll嗎?
	 */
	if (!tfile->f_op || !tfile->f_op->poll)
		goto error_tgt_fput;
	/*
	 * We have to check that the file structure underneath the file descriptor
	 * the user passed to us _is_ an eventpoll file. And also we do not permit
	 * adding an epoll file descriptor inside itself.
	 */
	error = -EINVAL;
	/* epoll不能自己監聽自己... */
	if (file == tfile || !is_file_epoll(file))
		goto error_tgt_fput;
	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	/* 取到我們的eventpoll結構, 來自與epoll_create1()中的分配 */
	ep = file->private_data;
	/* 接下來的操作有可能修改數據結構內容, 鎖之~ */
	mutex_lock(&ep->mtx);
	/*
	 * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
	 * above, we can be sure to be able to use the item looked up by
	 * ep_find() till we release the mutex.
	 */
	/* 對于每一個監聽的fd, 內核都有分配一個epitem結構,
	 * 而且我們也知道, epoll是不允許重復添加fd的,
	 * 所以我們首先查找該fd是不是已經存在了.
	 * ep_find()其實就是RBTREE查找, 跟C++STL的map差不多一回事, O(lgn)的時間復雜度.
	 */
	epi = ep_find(ep, tfile, fd);
	error = -EINVAL;
	switch (op) {
		/* 首先我們關心添加 */
	case EPOLL_CTL_ADD:
		if (!epi) {
			/* 之前的find沒有找到有效的epitem, 證明是第一次插入, 接受!
			 * 這里我們可以知道, POLLERR和POLLHUP事件內核總是會關心的
			 * */
			epds.events |= POLLERR | POLLHUP;
			/* rbtree插入, 詳情見ep_insert()的分析
			 * 其實我覺得這里有insert的話, 之前的find應該
			 * 是可以省掉的... */
			error = ep_insert(ep, &epds, tfile, fd);
		} else
			/* 找到了!? 重復添加! */
			error = -EEXIST;
		break;
		/* 刪除和修改操作都比較簡單 */
	case EPOLL_CTL_DEL:
		if (epi)
			error = ep_remove(ep, epi);
		else
			error = -ENOENT;
		break;
	case EPOLL_CTL_MOD:
		if (epi) {
			epds.events |= POLLERR | POLLHUP;
			error = ep_modify(ep, epi, &epds);
		} else
			error = -ENOENT;
		break;
	}
	mutex_unlock(&ep->mtx);
error_tgt_fput:
	fput(tfile);
error_fput:
	fput(file);
error_return:
	return error;
}
/* 分配一個eventpoll結構 */
static int ep_alloc(struct eventpoll **pep)
{
	int error;
	struct user_struct *user;
	struct eventpoll *ep;
	/* 獲取當前用戶的一些信息, 比如是不是root啦, 最大監聽fd數目啦 */
	user = get_current_user();
	error = -ENOMEM;
	ep = kzalloc(sizeof(*ep), GFP_KERNEL);
	if (unlikely(!ep))
		goto free_uid;
	/* 這些都是初始化啦 */
	spin_lock_init(&ep->lock);
	mutex_init(&ep->mtx);
	init_waitqueue_head(&ep->wq);//初始化自己睡在的等待隊列
	init_waitqueue_head(&ep->poll_wait);//初始化
	INIT_LIST_HEAD(&ep->rdllist);//初始化就緒鏈表
	ep->rbr = RB_ROOT;
	ep->ovflist = EP_UNACTIVE_PTR;
	ep->user = user;
	*pep = ep;
	return 0;
free_uid:
	free_uid(user);
	return error;
}
/*
 * Must be called with "mtx" held.
 */
/* 
 * ep_insert()在epoll_ctl()中被調用, 完成往epollfd里面添加一個監聽fd的工作
 * tfile是fd在內核態的struct file結構
 */
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
	int error, revents, pwake = 0;
	unsigned long flags;
	struct epitem *epi;
	struct ep_pqueue epq;
	/* 查看是否達到當前用戶的最大監聽數 */
	if (unlikely(atomic_read(&ep->user->epoll_watches) >=
		     max_user_watches))
		return -ENOSPC;
	/* 從著名的slab中分配一個epitem */
	if (!(epi = kmem_***_alloc(epi_***, GFP_KERNEL)))
		return -ENOMEM;
	/* Item initialization follow here ... */
	/* 這些都是相關成員的初始化... */
	INIT_LIST_HEAD(&epi->rdllink);
	INIT_LIST_HEAD(&epi->fllink);
	INIT_LIST_HEAD(&epi->pwqlist);
	epi->ep = ep;
	/* 這里保存了我們需要監聽的文件fd和它的file結構 */
	ep_set_ffd(&epi->ffd, tfile, fd);
	epi->event = *event;
	epi->nwait = 0;
	/* 這個指針的初值不是NULL哦... */
	epi->next = EP_UNACTIVE_PTR;
	/* Initialize the poll table using the queue callback */
	/* 好, 我們終于要進入到poll的正題了 */
	epq.epi = epi;
	/* 初始化一個poll_table
	 * 其實就是指定調用poll_wait(注意不是epoll_wait!!!)時的回調函數,和我們關心哪些events,
	 * ep_ptable_queue_proc()就是我們的回調啦, 初值是所有event都關心 */
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
	/*
	 * Attach the item to the poll hooks and get current event bits.
	 * We can safely use the file* here because its usage count has
	 * been increased by the caller of this function. Note that after
	 * this operation completes, the poll callback can start hitting
	 * the new item.
	 */
	/* 這一部很關鍵, 也比較難懂, 完全是內核的poll機制導致的...
	 * 首先, f_op->poll()一般來說只是個wrapper, 它會調用真正的poll實現,
	 * 拿UDP的socket來舉例, 這里就是這樣的調用流程: f_op->poll(), sock_poll(),
	 * udp_poll(), datagram_poll(), sock_poll_wait(), 最后調用到我們上面指定的
	 * ep_ptable_queue_proc()這個回調函數...(好深的調用路徑...).
	 * 完成這一步, 我們的epitem就跟這個socket關聯起來了, 當它有狀態變化時,
	 * 會通過ep_poll_callback()來通知.
	 * 最后, 這個函數還會查詢當前的fd是不是已經有啥event已經ready了, 有的話
	 * 會將event返回. */
	revents = tfile->f_op->poll(tfile, &epq.pt);
	/*
	 * We have to check if something went wrong during the poll wait queue
	 * install process. Namely an allocation for a wait queue failed due
	 * high memory pressure.
	 */
	error = -ENOMEM;
	if (epi->nwait < 0)
		goto error_unregister;
	/* Add the current item to the list of active epoll hook for this file */
	/* 這個就是每個文件會將所有監聽自己的epitem鏈起來 */
	spin_lock(&tfile->f_lock);
	list_add_tail(&epi->fllink, &tfile->f_ep_links);
	spin_unlock(&tfile->f_lock);
	/*
	 * Add the current item to the RB tree. All RB tree operations are
	 * protected by "mtx", and ep_insert() is called with "mtx" held.
	 */
	/* 都搞定后, 將epitem插入到對應的eventpoll中去 */
	ep_rbtree_insert(ep, epi);
	/* We have to drop the new item inside our item list to keep track of it */
	spin_lock_irqsave(&ep->lock, flags);
	/* If the file is already "ready" we drop it inside the ready list */
	/* 到達這里后, 如果我們監聽的fd已經有事件發生, 那就要處理一下 */
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
		/* 將當前的epitem加入到ready list中去 */
		list_add_tail(&epi->rdllink, &ep->rdllist);
		/* Notify waiting tasks that events are available */
		/* 誰在epoll_wait, 就喚醒它... */
		if (waitqueue_active(&ep->wq))
			wake_up_locked(&ep->wq);
		/* 誰在epoll當前的epollfd, 也喚醒它... */
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}
	spin_unlock_irqrestore(&ep->lock, flags);
	atomic_inc(&ep->user->epoll_watches);
	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&ep->poll_wait);
	return 0;
error_unregister:
	ep_unregister_pollwait(ep, epi);
	/*
	 * We need to do this because an event could have been arrived on some
	 * allocated wait queue. Note that we don't care about the ep->ovflist
	 * list, since that is used/cleaned only inside a section bound by "mtx".
	 * And ep_insert() is called with "mtx" held.
	 */
	spin_lock_irqsave(&ep->lock, flags);
	if (ep_is_linked(&epi->rdllink))
		list_del_init(&epi->rdllink);
	spin_unlock_irqrestore(&ep->lock, flags);
	kmem_***_free(epi_***, epi);
	return error;
}
/*
 * This is the callback that is used to add our wait queue to the
 * target file wakeup lists.
 */
/* 
 * 該函數在調用f_op->poll()時會被調用.
 * 也就是epoll主動poll某個fd時, 用來將epitem與指定的fd關聯起來的.
 * 關聯的辦法就是使用等待隊列(waitqueue)
 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	struct epitem *epi = ep_item_from_epqueue(pt);
	struct eppoll_entry *pwq;
	if (epi->nwait >= 0 && (pwq = kmem_***_alloc(pwq_***, GFP_KERNEL))) {
		/* 初始化等待隊列, 指定ep_poll_callback為喚醒時的回調函數,
		 * 當我們監聽的fd發生狀態改變時, 也就是隊列頭被喚醒時,
		 * 指定的回調函數將會被調用. */
		init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
		pwq->whead = whead;
		pwq->base = epi;
		/* 將剛分配的等待隊列成員加入到頭中, 頭是由fd持有的 */
		add_wait_queue(whead, &pwq->wait);
		list_add_tail(&pwq->llink, &epi->pwqlist);
		/* nwait記錄了當前epitem加入到了多少個等待隊列中,
		 * 我認為這個值最大也只會是1... */
		epi->nwait++;
	} else {
		/* We have to signal that an error occurred */
		epi->nwait = -1;
	}
}
/*
 * This is the callback that is passed to the wait queue wakeup
 * machanism. It is called by the stored file descriptors when they
 * have events to report.
 */
/* 
 * 這個是關鍵性的回調函數, 當我們監聽的fd發生狀態改變時, 它會被調用.
 * 參數key被當作一個unsigned long整數使用, 攜帶的是events.
 */
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	int pwake = 0;
	unsigned long flags;
	struct epitem *epi = ep_item_from_wait(wait);//從等待隊列獲取epitem.需要知道哪個進程掛載到這個設備
	struct eventpoll *ep = epi->ep;//獲取
	spin_lock_irqsave(&ep->lock, flags);
	/*
	 * If the event mask does not contain any poll(2) event, we consider the
	 * descriptor to be disabled. This condition is likely the effect of the
	 * EPOLLONESHOT bit that disables the descriptor when an event is received,
	 * until the next EPOLL_CTL_MOD will be issued.
	 */
	if (!(epi->event.events & ~EP_PRIVATE_BITS))
		goto out_unlock;
	/*
	 * Check the events coming with the callback. At this stage, not
	 * every device reports the events in the "key" parameter of the
	 * callback. We need to be able to handle both cases here, hence the
	 * test for "key" != NULL before the event match test.
	 */
	/* 沒有我們關心的event... */
	if (key && !((unsigned long) key & epi->event.events))
		goto out_unlock;
	/*
	 * If we are trasfering events to userspace, we can hold no locks
	 * (because we're accessing user memory, and because of linux f_op->poll()
	 * semantics). All the events that happens during that period of time are
	 * chained in ep->ovflist and requeued later on.
	 */
	/* 
	 * 這里看起來可能有點費解, 其實干的事情比較簡單:
	 * 如果該callback被調用的同時, epoll_wait()已經返回了,
	 * 也就是說, 此刻應用程序有可能已經在循環獲取events,
	 * 這種情況下, 內核將此刻發生event的epitem用一個單獨的鏈表
	 * 鏈起來, 不發給應用程序, 也不丟棄, 而是在下一次epoll_wait
	 * 時返回給用戶.
	 */
	if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
		if (epi->next == EP_UNACTIVE_PTR) {
			epi->next = ep->ovflist;
			ep->ovflist = epi;
		}
		goto out_unlock;
	}
	/* If this file is already in the ready list we exit soon */
	/* 將當前的epitem放入ready list */
	if (!ep_is_linked(&epi->rdllink))
		list_add_tail(&epi->rdllink, &ep->rdllist);
	/*
	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
	 * wait list.
	 */
	/* 喚醒epoll_wait... */
	if (waitqueue_active(&ep->wq))
		wake_up_locked(&ep->wq);
	/* 如果epollfd也在被poll, 那就喚醒隊列里面的所有成員. */
	if (waitqueue_active(&ep->poll_wait))
		pwake++;
out_unlock:
	spin_unlock_irqrestore(&ep->lock, flags);
	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&ep->poll_wait);
	return 1;
}
/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	int error;
	struct file *file;
	struct eventpoll *ep;
	/* The maximum number of event must be greater than zero */
	if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
		return -EINVAL;
	/* Verify that the area passed by the user is writeable */
	/* 這個地方有必要說明一下:
	 * 內核對應用程序采取的策略是"絕對不信任",
	 * 所以內核跟應用程序之間的數據交互大都是copy, 不允許(也時候也是不能...)指針引用.
	 * epoll_wait()需要內核返回數據給用戶空間, 內存由用戶程序提供,
	 * 所以內核會用一些手段來驗證這一段內存空間是不是有效的.
	 */
	if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
		error = -EFAULT;
		goto error_return;
	}
	/* Get the "struct file *" for the eventpoll file */
	error = -EBADF;
	/* 獲取epollfd的struct file, epollfd也是文件嘛 */
	file = fget(epfd);
	if (!file)
		goto error_return;
	/*
	 * We have to check that the file structure underneath the fd
	 * the user passed to us _is_ an eventpoll file.
	 */
	error = -EINVAL;
	/* 檢查一下它是不是一個真正的epollfd... */
	if (!is_file_epoll(file))
		goto error_fput;
	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	/* 獲取eventpoll結構 */
	ep = file->private_data;
	/* Time to fish for events ... */
	/* OK, 睡覺, 等待事件到來~~ */
	error = ep_poll(ep, events, maxevents, timeout);
error_fput:
	fput(file);
error_return:
	return error;
}
/* 這個函數真正將執行epoll_wait的進程帶入睡眠狀態... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	int res, eavail;
	unsigned long flags;
	long jtimeout;
	wait_queue_t wait;//等待隊列
	/*
	 * Calculate the timeout by checking for the "infinite" value (-1)
	 * and the overflow condition. The passed timeout is in milliseconds,
	 * that why (t * HZ) / 1000.
	 */
	/* 計算睡覺時間, 毫秒要轉換為HZ */
	jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
		MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
	spin_lock_irqsave(&ep->lock, flags);
	res = 0;
	/* 如果ready list不為空, 就不睡了, 直接干活... */
	if (list_empty(&ep->rdllist)) {
		/*
		 * We don't have any available event to return to the caller.
		 * We need to sleep here, and we will be wake up by
		 * ep_poll_callback() when events will become available.
		 */
		/* OK, 初始化一個等待隊列, 準備直接把自己掛起,
		 * 注意current是一個宏, 代表當前進程 */
		init_waitqueue_entry(&wait, current);//初始化等待隊列,wait表示當前進程
		__add_wait_queue_exclusive(&ep->wq, &wait);//掛載到ep結構的等待隊列
		for (;;) {
			/*
			 * We don't want to sleep if the ep_poll_callback() sends us
			 * a wakeup in between. That's why we set the task state
			 * to TASK_INTERRUPTIBLE before doing the checks.
			 */
			/* 將當前進程設置位睡眠, 但是可以被信號喚醒的狀態,
			 * 注意這個設置是"將來時", 我們此刻還沒睡! */
			set_current_state(TASK_INTERRUPTIBLE);
			/* 如果這個時候, ready list里面有成員了,
			 * 或者睡眠時間已經過了, 就直接不睡了... */
			if (!list_empty(&ep->rdllist) || !jtimeout)
				break;
			/* 如果有信號產生, 也起床... */
			if (signal_pending(current)) {
				res = -EINTR;
				break;
			}
			/* 啥事都沒有,解鎖, 睡覺... */
			spin_unlock_irqrestore(&ep->lock, flags);
			/* jtimeout這個時間后, 會被喚醒,
			 * ep_poll_callback()如果此時被調用,
			 * 那么我們就會直接被喚醒, 不用等時間了... 
			 * 再次強調一下ep_poll_callback()的調用時機是由被監聽的fd
			 * 的具體實現, 比如socket或者某個設備驅動來決定的,
			 * 因為等待隊列頭是他們持有的, epoll和當前進程
			 * 只是單純的等待...
			 **/
			jtimeout = schedule_timeout(jtimeout);//睡覺
			spin_lock_irqsave(&ep->lock, flags);
		}
		__remove_wait_queue(&ep->wq, &wait);
		/* OK 我們醒來了... */
		set_current_state(TASK_RUNNING);
	}
	/* Is it worth to try to dig for events ? */
	eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
	spin_unlock_irqrestore(&ep->lock, flags);
	/*
	 * Try to transfer events to user space. In case we get 0 events and
	 * there's still timeout left over, we go trying again in search of
	 * more luck.
	 */
	/* 如果一切正常, 有event發生, 就開始準備數據copy給用戶空間了... */
	if (!res && eavail &&
	    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
		goto retry;
	return res;
}
/* 這個簡單, 我們直奔下一個... */
static int ep_send_events(struct eventpoll *ep,
			  struct epoll_event __user *events, int maxevents)
{
	struct ep_send_events_data esed;
	esed.maxevents = maxevents;
	esed.events = events;
	return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
/**
 * ep_scan_ready_list - Scans the ready list in a way that makes possible for
 *                      the scan code, to call f_op->poll(). Also allows for
 *                      O(NumReady) performance.
 *
 * @ep: Pointer to the epoll private data structure.
 * @sproc: Pointer to the scan callback.
 * @priv: Private opaque data passed to the @sproc callback.
 *
 * Returns: The same integer error code returned by the @sproc callback.
 */
static int ep_scan_ready_list(struct eventpoll *ep,
			      int (*sproc)(struct eventpoll *,
					   struct list_head *, void *),
			      void *priv)
{
	int error, pwake = 0;
	unsigned long flags;
	struct epitem *epi, *nepi;
	LIST_HEAD(txlist);
	/*
	 * We need to lock this because we could be hit by
	 * eventpoll_release_file() and epoll_ctl().
	 */
	mutex_lock(&ep->mtx);
	/*
	 * Steal the ready list, and re-init the original one to the
	 * empty list. Also, set ep->ovflist to NULL so that events
	 * happening while looping w/out locks, are not lost. We cannot
	 * have the poll callback to queue directly on ep->rdllist,
	 * because we want the "sproc" callback to be able to do it
	 * in a lockless way.
	 */
	spin_lock_irqsave(&ep->lock, flags);
	/* 這一步要注意, 首先, 所有監聽到events的epitem都鏈到rdllist上了,
	 * 但是這一步之后, 所有的epitem都轉移到了txlist上, 而rdllist被清空了,
	 * 要注意哦, rdllist已經被清空了! */
	list_splice_init(&ep->rdllist, &txlist);
	/* ovflist, 在ep_poll_callback()里面我解釋過, 此時此刻我們不希望
	 * 有新的event加入到ready list中了, 保存后下次再處理... */
	ep->ovflist = NULL;
	spin_unlock_irqrestore(&ep->lock, flags);
	/*
	 * Now call the callback function.
	 */
	/* 在這個回調函數里面處理每個epitem
	 * sproc 就是 ep_send_events_proc, 下面會注釋到. */
	error = (*sproc)(ep, &txlist, priv);
	spin_lock_irqsave(&ep->lock, flags);
	/*
	 * During the time we spent inside the "sproc" callback, some
	 * other events might have been queued by the poll callback.
	 * We re-insert them inside the main ready-list here.
	 */
	/* 現在我們來處理ovflist, 這些epitem都是我們在傳遞數據給用戶空間時
	 * 監聽到了事件. */
	for (nepi = ep->ovflist; (epi = nepi) != NULL;
	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
		/*
		 * We need to check if the item is already in the list.
		 * During the "sproc" callback execution time, items are
		 * queued into ->ovflist but the "txlist" might already
		 * contain them, and the list_splice() below takes care of them.
		 */
		/* 將這些直接放入readylist */
		if (!ep_is_linked(&epi->rdllink))
			list_add_tail(&epi->rdllink, &ep->rdllist);
	}
	/*
	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
	 * releasing the lock, events will be queued in the normal way inside
	 * ep->rdllist.
	 */
	ep->ovflist = EP_UNACTIVE_PTR;
	/*
	 * Quickly re-inject items left on "txlist".
	 */
	/* 上一次沒有處理完的epitem, 重新插入到ready list */
	list_splice(&txlist, &ep->rdllist);
	/* ready list不為空, 直接喚醒... */
	if (!list_empty(&ep->rdllist)) {
		/*
		 * Wake up (if active) both the eventpoll wait list and
		 * the ->poll() wait list (delayed after we release the lock).
		 */
		if (waitqueue_active(&ep->wq))
			wake_up_locked(&ep->wq);
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}
	spin_unlock_irqrestore(&ep->lock, flags);
	mutex_unlock(&ep->mtx);
	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&ep->poll_wait);
	return error;
}
/* 該函數作為callbakc在ep_scan_ready_list()中被調用
 * head是一個鏈表, 包含了已經ready的epitem,
 * 這個不是eventpoll里面的ready list, 而是上面函數中的txlist.
 */
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{
	struct ep_send_events_data *esed = priv;
	int eventcnt;
	unsigned int revents;
	struct epitem *epi;
	struct epoll_event __user *uevent;
	/*
	 * We can loop without lock because we are passed a task private list.
	 * Items cannot vanish during the loop because ep_scan_ready_list() is
	 * holding "mtx" during this call.
	 */
	/* 掃描整個鏈表... */
	for (eventcnt = 0, uevent = esed->events;
	     !list_empty(head) && eventcnt < esed->maxevents;) {
		/* 取出第一個成員 */
		epi = list_first_entry(head, struct epitem, rdllink);
		/* 然后從鏈表里面移除 */
		list_del_init(&epi->rdllink);
		/* 讀取events, 
		 * 注意events我們ep_poll_callback()里面已經取過一次了, 為啥還要再取?
		 * 1. 我們當然希望能拿到此刻的最新數據, events是會變的~
		 * 2. 不是所有的poll實現, 都通過等待隊列傳遞了events, 有可能某些驅動壓根沒傳
		 * 必須主動去讀取. */
		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
			epi->event.events;
		if (revents) {
			/* 將當前的事件和用戶傳入的數據都copy給用戶空間,
			 * 就是epoll_wait()后應用程序能讀到的那一堆數據. */
			if (__put_user(revents, &uevent->events) ||
			    __put_user(epi->event.data, &uevent->data)) {
				list_add(&epi->rdllink, head);
				return eventcnt ? eventcnt : -EFAULT;
			}
			eventcnt++;
			uevent++;
			if (epi->event.events & EPOLLONESHOT)
				epi->event.events &= EP_PRIVATE_BITS;
			else if (!(epi->event.events & EPOLLET)) {
				/* 嘿嘿, EPOLLET和非ET的區別就在這一步之差呀~
				 * 如果是ET, epitem是不會再進入到readly list,
				 * 除非fd再次發生了狀態改變, ep_poll_callback被調用.
				 * 如果是非ET, 不管你還有沒有有效的事件或者數據,
				 * 都會被重新插入到ready list, 再下一次epoll_wait
				 * 時, 會立即返回, 并通知給用戶空間. 當然如果這個
				 * 被監聽的fds確實沒事件也沒數據了, epoll_wait會返回一個0,
				 * 空轉一次.
				 */
				list_add_tail(&epi->rdllink, &ep->rdllist);
			}
		}
	}
	return eventcnt;
}
/* ep_free在epollfd被close時調用,
 * 釋放一些資源而已, 比較簡單 */
static void ep_free(struct eventpoll *ep)
{
	struct rb_node *rbp;
	struct epitem *epi;
	/* We need to release all tasks waiting for these file */
	if (waitqueue_active(&ep->poll_wait))
		ep_poll_safewake(&ep->poll_wait);
	/*
	 * We need to lock this because we could be hit by
	 * eventpoll_release_file() while we're freeing the "struct eventpoll".
	 * We do not need to hold "ep->mtx" here because the epoll file
	 * is on the way to be removed and no one has references to it
	 * anymore. The only hit might come from eventpoll_release_file() but
	 * holding "epmutex" is sufficent here.
	 */
	mutex_lock(&epmutex);
	/*
	 * Walks through the whole tree by unregistering poll callbacks.
	 */
	for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
		epi = rb_entry(rbp, struct epitem, rbn);
		ep_unregister_pollwait(ep, epi);
	}
	/*
	 * Walks through the whole tree by freeing each "struct epitem". At this
	 * point we are sure no poll callbacks will be lingering around, and also by
	 * holding "epmutex" we can be sure that no file cleanup code will hit
	 * us during this operation. So we can avoid the lock on "ep->lock".
	 */
	/* 之所以在關閉epollfd之前不需要調用epoll_ctl移除已經添加的fd,
	 * 是因為這里已經做了... */
	while ((rbp = rb_first(&ep->rbr)) != NULL) {
		epi = rb_entry(rbp, struct epitem, rbn);
		ep_remove(ep, epi);
	}
	mutex_unlock(&epmutex);
	mutex_destroy(&ep->mtx);
	free_uid(ep->user);
	kfree(ep);
}
/* File callbacks that implement the eventpoll file behaviour */
static const struct file_operations eventpoll_fops = {
	.release	= ep_eventpoll_release,
	.poll		= ep_eventpoll_poll
};
/* Fast test to see if the file is an evenpoll file */
static inline int is_file_epoll(struct file *f)
{
	return f->f_op == &eventpoll_fops;
}
/* OK, eventpoll我認為比較重要的函數都注釋完了... */

4.1 epoll_create

從slab緩存中創建一個eventpoll對象,并且創建一個匿名的fd跟fd對應的file對象, 而eventpoll對象保存在struct file結構的private指針中,并且返回, 該fd對應的file operations只是實現了poll跟release操作。

創建eventpoll對象的初始化操作,獲取當前用戶信息,是不是root,最大監聽fd數目等并且保存到eventpoll對象中 初始化等待隊列,初始化就緒鏈表,初始化紅黑樹的頭結點。

4.2 epoll_ctl操作

將epoll_event結構拷貝到內核空間中,并且判斷加入的fd是否支持poll結構(epoll,poll,selectI/O多路復用必須支持poll操作),并且從epfd->file->privatedata獲取event_poll對象,根據op區分是添加刪除還是修改, 首先在eventpoll結構中的紅黑樹查找是否已經存在了相對應的fd,沒找到就支持插入操作,否則報重復的錯誤,相對應的修改,刪除比較簡單就不啰嗦了。

插入操作時,會創建一個與fd對應的epitem結構,并且初始化相關成員,比如保存監聽的fd跟file結構之類的,重要的是指定了調用poll_wait時的回調函數用于數據就緒時喚醒進程,(其內部,初始化設備的等待隊列,將該進程注冊到等待隊列)完成這一步, 我們的epitem就跟這個socket關聯起來了, 當它有狀態變化時, 會通過ep_poll_callback()來通知,最后調用加入的fd的file operation->poll函數(最后會調用poll_wait操作)用于完成注冊操作,最后將epitem結構添加到紅黑樹中。

4.3 epoll_wait操作

計算睡眠時間(如果有),判斷eventpoll對象的鏈表是否為空,不為空那就干活不睡明.并且初始化一個等待隊列,把自己掛上去,設置自己的進程狀態,為可睡眠狀態.判斷是否有信號到來(有的話直接被中斷醒來,),如果啥事都沒有那就調用schedule_timeout進行睡眠,如果超時或者被喚醒,首先從自己初始化的等待隊列刪除,然后開始拷貝資源給用戶空間了,拷貝資源則是先把就緒事件鏈表轉移到中間鏈表,然后挨個遍歷拷貝到用戶空間, 并且挨個判斷其是否為水平觸發,是的話再次插入到就緒鏈表。

五、epoll使用實例:TCP服務器處理多個客戶端請求

5.1 epoll創建

int epoll_create(int size); //監聽個數

5.2 epoll事件設置

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

第一個參數epfd是epoll_create()的返回值。

第二個參數op表示動作,用三個宏來表示:

EPOLL_CTL_ADD:注冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;

第三個參數是需要監聽的fd。

第四個參數是告訴內核需要監聽什么事。

struct epoll_event結構如下:

struct epoll_event {  
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
};

events可以是以下幾個宏的集合:

  • EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉);
  • EPOLLOUT:表示對應的文件描述符可以寫;
  • EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來);
  • EPOLLERR:表示對應的文件描述符發生錯誤;
  • EPOLLHUP:表示對應的文件描述符被掛斷;
  • EPOLLET:將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水平觸發(Level Triggered)來說的。
  • EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里

5.3 epoll監聽

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
  • 等待事件的產生,類似于select()調用。
  • 參數events用來從內核得到事件的集合,
  • maxevents告之內核這個events有多大,這個maxevents的值不能大于創建epoll_create()時的size,
  • 參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。
  • 該函數返回需要處理的事件數目,如返回0表示已超時。

5.4 編程實例測試

本次測試在上篇Unix域socket通信代碼的基礎上進行修改,只使用TCP方式的socket通信進行測試。上篇的測試代碼,服務端接收到一個客戶端的連接后,就僅對該客戶端進行服務,沒有再接收其它客戶端的處理邏輯,本篇要實現的,就是一個服務端,能夠接收多個客戶端的數據。

編程之前,先來看下要實現的程序結構,其中黃色的部分為本篇在上篇例程的基礎上,需要增加的部分:

圖片圖片


①為socket服務端增加epoll監聽功能,TCP服務端的代碼修改后如下,主要的修改在listen之后,創建一個epoll,然后把服務端的socketfd加入epoll進行監聽:

當有新的客戶端請求連接時,服務端的socketfd會收到事件,進而epoll會收到服務端socketfd的EPOLLIN事件,此時可以讓服務端接受客戶端的請求,并把創建的客戶端fd也加入到epoll進行監聽

當客戶端連接成功并被epoll監聽后,客戶端再發消息過來,epoll就會收到對應客戶端fd的EPOLLIN事件,此時可以讓服務端讀取客戶端的消息

#define LISTEN_MAX     5
#define EPOLL_FDSIZE   LISTEN_MAX
#define EPOLL_EVENTS   20
#define CLIENT_NUM     3

void EpollAddEvent(int epollfd, int fd, int event)
{
    PRINT("epollfd:%d add fd:%d(event:%d)\n", epollfd, fd, event);
    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

void TcpServerThread()
{
	//------------socket
	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sockfd < 0)
	{
		PRINT("create socket fail\n");
		return;
	}
	PRINT("create socketfd:%d\n", sockfd);

	struct sockaddr_un addr;
	memset (&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	strcpy(addr.sun_path, UNIX_TCP_SOCKET_ADDR);

	//------------bind
	if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)))
	{
		PRINT("bind fail\n");
		return;
	}
	PRINT("bind ok\n");

	//------------listen
	if (listen(sockfd, LISTEN_MAX))
	{
		PRINT("listen fail\n");
		return;
	}
	PRINT("listen ok\n");

	//------------epoll---------------
	int epollfd = epoll_create(EPOLL_FDSIZE);
	if (epollfd < 0)
	{
		PRINT("epoll create fail\n");
		return;
	}
	PRINT("epoll create fd:%d\n", epollfd);

	EpollAddEvent(epollfd, sockfd, EPOLLIN);

	struct epoll_event events[EPOLL_EVENTS];
	while(1)
	{
		PRINT("epoll wait...\n");
		int num = epoll_wait(epollfd, events, EPOLL_EVENTS, -1);
		PRINT("epoll wait done, num:%d\n", num);
		for (int i = 0;i < num;i++)
		{
			int fd = events[i].data.fd;
			if (EPOLLIN == events[i].events)
			{
				//接受客戶端的連接請求
				if (fd == sockfd)
				{
					//------------accept
					int clientfd = accept(sockfd, NULL, NULL);
					if (clientfd == -1)
					{
						PRINT("accpet error\n");
					}
					else
					{
						PRINT("=====> accept new clientfd:%d\n", clientfd);
						
						EpollAddEvent(epollfd, clientfd, EPOLLIN);
					}
				}
				//讀取客戶端發來的數據
				else
				{
					char buf[BUF_SIZE] = {0};
					//------------recv
					size_t size = recv(fd, buf, BUF_SIZE, 0);
					//size = read(clientfd, buf, BUF_SIZE);
					if (size > 0)
					{
						PRINT("recv from clientfd:%d, msg:%s\n", fd, buf);
					}
				}
			}
		}
	}

	PRINT("end\n");
}

⑵啟動多個客戶端進行測試,修改主程序,創建多個客戶端線程,產生多個客戶端,去連接同一個服務端,來測試epoll監聽多個事件的功能。

int main()
{
	unlink(UNIX_TCP_SOCKET_ADDR);

	//創建一個服務端
	thread thServer(TcpServerThread);

	//創建多個客戶端
	thread thClinet[CLIENT_NUM];
	for (int i=0; i<CLIENT_NUM; i++)
	{
		thClinet[i] = thread(TcpClientThread);
		sleep(1);
	}

	while(1)
	{
		sleep(5);
	}
}

本例中,CLIENT_NUM為3,使用3個客戶端來測試epoll功能。

⑶測試結果,在Ubuntu上編譯運行,程序運行時的打印如下:

[TcpServerThread] create socketfd:3
[TcpServerThread] bind ok
[TcpClientThread] create socketfd:4
[TcpServerThread] listen ok
[TcpServerThread] epoll create fd:5
[EpollAddEvent] epollfd:5 add fd:3(event:1)
[TcpServerThread] epoll wait...
[TcpClientThread] create socketfd:6
[TcpClientThread] connect ok
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] =====> accept new clientfd:7
[EpollAddEvent] epollfd:5 add fd:7(event:1)
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)1
[TcpServerThread] epoll wait...
[TcpClientThread] create socketfd:8
[TcpClientThread] connect ok
[TcpServerThread] epoll wait done, num:2
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)2
[TcpServerThread] =====> accept new clientfd:9
[EpollAddEvent] epollfd:5 add fd:9(event:1)
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)3
[TcpServerThread] epoll wait...
[TcpClientThread] connect ok
[TcpServerThread] epoll wait done, num:3
[TcpServerThread] =====> accept new clientfd:10
[EpollAddEvent] epollfd:5 add fd:10(event:1)
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)5
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)6
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)4
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:3
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)7
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)8
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)9
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)10
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:2
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)12
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)11
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:3
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)14
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)13
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)15
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:3
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)16
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)17
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)18
[TcpServerThread] epoll wait...

對結果標注一下,更容易理解程序運行過程:

圖片

可以看到,服務端依次接受了3個客戶端的連接請求,然后可以接收3個客戶端發來的數據。

責任編輯:武曉燕 來源: 深度Linux
相關推薦

2024-08-08 14:57:32

2023-05-08 00:06:45

Go語言機制

2021-03-17 16:53:51

IO多路

2021-05-31 06:50:47

SelectPoll系統

2025-03-07 10:14:03

2021-02-10 08:09:48

Netty網絡多路復用

2023-12-06 07:16:31

Go語言語句

2023-03-08 17:54:29

802.1x協議無線網絡

2025-05-08 08:01:05

2021-03-24 08:03:38

NettyJava NIO網絡技術

2020-10-13 07:51:03

五種IO模型

2023-11-08 09:22:14

I/ORedis阻塞

2023-03-01 14:32:31

redisIOEpoll

2017-12-21 14:48:43

數據安全數據泄漏檢測技術

2019-12-23 14:53:26

IO復用

2021-06-09 19:25:13

IODubbo

2020-11-04 07:49:04

Select

2025-06-06 00:33:00

2023-03-06 21:29:41

mmap技術操作系統

2023-12-26 12:18:02

Java設計開發
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产成人综合亚洲欧美94在线 | 亚洲一区国产 | 国产十日韩十欧美 | 九九伦理电影 | 亚洲成人日韩 | 日韩一区二区免费视频 | 91一区二区三区 | 欧美一区二区三区久久精品视 | 国产丝袜一区二区三区免费视频 | 91精品国产乱码久久久久久久 | 在线视频亚洲 | 久久另类| 青青久在线视频 | 91免费小视频 | 日韩免费福利视频 | 欧美一区二区三区日韩 | 一区在线免费视频 | 日韩成人精品在线 | 久久久久国产精品一区二区 | 五月天天丁香婷婷在线中 | 国产一区二区三区在线 | 国产免费一区二区三区网站免费 | 国产高清视频 | 婷婷色成人 | 久久久久国产精品一区 | 成人精品一区二区三区中文字幕 | 欧美精品一区二区三区蜜桃视频 | 欧美日韩中文字幕在线 | 影音先锋中文字幕在线观看 | 国产精品高清在线 | 国产免费一区二区 | 精品一区二区三区四区五区 | 两性午夜视频 | 国产福利91精品 | 在线免费观看视频你懂的 | 亚洲欧美一区二区三区情侣bbw | 精品欧美一区二区精品久久久 | 天堂网av在线 | 午夜久久久 | 视频精品一区二区三区 | 在线观看亚洲专区 |