閱讀一個分布式框架,這些必備的 NIO 知識你要知道
本文轉載自微信公眾號「KK架構師」,作者KK架構師。轉載本文請聯系KK架構師公眾號。
一、開篇
閱讀一個分布式開源項目的時候,最重要的就是了解這個項目的通信框架。
因為一個分布式的開源框架,通常是集群部署的,不同的節點和節點之間需要相互通信來完成復雜的功能,而閱讀到這些源碼的時候,如果不了解它通信機制的話,就會迷失在代碼里,像走進了一片原始森林。
比如 HDFS ,使用的通信框架是自己封裝的 Hadoop Rpc;Spark 底層通信就是用的 Netty;而最近閱讀的 Kafka 源碼,底層使用的是原生的 Java NIO。
所以本次,我們來聊一聊 Java NIO 的那些主要的知識點。
二、多圖弄懂 NIO 三大核心概念
談到 NIO,就會有三個核心的概念:通道、緩沖、選擇器。
直接開門見山,或許聽起來會有點迷茫,我們需要從頭開始說。
1、通道
以前在并發要求不是很高的情況下,是 CPU 來全權處理輸入輸出的(中斷),如下圖:
用戶程序向服務端發起讀寫請求,cpu 直接處理這些請求。這樣有一個弊端,當 IO 請求非常多的時候,會大量占用 CPU,使得整個系統的處理能力會下降。
隨著計算機的發展,出現了一種新的方式,使用 DMA 來全權處理 IO 請求,如下圖:
DMA 是 Direct Memory Access,直接內存訪問控制。
為什么要增加這個設備呢?是因為 CPU 中斷方式不能滿足數據傳輸速度的要求,因為在中斷方式下,每次中斷需要保存斷點和現場,中斷返回時,要恢復斷點和現場。
所有這些原因,使得中斷方式難以滿足高速外設對傳輸速度的要求。
所以,就有了 DMA 這樣的設備,在 DMA 方式的數據傳輸過程中,當 I/O 設備需要進行數據傳送時,通過 DMA 控制器向 CPU 提出 DMA 傳送請求,CPU 響應之后將讓出系統總線,由 DMA 控制器接管總線進行數據傳輸,而此時 CPU 除了做一些初始化操作之外,可以去做自己的事情。
但是有了 DMA,仍然滿足不了業務快速發展的需要,因為當 I/O 請求過多時,會出現總線沖突的問題。
所以后面就出現了通道(Channel),它和 DMA 不同的地方是,通道有自己的指令系統和程序,是一個協處理器;而 DMA 只能實現固定的數據傳送控制。
而 Java NIO 中的 Channel ,就是對上圖中通道的實現。
2、緩沖
理解了通道的概念,緩沖區也很好理解了。
通道表示打開到 I/O 設備的(例如:文件、套接字)的連接,但是通道本身并不存儲數據。真正作為數據傳輸載體的是緩沖區。
當應用程序要寫數據時,需要先把數據寫到緩沖區里,然后由通道負責把緩沖區的數據發送到目的地(文件、磁盤、網絡),然后再從緩沖區把數據取出來。
若需要使用 NIO 系統,需要獲取用于連接 I/O 設備的通道以及用于容納數據的緩沖區,然后操作緩沖區,對數據進行處理。
3、選擇器
選擇器也叫做多路復用器,是一種非阻塞式的 I/O 。既然談到了非阻塞式,必然要先談談阻塞式。阻塞式如下圖所示:
客戶端向服務端發出一個讀寫請求時,服務端的線程會一直看內核地址空間是否有數據了。
客戶端沒有數據發送過來時,服務端的線程會一直等待,在此期間是什么事情都做不了的。
直到客戶端有數據發送過來,會把數據從內核地址空間拷貝到用戶地址空間,然后才讀取到了數據的。
這就導致如果有大量的請求過來,后面的請求要等待前面的請求執行完畢,會造成大量的排隊,無法充分利用 cpu 資源,性能就會急劇下降。
再看看選擇器是如何工作的。
現在客戶端服務端之間通信是用通道+緩沖區的,那么所有的通道都會注冊到選擇器上來。選擇器會監控這些通道的 I/O 狀態,比如連接、讀、寫的情況。
當某一個通道上的某個事件完全就緒時,選擇器才會把這個任務分配到服務端的一個或者多個線程上。
當客戶端沒有事件準備好時,服務端的線程是不會阻塞的,它可以做自己的事情,直到客戶端事件就緒,才會去處理。
這種非阻塞式相比較阻塞式,可以進一步的利用 cpu 資源。
三、理解了概念,再來學 API
1、緩沖區的 API
要徹底理解緩沖區,必須知道緩沖區的四個屬性,mark,position,limit,capacity,只需要跑一遍代碼就知道了。
(1)分配一定大小的緩沖區
- //1.分配一個指定大小的緩沖區
- ByteBuffer buffer = ByteBuffer.allocate(10);
- System.out.println("---------alocate");
- System.out.println("position:" + buffer.position());
- System.out.println("limit:" + buffer.limit());
- System.out.println("capacity:" + buffer.capacity());
運行結果:
- ---------alocate-----------
- position:0
- limit:10
- capacity:10
這里我們分配了 10 個字節的緩沖區,也就是在 ByteBuffer 的 final byte[] hb; 屬性上開辟了 10 個字節的空間。
所以容量 capacity 為 10 , limit 可讀寫數據的最大位置 也是 10 ,position 為可以操作數據的位置為 0 。
(2)往緩沖區寫數據
- // 2.寫入數據到緩沖區
- String str = "abcde";
- System.out.println("------------put------------");
- buffer.put(str.getBytes(StandardCharsets.UTF_8));
- System.out.println("position:" + buffer.position());
- System.out.println("limit:" + buffer.limit());
- System.out.println("capacity:" + buffer.capacity());
運行結果:
- ------------put------------
- position:5
- limit:10
- capacity:10
這里我們往緩沖區寫了 5 個字節的數據,那么 capacity 和 limit 都還是10,但是 position 為 5 了,因為前面已經寫入了 5 個了
(3)切換成讀數據的模式
- // 3.切換成讀數據的模式
- buffer.flip();
- System.out.println("------------flip------------");
- System.out.println("position:" + buffer.position());
- System.out.println("limit:" + buffer.limit());
- System.out.println("capacity:" + buffer.capacity());
那我們現在想從緩沖區讀取一些數據出來,就需要切換成 flip 模式,flip 會改變一些屬性的值
運行結果:
- ------------flip------------
- position:0
- limit:5
- capacity:10
flip 會改變 position 的值為 0 ,并且 limit 為5,表示我要從頭開始讀,并且只能讀到 5 的位置
(4)讀取一些數據
- // 4. 讀取數據
- System.out.println("------------get------------");
- byte[] dest = new byte[buffer.limit()];
- buffer.get(dest);
- System.out.println(new String(dest,0,dest.length));
- System.out.println("position:" + buffer.position());
- System.out.println("limit:" + buffer.limit());
- System.out.println("capacity:" + buffer.capacity());
運行結果:
- ------------get------------
- abcde
- position:5
- limit:5
- capacity:10
讀取了數據之后,position 就變成 5 了,表示我已經讀取到 5 了。
(5)重復讀
- //5.rewind()
- buffer.rewind();
- System.out.println("------------rewind------------");
- System.out.println("position:" + buffer.position());
- System.out.println("limit:" + buffer.limit());
- System.out.println("capacity:" + buffer.capacity());
運行結果:
- ------------rewind------------
- position:0
- limit:5
- capacity:10
rewind 表示重復讀取 buffer 里面的數據
(6)清除數據
- //6.clear()
- buffer.clear();
- System.out.println("------------clear------------");
- System.out.println("position:" + buffer.position());
- System.out.println("limit:" + buffer.limit());
- System.out.println("capacity:" + buffer.capacity());
運行結果:
- ------------clear------------
- position:0
- limit:10
- capacity:10
clear() 之后,position 回到了 0 ,limit 回到了 10,又可以重頭開始寫數據了,能寫 10 個字節。
但是要注意的是,緩沖里面的數據并沒有清空掉,數據還在里面,處于被“遺忘”狀態。這幾個指針回到了最初的狀態。
(7)標記
這是第四個屬性:mark。
mark 可以記錄 position 的位置。可以通過 reset() 方法回到 mark 的位置。
- @Test
- public void test2() {
- // 分配 10 個字節
- String str = "abcde";
- ByteBuffer buffer = ByteBuffer.allocate(10);
- buffer.put(str.getBytes(StandardCharsets.UTF_8));
- // 切換到讀模式,讀取 2 個字節
- buffer.flip();
- byte[] dest = new byte[buffer.limit()];
- buffer.get(dest, 0, 2);
- System.out.println(new String(dest, 0, 2));
- System.out.println(buffer.position());
- // mark 一下記錄當前位置
- buffer.mark();
- // 又讀取兩個字節
- buffer.get(dest, 2, 2);
- System.out.println(new String(dest, 2, 2));
- System.out.println(buffer.position());
- // reset,回到 mark 的位置
- buffer.reset();
- System.out.println(buffer.position());
- }
- 執行結果:
- ```tex
- ab
- 2
- cd
- 4
- 2
2、使用通道、緩沖區、選擇器完成一個網絡程序
(1)服務端
- @Test
- public void testServer() throws IOException {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.bind(new InetSocketAddress(8989));
- Selector selector = Selector.open();
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- while (selector.select() > 0) {
- Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
- while (iterator.hasNext()) {
- SelectionKey key = iterator.next();
- if (key.isAcceptable()) {
- SocketChannel socketChannel = serverSocketChannel.accept();
- socketChannel.configureBlocking(false);
- socketChannel.register(selector, SelectionKey.OP_READ);
- } else if (key.isReadable()) {
- SocketChannel channel = (SocketChannel) key.channel();
- ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
- int len = 0;
- while ((len = channel.read(byteBuffer)) > 0) {
- byteBuffer.flip();
- System.out.println(new String(byteBuffer.array(), 0, len));
- byteBuffer.clear();
- }
- }
- }
- iterator.remove();
- }
- }
1、首先使用 ServerSocketChannel.open(),打開一個通道,設置成非阻塞模式;
2、綁定到 8989 端口上;
3、把通道注冊到選擇器上;
4、while 循環,選擇器上是否有事件,如果事件是客戶端的連接事件,則打開一個 SocketChannel,注冊成非阻塞模式,并且往選擇器上注冊一個讀數據的事件;
5、當客戶端發送數據過來的時候,就可以打開一個通道,讀取緩沖區上的數據;
6、并且此時,服務端是可以同時接受多個客戶端的請求的。
(2)客戶端
- @Test
- public void testClient() throws IOException {
- SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8989));
- socketChannel.configureBlocking(false);
- ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
- byteBuffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8));
- byteBuffer.flip();
- socketChannel.write(byteBuffer);
- byteBuffer.clear();
- socketChannel.close();
- }
1、客戶端打開一個 SocketChannel,配置成非阻塞模式;
2、使用 ByteBuffer 發送數據(注意發送之前,要 flip);
3、關閉通道。
四、總結
本次我們初步探究了一下 Java NIO 的幾個核心概念,通道、緩沖區、選擇器。
但是你要知道,這是冰山一角,通道和選擇器如果要深究的話,會涉及到操作系統底層和很多計算機組成原理的知識。
比如選擇器就涉及到了 select,poll,epoll 的概念,這幾個概念如果再打開的話,還會牽涉到硬件中斷,內核的一些知識。
所以學海無涯苦作舟,越來越對這句話感同身受。