一文快速了解高性能網絡通信框架 Netty
傳統BIO與不完美的解決方案
1.BIO編程及其問題
Java程序員早期進行網絡程序開發的時候,采用的都是傳統BIO模式進行開發,這種模式工作流程非常簡單:
- 阻塞監聽。
- 收到連接分配現場處理該連接。
- 循環回到第一步。
這種做法在少量的客戶端連接下還是可以保證可靠運行的,我們都知道每當服務器啟動就會其一個端口監聽連接,筆者以自己的服務器的1234號進程為例:
netstat -ano | findstr :1234
此時對應的端口使用情況為只有一個8080端口監聽:
TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 11312
TCP [::]:8080 [::]:0 LISTENING 11312
每當我們一個客戶端接入,服務器就會為其分配一個端口端口處理和該客戶端的收發,以筆者的程序為例,可以看到此時該進程正使用58891與客戶端socket進程交互:
TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 11312
TCP 127.0.0.1:8080 127.0.0.1:58891 ESTABLISHED 11312
TCP 127.0.0.1:58891 127.0.0.1:8080 ESTABLISHED 4928
TCP [::]:8080 [::]:0 LISTENING 11312
由此可知,一旦遇到高并發IO讀寫,由于一個客戶端綁定一個線程的模式,所以每一個端口號的收發都需要一個線程進程處理,如果有大量連接接入勢必導致頻繁的線程上下文切換進而導致各種資源的消耗,由此導致著名的C10k問題:
這里筆者也給出一段比較基礎的bio代碼示例供讀者參考一下這種實現,可以看到我們的主線程阻塞監聽,每當收到一個新的連接就創建一個線程處理這個客戶端的讀寫請求:
public class IOServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8888);
//創建一個線程等待連接進來的客戶端
new Thread(() -> waitConnect(serverSocket)).start();
}
private static void waitConnect(ServerSocket serverSocket) {
while (true) {
try {
// 1. 阻塞方法獲取新連接
Socket socket = serverSocket.accept();
// 2. 每個客戶端來了,就專門創建一個新的連接處理
new Thread(() -> {
int len;
byte[] data = new byte[1024];
try {
InputStream inputStream = socket.getInputStream();
// 3. 按字節流方式讀取數據
while ((len = inputStream.read(data)) != -1) {
System.out.println(Thread.currentThread().getName() + " receive msg:" + new String(data, 0, len));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.epoll事件驅動編程
于是就有了epoll事件驅動編程這一方案,也就是我們常說的IO多路復用,該方案的理念是將所有socket的讀寫事件注冊到epoll上。 以我們的服務端為例,創建socket監聽連接時就會將自己的感興趣的連接事件注冊到epoll上,隨后服務端就可以在循環中非阻塞的獲取是否有連接接入,每當有連接接入就會為請求客戶端建立連接并將其讀寫事件注冊到處理客戶都安的epoll上,后續所有客戶端讀寫請求都會交給這個epoll處理,由此實現最少的線程做最多的事情,提升性能同時還降低消耗:
對此我們也用一段偽代碼展示一下事件驅動編程:
//創建epoll
EpollFd epollFd=createEpoll();
//將文件描述符注冊到epoll上
epollCreateCtl(epollFd,socketFdList)
while(true){
//收到epoll推送過來的事件
List<event> eventList=epollWait(epollFd);
//遍歷并處理事件
eventList.foreach(e->handler(e));
}
3.JDK傳統事件驅動編程
基于上述描述我們對事件驅動編程有了初步的了解,接下來我們就來看看原生的jdk是如何實現NIO事件驅動編程的。
首先我們需要創建一個serverSelector用于非阻塞查詢是否有就緒的socket事件,一旦收到客戶端的請求后,為其建立連接之后,將客戶端的讀寫事件注冊到clientSelector,由clientSelector的線程處理這些客戶端讀寫,而serverSelector依然負責非阻塞輪詢監聽是否有新連接:
簡單介紹之后我們給出Selector 聲明:
//負責輪詢是否有新連接
Selector serverSelector = Selector.open();
//負責處理每個客戶端是否有數據可讀
Selector clientSelector = Selector.open();
然后我們使用這個socket非阻塞輪詢就緒的連接事件并注冊到客戶端的epoll模型上:
new Thread(() -> {
try {
//創建服務端socket監聽通道
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
//綁定端口
listenerChannel.socket().bind(new InetSocketAddress(8888));
//設置為非阻塞監聽
listenerChannel.configureBlocking(false);
//注冊感興趣的事件為OP_ACCEPT事件,即可處理當前socket的ACCEPT連接接入事件
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
//循環非阻塞獲取就緒事件
while (true) {
//阻塞1毫秒查看是否有新的連接進來
if (serverSelector.select(1) > 0) {
//查看是否有就緒的事件
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
//遍歷事件
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
//判斷是否是新的socket連接加入
if (key.isAcceptable()) {
System.out.println("有新的socket連接加入");
//接收此通道與socket的連接
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
//服務端監測到新連接之后,不再創建一個新線程,而是直接將
//新連接綁定到clientSelector上
clientChannel.register(clientSelector, SelectionKey.OP_READ);
keyIterator.remove();
}
}
}
}
} catch (Exception e) {
}
}).start();
我們再來看看客戶端處理線程邏輯,和上文差不多,都是非阻塞輪詢客戶端就緒的事件,我們以輸出的方式模擬事件處理,然后進入下一次循環:
new Thread(() -> {
while (true) {
try {
//通過clientSelector.select(1)方法可以輪詢出來,進而批量處理
if (clientSelector.select(1) > 0) {
//獲取就緒的客戶端事件
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
//循環遍歷處理客戶端事件,完成后將該key移除,并在此注冊一個OP_READ等待下一次該socket就緒
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
try {
//獲取事件的通道
SocketChannel clientChannel = (SocketChannel) key.channel();
//數據的讀寫面向Buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//讀取數據到buffer中
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Thread.currentThread().getName() + ":" + Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());
} catch (Exception e) {
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
可以看出原生nio雖然相對bio減小了一定開銷且提高一定的性能,但是缺點也很明顯:
原生的JDK的NIO概念非常多,使用非常復雜對新手不友好。
- 底層使用epoll,很容易導致空輪詢進而出現CPU100%。
- 沒有對建立連接和處理請求的兩個處理建立線程模型,無法較好的發揮它的優勢,需要自己進行擴展實現。
- 項目龐大后,會出現各種奇奇怪怪的bug,很難排查,且維護成本較高。
高性能網絡通信框架Netty
相對與JDK的原生nio,Netty與之相比有著一下的優勢:
- 統一的API,支持多種傳輸類型、阻塞的和非阻塞的簡單而強大的線程模型,真正的無連接數據報套接字,支持鏈接邏輯組件以支持復用。
- 易于使用,各種配置只需幾個方法的調用就能完成。
- 性能較好,擁有比 Java 的核心API更高的吞吐量以及更低的延遲得益于池化和復用,擁有更低的資源消耗最少的內存復制。
- 健壯,不會因為慢速、快速或者超載的連接而導致OutOfMemoryError消除在高速網絡中NIO應用程序常見的不公平讀/寫比率。
- 安全,完整的SSL/TLS以及 StartTLS支持可用于受限環境下,如Applet和 OSGI。
- 社區活躍。
同樣以以上述客戶端服務端通信,Netty實現就比較簡單了,我們編寫服務端時,只需通過NioEventLoopGroup 完成上圖所說兩個slector創建,再通過channel指明當前事件輪詢采用NIO非阻塞方式,最后將事件處理器FirstServerHandler添加到當前服務端childHandler的pipeline上即可處理所有客戶端讀寫請求:
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
//創建處理連接的事件輪詢eventLoop
NioEventLoopGroup boss = new NioEventLoopGroup();
//創建處理客戶端讀寫請求的eventLoop
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boss, worker)
//設置為非阻塞輪詢
.channel(NioServerSocketChannel.class)
//childHandler添加ServerHandler客戶端讀寫請求
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new FirstServerHandler());
}
});
serverBootstrap.bind("127.0.0.1", 8080);
}
最后我們給出FirstServerHandler 的代碼,可以看到我們直接繼承ChannelInboundHandlerAdapter 處理客戶端發送的數據,每當服務端收到客戶端數據時就會回調channelRead,我們的邏輯也很簡單,收到數據之后直接回復Hello Netty client:
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
/**
* 收到客戶端數據后會回調該方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
//打印讀取到的數據
System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(StandardCharsets.UTF_8));
// 回復客戶端數據
System.out.println(new Date() + ": 服務端寫出數據");
//組裝數據并發送
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
ByteBuf buffer = ctx.alloc().buffer();
byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);
buffer.writeBytes(bytes);
return buffer;
}
}
此時我們通過telnet 127.0.0.1 8080進行數據發送即可收到服務端的響應了: