一篇文章帶你了解Netty
Netty
傳統的IO模型的web容器,比如老版本的Tomcat,為了增加系統的吞吐量,需要不斷增加系統核心線程數量,或者通過水平擴展服務器數量,來增加系統處理請求的能力。有了NIO之后,一個線程即可處理多個連接事件,其中基于多路復用模型的Netty框架,不僅降低了使用NIO的復雜度,
優點
Netty是一款以java NIO為基礎,基于事件驅動模型支持異步、高并發的網絡應用框架。
- API使用簡單,開發門檻低,簡化了NIO開發網絡程序的復雜度
- 功能強大,預置多種編解碼功能,支持多種主流協議,比如Http、WebSocket。
- 定制能力強,可以通過ChannelHandler對通信框架靈活擴展。
- 性能高,支持異步非阻塞通信模型
- 成熟穩定,社區活躍,已經修復了Java NIO所有的Bug。
- 經歷了大規模商業應用的考驗,質量有保證。
Reactor模型
Reactor模型也叫做反應器設計模式,是一種為處理服務請求并發提交到一個或者多個服務處理器的事件設計模式。
而Netty是基于Reactor模型設計的一套高性能網絡應用框架,下面來看下常見的Reactor模型有哪些:
一、單Reactor單線程
1)可以實現通過一個阻塞對象監聽多個鏈接請求
2)Reactor對象通過select監聽客戶端請求事件,通過dispatch進行分發
3)如果是建立鏈接請求,則由Acceptor通過accept處理鏈接請求,然后創建一個Handler對象處理完成鏈接后的各種事件
4)如果不是鏈接請求,則由Reactor分發調用鏈接對應的Handler來處理
5)Handler會完成Read->業務處理->send的完整業務流程
二、單Reactor多線程
1)Reactor對象通過select監聽客戶端請求事件,收到事件后,通過dispatch分發
2)如果是建立鏈接請求,則由Acceptor通過accept處理鏈接請求,然后創建一個Handler對象處理完成鏈接后的各種事件
3)如果不是鏈接請求,則由Reactor分發調用鏈接對應的Handler來處理
4)Handler只負責事件響應不做具體業務處理
5)通過read讀取數據后,分發到worker線程池處理,處理完成后返回給Handler,Handler收到后,通過send將結果返回給client
三、主從Reactor多線程
1)Reactor主線程MainReactor對象通過select監聽鏈接事件,通過Acceptor處理
2)當Acceptor處理鏈接事件后,MainReactor將鏈接分配給SubReactor
3)SubReactor將鏈接加入到隊列進行監聽,并創建Handler進行事件處理
4)當有新事件發生時,SubReactor就會調用對應的Handler處理
5)Handler通過read讀取數據,分發到worker線程池處理,處理完成后返回給Handler,Handler收到后,通過send將結果返回給client
6)Reactor主線程可以對應多個Reactor子線程
三種模式用生活案例來理解:
1)單Reactor單線程,前臺接待員和服務員是同一個人,全程為顧客服務
2)單Reactor多線程,1個前臺接待員,多個服務員,接待員只負責接待
3)主從Reactor多線程,多個前臺接待員,多個服務員
Reactor模型具有如下優點:
1)響應快,不必為單個同步事件所阻塞,雖然Reactor本身依然是同步的
2)可以最大程度的避免復雜的多線程及同步問題,并且避免了多線程/進程的切換開銷
3)擴展性好,可以方便的通過增加Reactor實例個數來充分利用CPU資源
4)復用性好,Reactor模型本身與具體事件處理邏輯無關,具有很高的復用性
核心組件
Bootstrap
一個Netty應用通常由一個Bootstrap開始,它主要作用是配置整個Netty程序,串聯起各個組件。
Handler
為了支持各種協議和處理數據的方式,便誕生了Handler組件。Handler主要用來處理各種事件,這里的事件很廣泛,比如可以是連接、數據接收、異常、數據轉換等。
ChannelInboundHandler
一個最常用的Handler。這個Handler的作用就是處理接收到數據時的事件,也就是說,我們的業務邏輯一般就是寫在這個Handler里面的,ChannelInboundHandler就是用來處理我們的核心業務邏輯。
ChannelInitializer
當一個鏈接建立時,我們需要知道怎么來接收或者發送數據,當然,我們有各種各樣的Handler實現來處理它,那么ChannelInitializer便是用來配置這些Handler,它會提供一個ChannelPipeline,并把Handler加入到ChannelPipeline。
ChannelPipeline
一個Netty應用基于ChannelPipeline機制,這種機制需要依賴于EventLoop和EventLoopGroup,因為它們三個都和事件或者事件處理相關。
EventLoops
目的是為Channel處理IO操作,一個EventLoop可以為多個Channel服務。EventLoopGroup由多個EventLoop組成。
Channel
代表了一個Socket鏈接,或者其它和IO操作相關的組件,它和EventLoop一起用來參與IO處理。
Future
在Netty中所有的IO操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過一會等它執行完成或者直接注冊一個監聽,具體的實現就是通過Future和ChannelFutures,他們可以注冊一個監聽,當操作執行成功或失敗時監聽會自動觸發。
示例
下面通過一個簡單的示例,了解怎么基于Netty如何開發一個簡單的網絡通信程序。和之前一樣,包含服務端與客戶端:
Server:
@Slf4j
public class Server {
private EventLoopGroup boosGroup;
private EventLoopGroup workGroup;
public Server(int port){
try {
init(port);
log.info("----- 服務啟動成功 -----");
} catch (InterruptedException e) {
log.error("啟動服務出錯:{}", e.getCause());
}
}
private void init(int port) throws InterruptedException {
// 處理連接
this.boosGroup = new NioEventLoopGroup();
// 處理業務
this.workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// 綁定
bootstrap.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.class) //配置服務端
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 1024)
.childOption(ChannelOption.SO_SNDBUF, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}
public void close(){
this.boosGroup.shutdownGracefully();
this.workGroup.shutdownGracefully();
}
}
@Slf4j
class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>> server active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1. 讀取客戶端的數據(緩存中去取并打印到控制臺)
ByteBuf buf = (ByteBuf) msg;
byte[] request = new byte[buf.readableBytes()];
buf.readBytes(request);
String requestBody = new String(request, "utf-8");
log.info(">>>>>>>>> receive message: {}", requestBody);
//2. 返回響應數據
ctx.writeAndFlush(Unpooled.copiedBuffer((requestBody+" too").getBytes()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
Client:
@Slf4j
public class Client {
private EventLoopGroup workGroup;
private ChannelFuture channelFuture;
public Client(int port){
init(port);
}
private void init(int port){
this.workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_RCVBUF, 1024)
.option(ChannelOption.SO_SNDBUF, 1024)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
});
this.channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
}
/**
*
* @param message
*/
public void send(String message){
this.channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes()));
}
/**
*
*/
public void close(){
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
workGroup.shutdownGracefully();
}
}
@Slf4j
class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>> client active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
log.info(">>>>>>>>> receive message: {}", body);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
測試:
public class StarterTests {
static int port = 9011;
@Test
public void startServer(){
Server server = new Server(9011);
}
@Test
public void startClient(){
Client client = new Client(port);
client.send("Hello Netty!");
while (true){}
}
}
主要實現步驟:
1、創建兩個NIO線程組,一個專門用來網絡事件處理(接受客戶端連接),另一個則進行網絡通訊讀寫
2、創建一個ServerBootstrap對象,配置Netty的一系列參數,例如接收傳入數據的緩存大小等。
3、創建一個實際處理數據的類ChannelInitializer,進行初始化的準備工作,比如設置傳入數據的字符集格式,實現具體處理數據的接口。
4、綁定端口,執行同步阻塞方法等待服務器啟動即可
類似技術
Mina、Netty、Grizzly
1、Mina
Mina(Multipurpose Infrastructure for Network Applications) 是 Apache 組織一個較新的項目,它為開發高性能和高可用性的網絡應用程序提供了非常便利的框架。當前發行的 Mina 版本2.04支持基于 Java NIO 技術的 TCP/UDP 應用程序開發、串口通訊程序,Mina 所支持的功能也在進一步的擴展中。
目前,正在使用Mina的應用包括:Apache Directory Project、AsyncWeb、AMQP(Advanced Message Queuing Protocol)、RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、 Openfire等等。
2、Netty
Netty是一款異步的事件驅動的網絡應用框架和工具,用于快速開發可維護的高性能、高擴展性協議服務器和客戶端。也就是說,Netty是一個NIO客戶端/服務器框架,支持快速、簡單地開發網絡應用,如協議服務器和客戶端。它極大簡化了網絡編程,如TCP和UDP套接字服務器。
3、Grizzly
Grizzly是一種應用程序框架,專門解決編寫成千上萬用戶訪問服務器時候產生的各種問題。使用JAVA NIO作為基礎,并隱藏其編程的復雜性。容易使用的高性能的API。帶來非阻塞socketd到協議處理層。利用高性能的緩沖和緩沖管理使用高性能的線程池。
從設計的理念上來看,Mina的設計理念是最為優雅的。當然,由于Netty的主導作者與Mina的主導作者是同一人,出自同一人之手的Netty在設計理念上與Mina基本上是一致的。而Grizzly在設計理念上就較差了點,幾乎是JavaNIO的簡單封裝。
結束語
前面我們說到了各種IO模型,基于Java NIO開發的網絡應用程序,能夠更好的基于服務器操作系統底層支持,更好的利用服務器資源提供高性能、可靠易擴展的應用。而Netty在Java NIO的基礎上,為開發這提供便捷提升開發的效率與穩定性,在很多主流的框架中都有被使用,比如Dubbo、Spring Webflux等