基于 Netty 服務端快速了解核心組件
由于Netty優秀的設計和封裝,開發一個高性能網絡程序就變得非常簡單,本文從一個簡單的服務端落地簡單介紹一下Netty中的幾個核心組件,希望對你有幫助。
快速落地一個服務端
我們希望通過Netty快速落地一個簡單的主從reactor模型,由主reactor對應的線程組接收連接交由acceptor創建連接,與之建立的客戶端的讀寫事件都會交由從reactor對應的線程池處理:
基于此設計,我們通過Netty寫下如下代碼,可以看到我們做了如下幾件事:
- 聲明一個服務端創建引導類ServerBootstrap ,負責配置服務端及其啟動工作。
- 聲明主從reactor線程組,其中boss可以看作監聽端口接收新連接的線程組,而work則是負責處理客戶端數據讀寫的線程組。
- 基于上述線程池作為group的入參完成主從reactor模型創建。
- 通過channel函數指定server channe為NioServerSocketChannel即采用NIO模型,而NioServerSocketChannel我們可以直接理解為serverSocket的抽象表示。
- 通過childHandler方法給引導設置每一個連接數據讀寫的處理器handler。
最后調用bind啟動服務端并通過addListener對連接結果進行異步監聽:
public static void main(String[] args) {
//1. 聲明一個服務端創建引導類
ServerBootstrap serverBootstrap = new ServerBootstrap();
//2. 聲明主從reactor線程組
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
serverBootstrap.group(boss, worker)//3. 基于上述線程池創建主從reactor模型
.channel(NioServerSocketChannel.class)//server channel采用NIO模型
.childHandler(new ChannelInitializer<NioSocketChannel>() {//添加客戶端讀寫請求處理器到subreactor中
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 對于ChannelInboundHandlerAdapter,收到消息后會按照順序執行即 A -> B->ServerHandler
ch.pipeline().addLast(new InboundHandlerA())
.addLast(new InboundHandlerB())
.addLast(new ServerHandler());
// 處理寫數據的邏輯,順序是反著的 B -> A
ch.pipeline().addLast(new OutboundHandlerA())
.addLast(new OutboundHandlerB())
.addLast(new OutboundHandlerC());
ch.pipeline().addLast(new ExceptionHandler());
}
});
//綁定8080端口并設置回調監聽結果
serverBootstrap.bind("127.0.0.1", 8080)
.addListener(f -> {
if (f.isSuccess()) {
System.out.println("連接成功");
}
});
}
對于客戶端的發送的數據,我們都會通過ChannelInboundHandlerAdapter添加順序處理,就如代碼所示我們的執行順序為InboundHandlerA->InboundHandlerB->ServerHandler,對此我們給出InboundHandlerA的代碼,InboundHandlerB內容一樣就不展示了:
public class InboundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA : " + ((ByteBuf)msg).toString(StandardCharsets.UTF_8));
//將當前的處理過的msg轉交給pipeline的下一個ChannelHandler
super.channelRead(ctx, msg);
}
}
而ServerHandler的則是:
- 客戶端與服務端建立連接,對應客戶端channel被激活,觸發channelActive方法。
- ChannelHandlerContext 的 Channel 已注冊到其 EventLoop 中,執行channelRegistered。
- 將 ChannelHandler 添加到實際上下文并準備好處理事件后調用。
解析客戶端的數據然后回復Hello Netty client :
private static class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channel被激活,執行channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("執行channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("執行handlerAdded");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
//打印讀取到的數據
System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(StandardCharsets.UTF_8));
// 回復客戶端數據
System.out.println(new Date() + ": 服務端寫出數據");
//組裝數據并發送
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
super.channelRead(ctx, msg);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
ByteBuf buffer = ctx.alloc().buffer();
byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);
buffer.writeBytes(bytes);
return buffer;
}
//......
}
我們通過telnet 對應ip和端口后發現服務端輸出如下內容,也很我們上文說明的一致:
執行handlerAdded
執行channelRegistered
端口綁定成功,channel被激活,執行channelActive
然后我們發送消息 1,可以看到觸發所有inbound的channelRead方法:
InBoundHandlerA : 1
InBoundHandlerB: 1
Wed Jul 24 00:05:18 CST 2024: 服務端讀到數據 -> 1
然后我們回復hello netty client,按照添加的倒敘觸發OutBoundHandler:
Wed Jul 24 00:05:18 CST 2024: 服務端寫出數據
OutBoundHandlerC: Hello Netty client
OutBoundHandlerB: Hello Netty client
OutBoundHandlerA: Hello Netty client
詳解Netty中的核心組件
channel接口
channel是Netty對于底層class socket中的bind、connect、read、write等原語的封裝,簡化了我們網絡編程的復雜度,同時Netty也提供的各種現成的channel,我們可以根據個人需要自行使用。 下面便是筆者比較常用的Tcp或者UDP中比較常用的幾種channel。
- NioServerSocketChannel:基于NIO選擇器處理新連接。
- EpollServerSocketChannel:使用 linux EPOLL Edge 觸發模式實現最大性能的實現。
- NioDatagramChannel:發送和接收 AddressedEnvelope 的 NIO 數據報通道。
- EpollDatagramChannel:使用 linux EPOLL Edge 觸發模式實現最大性能的 DatagramChannel 實現。
EventLoop接口
在Netty中,所有channel都會注冊到某個eventLoop上, 每一個EventLoopGroup中有一個或者多個EventLoop,而每一個EventLoop都綁定一個線程,負責處理一個或者多個channel的事件:
這里我們也簡單的給出NioEventLoop中的run方法,它繼承自SingleThreadEventExecutor,我們可以大概看到NioEventLoop的核心邏輯本質就是輪詢所有注冊到NioEventLoop上的channel(socket的抽象)是否有其就緒的事件,然后
@Override
protected void run() {
for (;;) {
try {
//基于selectStrategy輪詢查看是否有就緒事件
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
//......
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
//根據IO配比執行網絡IO事件方法processSelectedKeys以及其他事件方法runAllTasks
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
//......
}
}
pipeline和channelHandler以channelHandlerContext
每一個channel的事件都會交由channelHandler處理,而負責同一個channel的channelHandler都會交由pipeline一條邏輯鏈進行連接,這兩者的的關系都會一一封裝成channelHandlerContext,channelHandlerContext主要是負責當前channelHandler和與其同一條channelpipeline上的其他channelHandler之間的交互。
舉個例子,當我們收到客戶端的寫入數據時,這些數據就會交由pipeline上的channelHandler處理,如下圖所示,從第一個channelHandler處理完成之后,每個channelHandlerContext就會將消息轉交到當前pipeline的下一個channelHandler處理:
假設我們的channelHandler執行完ChannelActive后,如希望繼續傳播則會調用fireChannelActive:
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("端口綁定成功,channel被激活,執行channelActive");
ctx.fireChannelActive()
}
查看其內部邏輯即可知曉,它就是通過AbstractChannelHandlerContext得到pipeline的下一個ChannelHandler并執行其channelActive方法:
@Override
public ChannelHandlerContext fireChannelActive() {
final AbstractChannelHandlerContext next = findContextInbound();
invokeChannelActive(next);
return this;
}
回調的思想
我們可以說回調其實是一種設計思想,Netty對于連接或者讀寫操作都是異步非阻塞的,所以我們希望在連接被建立進行一些響應的處理,那么Netty就會在連接建立的時候方法暴露一個回調方法供用戶實現個性邏輯。
例如我們的channel連接被建立時,其底層就會調用invokeChannelActive獲取我們綁定的ChannelInboundHandler并執行其channelActive方法:
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
于是就會調用到我們服務端ServerHandler 的channelActive方法:
private static class ServerHandler extends ChannelInboundHandlerAdapter {
//......
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("端口綁定成功,channel被激活,執行channelActive");
}
//......
}
Future異步監聽
為保證網絡服務器執行的效率,Netty大部分網絡IO操作都采用異步的,以筆者建立連接設置的監聽器為例,當前連接成功后,就會返回給監聽器一個java.util.concurrent.Future,我們就可以通過這個f獲取連接的結果是否成功:
//綁定8080端口并設置回調監聽結果
serverBootstrap.bind("127.0.0.1", 8080)
.addListener(f -> {
if (f.isSuccess()) {
System.out.println("連接成功");
}
});
我們步入DefaultPromise的addListener即可發現其內部就是添加監聽后判斷這個連接的異步任務Future是否完成,如果完成調用notifyListeners回調我們的監聽器的邏輯:
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
//......
//添加監聽
synchronized (this) {
addListener0(listener);
}
//連接任務完成,通知監聽器
if (isDone()) {
notifyListeners();
}
return this;
}