深度長文:從Bio到Nio到Aio,再到響應式編程
本文轉載自微信公眾號「小姐姐味道」,作者小姐姐養的狗 。轉載本文請聯系小姐姐味道公眾號。
要問計算機系統里,有哪些概念比較折騰人,nio絕對能算上一個。配合著多是異的網絡編程,nio加上多線程一般能夠完成雙殺。
Linux有5種常見的IO模型。其中,阻塞IO就是bio,IO復用就是nio,異步IO就是aio,我們本篇文章就聚焦于此。
- 阻塞式IO (bio)
- 非阻塞式IO
- IO復用 (nio)
- 信號驅動式IO
- 異步IO(aio)
在網絡編程中,Reactor模型是必須要了解的。現在,大多數與IO相關的組件,都會使用Reactor模型,比如Tomcat、Redis、Nginx等,可見Reactor應用的廣泛性。
Reactor是NIO的基礎。為什么NIO的性能就能夠比傳統的阻塞IO性能高呢?我們首先來看一下傳統阻塞式IO的一些特點。
1.阻塞IO模型
如上圖,是典型的BIO模型,每當有一個連接到來,經過協調器的處理,就開啟一個對應的線程進行接管。如果連接有1000條,那就需要1000個線程。線程資源是非常昂貴的,除了占用大量的內存,還會占用非常多的CPU調度時間,所以BIO在連接非常多的情況下,效率會變得非常低。
下面的代碼是使用ServerSocket實現的一個簡單socket服務器,監聽在8888端口。
- public class BIO {
- static boolean stop = false;
- public static void main(String[] args) throws Exception {
- int connectionNum = 0;
- int port = 8888;
- ExecutorService service = Executors.newCachedThreadPool();
- ServerSocket serverSocket = new ServerSocket(port);
- while (!stop) {
- if (10 == connectionNum) {
- stop = true;
- }
- Socket socket = serverSocket.accept();
- service.execute(() -> {
- try {
- Scanner scanner = new Scanner(socket.getInputStream());
- PrintStream printStream = new PrintStream(socket.getOutputStream());
- while (!stop) {
- String s = scanner.next().trim();
- printStream.println("PONG:" + s);
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- });
- connectionNum++;
- }
- service.shutdown();
- serverSocket.close();
- }
- }
啟動之后,使用nc命令進行連接測試,結果如下。
- $ nc -v localhost 8888
- Connection to localhost port 8888 [tcp/ddi-tcp-1] succeeded!
- hello
- PONG:hello
- nice
- PONG:nice
可以看到,BIO的讀寫操作是阻塞的,線程的整個生命周期和連接的生命周期是一樣的,而且不能夠被復用。
就單個阻塞IO來說,它的效率并不比NIO慢。但是當服務的連接增多,考慮到整個服務器的資源調度和資源利用率等因素,NIO就有了顯著的效果,NIO非常適合高并發場景。
2.非阻塞IO模型
其實,在處理IO動作時,有大部分時間是在等待。比如,socket連接要花費很長時間進行連接操作,在完成連接的這段時間內,它并沒有占用額外的系統資源,但它只能阻塞等待在線程中。這種情況下,系統資源并不能被合理的利用。
Java的NIO,在Linux上底層是使用epoll實現的。epoll是一個高性能的多路復用I/O工具,改進了select和poll等工具的一些功能。在網絡編程中,對epoll概念的一些理解,幾乎是面試中必問的問題。
epoll的數據結構是直接在內核上進行支持的。通過epoll_create和epoll_ctl等函數的操作,可以構造描述符(fd)相關的事件組合(event)。
這里有兩個比較重要的概念:
- fd 每條連接、每個文件,都對應著一個描述符,比如端口號。內核在定位到這些連接的時候,就是通過fd進行尋址的
- event 當fd對應的資源,有狀態或者數據變動,就會更新epoll_item結構。在沒有事件變更的時候,epoll就阻塞等待,也不會占用系統資源;一旦有新的事件到來,epoll就會被激活,將事件通知到應用方
關于epoll還會有一個面試題:相對于select,epoll有哪些改進?這里直接給出答案:
- epoll不再需要像select一樣對fd集合進行輪詢,也不需要在調用時將fd集合在用戶態和內核態進行交換
- 應用程序獲得就緒fd的事件復雜度,epoll時O(1),select是O(n)
- select最大支持約1024個fd,epoll支持65535個
- select使用輪詢模式檢測就緒事件,epoll采用通知方式,更加高效
我們還是以Java中的NIO代碼為例,來看一下NIO的具體概念。
- public class NIO {
- static boolean stop = false;
- public static void main(String[] args) throws Exception {
- int connectionNum = 0;
- int port = 8888;
- ExecutorService service = Executors.newCachedThreadPool();
- ServerSocketChannel ssc = ServerSocketChannel.open();
- ssc.configureBlocking(false);
- ssc.socket().bind(new InetSocketAddress("localhost", port));
- Selector selector = Selector.open();
- ssc.register(selector, ssc.validOps());
- while (!stop) {
- if (10 == connectionNum) {
- stop = true;
- }
- int num = selector.select();
- if (num == 0) {
- continue;
- }
- Iterator<SelectionKey> events = selector.selectedKeys().iterator();
- while (events.hasNext()) {
- SelectionKey event = events.next();
- if (event.isAcceptable()) {
- SocketChannel sc = ssc.accept();
- sc.configureBlocking(false);
- sc.register(selector, SelectionKey.OP_READ);
- connectionNum++;
- } else if (event.isReadable()) {
- try {
- SocketChannel sc = (SocketChannel) event.channel();
- ByteBuffer buf = ByteBuffer.allocate(1024);
- int size = sc.read(buf);
- if(-1==size){
- sc.close();
- }
- String result = new String(buf.array()).trim();
- ByteBuffer wrap = ByteBuffer.wrap(("PONG:" + result).getBytes());
- sc.write(wrap);
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- } else if (event.isWritable()) {
- SocketChannel sc = (SocketChannel) event.channel();
- }
- events.remove();
- }
- }
- service.shutdown();
- ssc.close();
- }
- }
上面這段代碼比較長,是使用NIO實現的和BIO相同的功能。從它的API設計上,我們就能夠看到epoll的一些影子。
首先,我們創建了一個服務端ssc,并開啟一個新的事件選擇器,監聽它的OP_ACCEPT事件。
- ServerSocketChannel ssc = ServerSocketChannel.open();
- Selector selector = Selector.open();
- ssc.register(selector, ssc.validOps());
共有4種事件類型。分別是新連接事件(OP_ACCEPT)、連接就緒事件(OP_CONNECT)、讀就緒事件(OP_READ)、寫就緒事件(OP_WRITE)。任何網絡和文件操作,都可以抽象成這四個事件。
接下來,在while循環里,使用select函數,阻塞在主線程里。所謂阻塞,就是操作系統不再分配CPU事件片到當前線程中,所以select函數是幾乎不占用任何系統資源的。
- int num = selector.select();
一旦有新的事件到達,比如有新的連接到來,主線程就能夠被調度到,程序就能夠向下執行。這時候,就能夠根據訂閱的事件通知,持續獲取訂閱的事件。
由于注冊到selector的連接和事件可能會有多個,所以這些事件也會有多個。我們使用安全的迭代器循環進行處理,在處理完畢之后,將它刪除。
如果事件不刪除的話,或者漏掉了某個事件的處理,會怎么樣呢?后果還是比較嚴重的,由于事件總是存在,我們的程序會陷入無休無止的循環之中。
- Iterator<SelectionKey> events = selector.selectedKeys().iterator();
- while (events.hasNext()) {
- SelectionKey event = events.next();
- ...
- events.remove();
- }
- }
有新的連接到達時,我們訂閱了更多的事件。對于我們的數據讀取來說,對應的事件就是OP_READ。和BIO編程面向流的方式不同,NIO操作的對象是抽象的概念Channel,通過緩沖區進行數據交換。
- SocketChannel sc = ssc.accept();
- sc.configureBlocking(false);
- sc.register(selector, SelectionKey.OP_READ);
值得注意的是:服務端和客戶端的實現方式,可以是不同的。比如,服務端是NIO,客戶端可以是BIO,它們并沒有什么強制要求。
另外一個面試時候經常問到的事件就是OP_WRITE。我們上面提到過,這個事件是表示寫就緒的,當底層的緩沖區有空閑,這個事件就會一直發生,浪費占用CPU資源。所以,我們一般是不注冊OP_WRITE的。
這里還有一個細節,在讀取數據的時候,并沒有像BIO的方式一樣使用循環來獲取數據。如下面的代碼,我們創建了一個1024字節的緩沖區,用于數據的讀取。如果連接中的數據,大于1024字節怎么辦?
- SocketChannel sc = (SocketChannel) event.channel();
- ByteBuffer buf = ByteBuffer.allocate(1024);
- int size = sc.read(buf);
這涉及到兩種事件的通知機制。
- 水平觸發 (level-triggered) 稱作LT模式。只要緩沖區有數據,事件就會一直發生
- 邊緣觸發 (edge-triggered) 稱作ET模式。緩沖區有數據,僅會觸發一次。事件想要再次觸發,必須先將fd中的數據讀完才行
可以看到,Java的NIO采用的就是水平觸發的方式。LT模式頻繁環喚醒線程,效率相比較ET模式低,所以Netty使用JNI的方式,實現了ET模式,效率上更高一些。
3.Reactor模式
了解了BIO和NIO的一些使用方式,Reactor模式就呼之欲出了。
NIO是基于事件機制的,有一個叫做Selector的選擇器,阻塞獲取關注的事件列表。獲取到事件列表后,可以通過分發器,進行真正的數據操作。
上圖是Doug Lea在講解NIO時候的一張圖,指明了最簡單的Reactor模型的基本元素。你可以對比這上面的NIO代碼分析一下,里面有四個主要元素:
- Acceptor 處理client的連接,并綁定具體的事件處理器
- Event 具體發生的事件
- Handler 執行具體事件的處理者。比如處理讀寫事件
- Reactor 將具體的事件分配給Handler
我們可以對上面的模型進行近一步細化,下面這張圖同樣是Doug Lea的ppt中的。它把Reactor部分分為mainReactor和subReactor兩部分。mainReactor負責監聽處理新的連接,然后將后續的事件處理交給subReactor,subReactor對事件處理的方式,也由阻塞模式變成了多線程處理,引入了任務隊列的模式。
熟悉Netty的同學可以看到,這個模型就是Netty設計的基礎。在Netty中,Boss線程對應著對連接的處理和分派,相當于mainReactor;Work線程 對應著subReactor,使用多線程負責讀寫事件的分發和處理。
這種模式將每個組件的職責分的更細,耦合度也更低,能有效的解決C10k問題。
4.AIO
關于NIO的概念,誤解還是比較多的。面試官可能會問你:為什么我在使用NIO的時候,使用Channel進行讀寫,socket的操作依然是阻塞的?NIO主要體現在哪里?
- //這行代碼是阻塞的
- int size = sc.read(buf);
答案就是,NIO只負責對發生在fd描述符上的事件進行通知。事件的獲取和通知部分是非阻塞的,但收到通知之后的操作,卻是阻塞的。即使使用多線程去處理這些事件,它依然是阻塞的。
AIO更近一步,將這些對事件的操作也變成非阻塞的。下面是一段典型的AIO代碼,它通過注冊CompletionHandler 回調函數進行事件處理。這里的事件是隱藏的,比如read函數,它不僅僅代表Channel可讀了,而且會把數據自動的讀取到ByteBuffer中。等完成了讀取,就會通過回調函數通知你,進行后續的操作。
- public class AIO {
- public static void main(String[] args) throws Exception {
- int port = 8888;
- AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
- ssc.bind(new InetSocketAddress("localhost", port));
- ssc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
- void job(final AsynchronousSocketChannel sc) {
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- sc.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
- @Override
- public void completed(Integer result, ByteBuffer attachment) {
- String str = new String(attachment.array()).trim();
- ByteBuffer wrap = ByteBuffer.wrap(("PONG:" + str).getBytes());
- sc.write(wrap, null, new CompletionHandler<Integer, Object>() {
- @Override
- public void completed(Integer result, Object attachment) {
- job(sc);
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- System.out.println("error");
- }
- });
- }
- @Override
- public void failed(Throwable exc, ByteBuffer attachment) {
- System.out.println("error");
- }
- });
- }
- @Override
- public void completed(AsynchronousSocketChannel sc, Object attachment) {
- ssc.accept(null, this);
- job(sc);
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- exc.printStackTrace();
- System.out.println("error");
- }
- });
- Thread.sleep(Integer.MAX_VALUE);
- }
- }
AIO是Java1.7加入的,理論上性能是會提升的,但它現在發展的不太好。那部分對數據進行自動讀取的操作,總得有地方實現,不在框架里,就得在內核里。Netty的NIO模型加上多線程處理,在這方面已經做的很好,編程模式也非常簡單。所以,市面上對AIO的實踐并不多,在采用技術選型的時候,一定要謹慎。
5.響應式編程
你可能聽說過Spring5的webflux,webflux是可以替代spring mvc的一套解決方案,可以編寫響應式的應用,兩者之間的關系可以看下圖。它的底層使用的是netty,所以操作是異步非阻塞的。類似的組件還有vert.x、akka、rxjava等。
webflux是運行在project reactor之上的一個封裝,其根本特性是由后者提供的。至于再底層的非阻塞模型,就是由Netty保證的了。
非阻塞的特性我們可以理解,響應式又是什么概念呢?
響應式編程是一種面向數據流和變化傳播的編程范式。這意味著可以在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。
這段話很晦澀,在編程方面,它表達的意思是:把生產者消費者模式,使用簡單的API表示出來,并自動處理背壓(backpressure)問題。
背壓,指的是生產者與消費者之間的流量控制。通過將操作全面異步化,來減少無效的等待和資源消耗。
Java的lambda表達式可以承擔簡單這個職責,Java9更是引入了響應式流(Reactive Stream),方便了我們的操作。比如,下面是Spring Cloud GateWay的Fluent API寫法,響應式編程的API都是類似的。
- public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
- return builder.routes()
- .route(r -> r.path("/market/**")
- .filters(f -> f.filter(new RequestTimeFilter())
- .addResponseHeader("X-Response-Default-Foo", "Default-Bar"))
- .uri("http://localhost:8080/market/list")
- .order(0)
- .id("customer_filter_router")
- )
- .build();
- }
從傳統的開發模式過渡到reactor的開發模式,是有一定成本的,不過它確實能夠提高我們應用程序的性能。具體用不用,就要看在編程難度和性能之間的取舍了。
小結
從上面的描述,我們了解到,BIO的線程模型是一個連接對應一個線程的,非常的浪費資源;NIO通過對關鍵事件的監聽,通過主動通知的方式完成非阻塞操作,但它對事件本身的處理依然是非阻塞的;AIO完全是異步非阻塞的,但現實中使用很少。
使用Netty的多Acceptor模式和多線程模式,我們能夠方便的完成類似AIO這樣的操作。Netty的事件觸發機制使用了高效的ET模式,使得支持的連接更多,性能更高。
使用Netty,能夠構建響應式編程的基礎,加上類似lambda表達式這樣的書寫風格,能夠完成類似WebFlux這樣的響應式框架。響應式編程是一個趨勢,現在有越來越多的框架和底層的數據庫支持響應式編程,我們的應用響應也會更加迅速。
作者簡介:小姐姐味道 (xjjdog),一個不允許程序員走彎路的公眾號。聚焦基礎架構和Linux。十年架構,日百億流量,與你探討高并發世界,給你不一樣的味道。