Java三種IO模型,一次搞懂!
大家好,我是老三,上一節我們討論了Linux的五種IO模型,接下來,我們從Java語言層面,來看看對IO的實現。
在Java中,一共有三種IO模型,分別是阻塞IO(BIO)、非阻塞IO(NIO)和異步IO(AIO)。
Linux五種IO模型和Java三種IO模型
Java BIO
Java BIO就是Java的傳統IO模型,對應了操作系統IO模型里的阻塞IO。
Java BIO相關的實現都位于java.io包下,其通信原理是客戶端、服務端之間通過Socket套接字建立管道連接,然后從管道中獲取對應的輸入/輸出流,最后利用輸入/輸出流對象實現發送/接收信息。
我們來看個Demo:
- BioServer:
/**
* @Author 三分惡
* @Date 2023/4/30
* @Description BIO服務端
*/
public class BioServer {
public static void main(String[] args) throws IOException {
//定義一個ServerSocket服務端對象,并為其綁定端口號
ServerSocket server = new ServerSocket(8888);
System.out.println("===========BIO服務端啟動================");
//對BIO來講,每個Socket都需要一個Thread
while (true) {
//監聽客戶端Socket連接
Socket socket = server.accept();
new BioServerThread(socket).start();
}
}
/**
* BIO Server線程
*/
static class BioServerThread extends Thread{
//socket連接
private Socket socket;
public BioServerThread(Socket socket){
this.socket=socket;
}
@Override
public void run() {
try {
//從socket中獲取輸入流
InputStream inputStream=socket.getInputStream();
//轉換為
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(inputStream));
String msg;
//從Buffer中讀取信息,如果讀取到信息則輸出
while((msg=bufferedReader.readLine())!=null){
System.out.println("收到客戶端消息:"+msg);
}
//從socket中獲取輸出流
OutputStream outputStream=socket.getOutputStream();
PrintStream printStream=new PrintStream(outputStream);
//通過輸出流對象向客戶端傳遞信息
printStream.println("你好,吊毛!");
//清空輸出流
printStream.flush();
//關閉socket
socket.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- BioClient
/**
* @Author 三分惡
* @Date 2023/4/30
* @Description BIO客戶端
*/
public class BioClient {
public static void main(String[] args) throws IOException {
List<String> names= Arrays.asList("帥哥","靚仔","坤坤");
//通過循環創建多個多個client
for (String name:names){
//創建socket并根據IP地址與端口連接服務端
Socket socket=new Socket("127.0.0.1",8888);
System.out.println("===========BIO客戶端啟動================");
//從socket中獲取字節輸出流
OutputStream outputStream=socket.getOutputStream();
//通過輸出流向服務端傳遞信息
String hello="你好,"+name+"!";
outputStream.write(hello.getBytes());
//清空流,關閉socket輸出
outputStream.flush();
socket.shutdownOutput();
//從socket中獲取字節輸入流
InputStream inputStream=socket.getInputStream();
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(inputStream));
//讀取服務端消息
String msg;
while((msg=bufferedReader.readLine())!=null){
System.out.println("收到服務端消息:"+msg);
}
inputStream.close();
outputStream.close();
socket.close();
}
}
}
- 先啟動BioServer,再啟動BioClient,運行結果
===========BIO服務端啟動================
收到客戶端消息:你好,帥哥!
收到客戶端消息:你好,靚仔!
收到客戶端消息:你好,坤坤!
===========BIO客戶端啟動================
收到服務端消息:你好,吊毛!
===========BIO客戶端啟動================
收到服務端消息:你好,吊毛!
===========BIO客戶端啟動================
收到服務端消息:你好,吊毛!
在上述Java-BIO的通信過程中,如果客戶端一直沒有發送消息過來,服務端則會一直等待下去,從而服務端陷入阻塞狀態。同理,由于客戶端也一直在等待服務端的消息,如果服務端一直未響應消息回來,客戶端也會陷入阻塞狀態。
在BioServer定義了一個類BioServerThread,繼承了Thread類,run方法里主要是通過socket和流來讀取客戶端的消息,以及發送消息給客戶端,每處理一個客戶端的Socket連接,就得新建一個線程。
同時,IO讀寫操作也是阻塞的,如果客戶端一直沒有發送消息過來,線程就會進入阻塞狀態,一直等待下去。
在BioClient里,循環創建Socket,向服務端收發消息,客戶端的讀寫也是阻塞的。
在這個Demo里就體現了BIO的兩個特點:
- 一個客戶端連接對應一個處理線程
- 讀寫操作都是阻塞的
Java BIO
毫無疑問,不管是創建太多線程,還是阻塞讀寫,都會浪費服務器的資源。
Java NIO
那么我們就進入Java的下一種IO模型——Java NIO,它對應操作系統IO模型中的多路復用IO,底層采用了epoll實現。
Java-NIO則是JDK1.4中新引入的API,它在BIO功能的基礎上實現了非阻塞式的特性,其所有實現都位于java.nio包下。NIO是一種基于通道、面向緩沖區的IO操作,相較BIO而言,它能夠更為高效的對數據進行讀寫操作,同時與原先的BIO使用方式也大有不同。
我們還是先來看個Demo:
- NioServer
/**
* @Author 三分惡
* @Date 2023/4/30
* @Description NIO服務端
*/
public class NioServer {
public static void main(String[] args) throws IOException {
//創建一個選擇器selector
Selector selector= Selector.open();
//創建serverSocketChannel
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
//綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//必須得設置成非阻塞模式
serverSocketChannel.configureBlocking(false);
//將channel注冊到selector并設置監聽事件為ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("===========NIO服務端啟動============");
while(true){
//超時等待
if(selector.select(1000)==0){
System.out.println("===========NIO服務端超時等待============");
continue;
}
// 有客戶端請求被輪詢監聽到,獲取返回的SelectionKey集合
Iterator<SelectionKey> iterator=selector.selectedKeys().iterator();
//迭代器遍歷SelectionKey集合
while (iterator.hasNext()){
SelectionKey key=iterator.next();
// 判斷是否為ACCEPT事件
if (key.isAcceptable()){
// 處理接收請求事件
SocketChannel socketChannel=((ServerSocketChannel) key.channel()).accept();
//非阻塞模式
socketChannel.configureBlocking(false);
// 注冊到Selector并設置監聽事件為READ
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("成功連接客戶端");
}
//判斷是否為READ事件
if (key.isReadable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
try {
// 獲取以前設置的附件對象,如果沒有則新建一個
ByteBuffer buffer = (ByteBuffer) key.attachment();
if (buffer == null) {
buffer = ByteBuffer.allocate(1024);
key.attach(buffer);
}
// 清空緩沖區
buffer.clear();
// 將通道中的數據讀到緩沖區
int len = socketChannel.read(buffer);
if (len > 0) {
buffer.flip();
String message = new String(buffer.array(), 0, len);
System.out.println("收到客戶端消息:" + message);
} else if (len < 0) {
// 接收到-1,表示連接已關閉
key.cancel();
socketChannel.close();
continue;
}
// 注冊寫事件,下次向客戶端發送消息
socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);
} catch (IOException e) {
// 取消SelectionKey并關閉對應的SocketChannel
key.cancel();
socketChannel.close();
}
}
//判斷是否為WRITE事件
if (key.isWritable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
//獲取buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
String hello = "你好,坤坤!";
//清空buffer
buffer.clear();
//buffer中寫入消息
buffer.put(hello.getBytes());
buffer.flip();
//向channel中寫入消息
socketChannel.write(buffer);
buffer.clear();
System.out.println("向客戶端發送消息:" + hello);
// 設置下次讀寫操作,向 Selector 進行注冊
socketChannel.register(selector, SelectionKey.OP_READ, buffer);
}
// 移除本次處理的SelectionKey,防止重復處理
iterator.remove();
}
}
}
}
- NioClient
public class NioClient {
public static void main(String[] args) throws IOException {
// 創建SocketChannel并指定ip地址和端口號
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
System.out.println("==============NIO客戶端啟動================");
// 非阻塞模式
socketChannel.configureBlocking(false);
String hello="你好,靚仔!";
ByteBuffer buffer = ByteBuffer.wrap(hello.getBytes());
// 向通道中寫入數據
socketChannel.write(buffer);
System.out.println("發送消息:" + hello);
buffer.clear();
// 將channel注冊到Selector并監聽READ事件
socketChannel.register(Selector.open(), SelectionKey.OP_READ, buffer);
while (true) {
// 讀取服務端數據
if (socketChannel.read(buffer) > 0) {
buffer.flip();
String msg = new String(buffer.array(), 0, buffer.limit());
System.out.println("收到服務端消息:" + msg);
break;
}
}
// 關閉輸入流
socketChannel.shutdownInput();
// 關閉SocketChannel連接
socketChannel.close();
}
}
- 先運行NioServer,再運行NioClient,運行結果:
===========NIO服務端啟動============
===========NIO服務端超時等待============
===========NIO服務端超時等待============
成功連接客戶端
收到客戶端消息:你好,靚仔!
向客戶端發送消息:你好,坤坤!
==============NIO客戶端啟動================
發送消息:你好,靚仔!
收到服務端消息:你好,坤坤!
我們在這個案例里實現了一個比較簡單的Java NIO 客戶端服務端通信,里面有兩個小的點需要注意,注冊到選擇器上的通道都必須要為非阻塞模型,同時通過緩沖區傳輸數據時,必須要調用flip()方法切換為讀取模式。
代碼流程示意圖
Java-NIO中有三個核心概念:**Buffer(緩沖區)、Channel(通道)、Selector(選擇器)**。
Java NIO
- 每個客戶端連連接本質上對應著一個Channel通道,每個通道都有自己的Buffer緩沖區來進行讀寫,這些Channel被Selector選擇器管理調度
- Selector負責輪詢所有已注冊的Channel,監聽到有事件發生,才提交給服務端線程處理,服務端線程不需要做任何阻塞等待,直接在Buffer里處理Channel事件的數據即可,處理完馬上結束,或返回線程池供其他客戶端事件繼續使用。
- 通過Selector,服務端的一個Thread就可以處理多個客戶端的請求
- Buffer(緩沖區)就是飯店用來存放食材的儲藏室,當服務員點餐時,需要從儲藏室中取出食材進行制作。
- Channel(通道)是用于傳輸數據的車道,就像飯店里的上菜窗口,可以快速把點好的菜品送到客人的桌上。
- Selector(選擇器)就是大堂經理,負責協調服務員、廚師和客人的配合和溝通,以保證整個就餐過程的效率和順暢。
Java AIO
Java-AIO也被成為NIO2,它是在NIO的基礎上,引入了新的異步通道的概念,并提供了異步文件通道和異步套接字的實現。
異步通道的實現體系
它們的主要區別就在于這個異步通道,見名知意:使用異步通道去進行IO操作時,所有操作都為異步非阻塞的,當調用read()/write()/accept()/connect()方法時,本質上都會交由操作系統去完成,比如要接收一個客戶端的數據時,操作系統會先將通道中可讀的數據先傳入read()回調方法指定的緩沖區中,然后再主動通知Java程序去處理。
我們還是先來看個Demo:
- AioServer
/**
* @Author 三分惡
* @Date 2023/5/1
* @Description AIO服務端
*/
public class AioServer {
public static void main(String[] args) throws Exception {
// 創建異步通道組,處理IO事件
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
//創建異步服務器Socket通道,并綁定端口
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(8888));
System.out.println("=============AIO服務端啟動=========");
// 異步等待接收客戶端連接
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
// 創建ByteBuffer
final ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public void completed(AsynchronousSocketChannel channel, Object attachment) {
System.out.println("客戶端連接成功");
try {
buffer.clear();
// 異步讀取客戶端發送的消息
channel.read(buffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer len, Object attachment) {
buffer.flip();
String message = new String(buffer.array(), 0, len);
System.out.println("收到客戶端消息:" + message);
// 異步發送消息給客戶端
channel.write(ByteBuffer.wrap(("你好,阿坤!").getBytes()), null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
// 關閉輸出流
try {
channel.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
// 繼續異步等待接收客戶端連接
server.accept(null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
// 繼續異步等待接收客戶端連接
server.accept(null, this);
}
});
// 等待所有連接都處理完畢
group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
- AioClient
/**
* @Author 三分惡
* @Date 2023/5/1
* @Description AIO客戶端
*/
public class AioClient {
public static void main(String[] args) throws Exception {
// 創建異步Socket通道
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// 異步連接服務器
client.connect(new InetSocketAddress("127.0.0.1", 8888), null, new CompletionHandler<Void, Object>() {
// 創建ByteBuffer
final ByteBuffer buffer = ByteBuffer.wrap(("你好,靚仔!").getBytes());
@Override
public void completed(Void result, Object attachment) {
// 異步發送消息給服務器
client.write(buffer, null, new CompletionHandler<Integer, Object>() {
// 創建ByteBuffer
final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
@Override
public void completed(Integer result, Object attachment) {
readBuffer.clear();
// 異步讀取服務器發送的消息
client.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
readBuffer.flip();
String msg = new String(readBuffer.array(), 0, result);
System.out.println("收到服務端消息:" + msg);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 等待連接處理完畢
Thread.sleep(1000);
// 關閉輸入流和Socket通道
client.shutdownInput();
client.close();
}
}
- 看下運行結果
=============AIO服務端啟動=========
客戶端連接成功
收到客戶端消息:你好,靚仔!
收到服務端消息:你好,阿坤!
可以看到,所有的操作都是異步進行,通過completed接收異步回調,通過failed接收錯誤回調。
而且我們發現,相較于之前的NIO而言,AIO其中少了Selector選擇器這個核心組件,選擇器在NIO中充當了協調者的角色。
但在Java-AIO中,類似的角色直接由操作系統擔當,而且不是采用輪詢的方式監聽IO事件,而是采用一種類似于“訂閱-通知”的模式。
Java AIO簡圖
在AIO中,所有創建的通道都會直接在OS上注冊監聽,當出現IO請求時,會先由操作系統接收、準備、拷貝好數據,然后再通知監聽對應通道的程序處理數據。
Java-AIO這種異步非阻塞式IO也是由操作系統進行支持的,在Windows系統中提供了一種異步IO技術:IOCP(I/O Completion Port,所以Windows下的Java-AIO則是依賴于這種機制實現。不過在Linux系統中由于沒有這種異步IO技術,所以Java-AIO在Linux環境中使用的還是epoll這種多路復用技術進行模擬實現的。
因為Linux的異步IO技術實際上不太成熟,所以Java-AIO的實際應用并不是太多,比如大名鼎鼎的網絡通信框架Netty就沒有采用Java-AIO,而是使用Java-NIO,在代碼層面,自行實現異步。
小結
那么這期我們就快速過了一下Java的三種IO機制,它們的特點,我們直接看下圖:
Java三種IO模型
我們也發現,雖然Java-NIO、Java-AIO,在性能上比Java-BIO要強很多,但是可以看到,寫法上一個比一個難搞,不過好在基本也沒人直接用Java-NIO、Java-AIO,如果要進行網絡通信,一般都會采用Netty,它對原生的Java-NIO進行了封裝優化,接下來,我們會繼續走近Netty,敬請期待。
參考:
[1].《Netty權威指南》
[2].https://juejin.cn/post/7130952602350534693#heading-14
[3].https://www.jianshu.com/p/670033e5b916