成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

基于 Netty 服務端快速了解核心組件

開發 網絡
由于Netty優秀的設計和封裝,開發一個高性能網絡程序就變得非常簡單,本文從一個簡單的服務端落地簡單介紹一下Netty中的幾個核心組件,希望對你有幫助。

由于Netty優秀的設計和封裝,開發一個高性能網絡程序就變得非常簡單,本文從一個簡單的服務端落地簡單介紹一下Netty中的幾個核心組件,希望對你有幫助。

快速落地一個服務端

我們希望通過Netty快速落地一個簡單的主從reactor模型,由主reactor對應的線程組接收連接交由acceptor創建連接,與之建立的客戶端的讀寫事件都會交由從reactor對應的線程池處理:

基于此設計,我們通過Netty寫下如下代碼,可以看到我們做了如下幾件事:

  • 聲明一個服務端創建引導類ServerBootstrap ,負責配置服務端及其啟動工作。
  • 聲明主從reactor線程組,其中boss可以看作監聽端口接收新連接的線程組,而work則是負責處理客戶端數據讀寫的線程組。
  • 基于上述線程池作為group的入參完成主從reactor模型創建。
  • 通過channel函數指定server channe為NioServerSocketChannel即采用NIO模型,而NioServerSocketChannel我們可以直接理解為serverSocket的抽象表示。
  • 通過childHandler方法給引導設置每一個連接數據讀寫的處理器handler。

最后調用bind啟動服務端并通過addListener對連接結果進行異步監聽:

public static void main(String[] args) {
        //1. 聲明一個服務端創建引導類
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //2. 聲明主從reactor線程組
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());


        serverBootstrap.group(boss, worker)//3. 基于上述線程池創建主從reactor模型
                .channel(NioServerSocketChannel.class)//server channel采用NIO模型
                .childHandler(new ChannelInitializer<NioSocketChannel>() {//添加客戶端讀寫請求處理器到subreactor中
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 對于ChannelInboundHandlerAdapter,收到消息后會按照順序執行即 A -> B->ServerHandler
                        ch.pipeline().addLast(new InboundHandlerA())
                                .addLast(new InboundHandlerB())
                                .addLast(new ServerHandler());

                        // 處理寫數據的邏輯,順序是反著的 B -> A
                        ch.pipeline().addLast(new OutboundHandlerA())
                                .addLast(new OutboundHandlerB())
                                .addLast(new OutboundHandlerC());

                        ch.pipeline().addLast(new ExceptionHandler());
                    }
                });
        //綁定8080端口并設置回調監聽結果
        serverBootstrap.bind("127.0.0.1", 8080)
                .addListener(f -> {
                    if (f.isSuccess()) {
                        System.out.println("連接成功");
                    }
                });
    }

對于客戶端的發送的數據,我們都會通過ChannelInboundHandlerAdapter添加順序處理,就如代碼所示我們的執行順序為InboundHandlerA->InboundHandlerB->ServerHandler,對此我們給出InboundHandlerA的代碼,InboundHandlerB內容一樣就不展示了:

public class InboundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA : " + ((ByteBuf)msg).toString(StandardCharsets.UTF_8));
        //將當前的處理過的msg轉交給pipeline的下一個ChannelHandler
        super.channelRead(ctx, msg);
    }
}

而ServerHandler的則是:

  • 客戶端與服務端建立連接,對應客戶端channel被激活,觸發channelActive方法。
  • ChannelHandlerContext 的 Channel 已注冊到其 EventLoop 中,執行channelRegistered。
  • 將 ChannelHandler 添加到實際上下文并準備好處理事件后調用。

解析客戶端的數據然后回復Hello Netty client :

private static class ServerHandler extends ChannelInboundHandlerAdapter {

  @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("channel被激活,執行channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            System.out.println("執行channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("執行handlerAdded");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            //打印讀取到的數據
            System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(StandardCharsets.UTF_8));

            // 回復客戶端數據
            System.out.println(new Date() + ": 服務端寫出數據");
            //組裝數據并發送
            ByteBuf out = getByteBuf(ctx);
            ctx.channel().writeAndFlush(out);
            super.channelRead(ctx, msg);
        }

        private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
            ByteBuf buffer = ctx.alloc().buffer();

            byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);

            buffer.writeBytes(bytes);

            return buffer;
        }

      //......
    }

我們通過telnet 對應ip和端口后發現服務端輸出如下內容,也很我們上文說明的一致:

執行handlerAdded
執行channelRegistered
端口綁定成功,channel被激活,執行channelActive

然后我們發送消息 1,可以看到觸發所有inbound的channelRead方法:

InBoundHandlerA : 1
InBoundHandlerB: 1
Wed Jul 24 00:05:18 CST 2024: 服務端讀到數據 -> 1

然后我們回復hello netty client,按照添加的倒敘觸發OutBoundHandler:

Wed Jul 24 00:05:18 CST 2024: 服務端寫出數據
OutBoundHandlerC: Hello Netty client 
OutBoundHandlerB: Hello Netty client 
OutBoundHandlerA: Hello Netty client 

詳解Netty中的核心組件

channel接口

channel是Netty對于底層class socket中的bind、connect、read、write等原語的封裝,簡化了我們網絡編程的復雜度,同時Netty也提供的各種現成的channel,我們可以根據個人需要自行使用。 下面便是筆者比較常用的Tcp或者UDP中比較常用的幾種channel。

  • NioServerSocketChannel:基于NIO選擇器處理新連接。
  • EpollServerSocketChannel:使用 linux EPOLL Edge 觸發模式實現最大性能的實現。
  • NioDatagramChannel:發送和接收 AddressedEnvelope 的 NIO 數據報通道。
  • EpollDatagramChannel:使用 linux EPOLL Edge 觸發模式實現最大性能的 DatagramChannel 實現。

EventLoop接口

在Netty中,所有channel都會注冊到某個eventLoop上, 每一個EventLoopGroup中有一個或者多個EventLoop,而每一個EventLoop都綁定一個線程,負責處理一個或者多個channel的事件:

這里我們也簡單的給出NioEventLoop中的run方法,它繼承自SingleThreadEventExecutor,我們可以大概看到NioEventLoop的核心邏輯本質就是輪詢所有注冊到NioEventLoop上的channel(socket的抽象)是否有其就緒的事件,然后

  @Override
    protected void run() {
        for (;;) {
            try {
             //基于selectStrategy輪詢查看是否有就緒事件
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
      //......

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                //根據IO配比執行網絡IO事件方法processSelectedKeys以及其他事件方法runAllTasks
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
           //......
        }
    }

pipeline和channelHandler以channelHandlerContext

每一個channel的事件都會交由channelHandler處理,而負責同一個channel的channelHandler都會交由pipeline一條邏輯鏈進行連接,這兩者的的關系都會一一封裝成channelHandlerContext,channelHandlerContext主要是負責當前channelHandler和與其同一條channelpipeline上的其他channelHandler之間的交互。

舉個例子,當我們收到客戶端的寫入數據時,這些數據就會交由pipeline上的channelHandler處理,如下圖所示,從第一個channelHandler處理完成之后,每個channelHandlerContext就會將消息轉交到當前pipeline的下一個channelHandler處理:

假設我們的channelHandler執行完ChannelActive后,如希望繼續傳播則會調用fireChannelActive:

 @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("端口綁定成功,channel被激活,執行channelActive");
            ctx.fireChannelActive()
        }

查看其內部邏輯即可知曉,它就是通過AbstractChannelHandlerContext得到pipeline的下一個ChannelHandler并執行其channelActive方法:

 @Override
    public ChannelHandlerContext fireChannelActive() {
        final AbstractChannelHandlerContext next = findContextInbound();
        invokeChannelActive(next);
        return this;
    }

回調的思想

我們可以說回調其實是一種設計思想,Netty對于連接或者讀寫操作都是異步非阻塞的,所以我們希望在連接被建立進行一些響應的處理,那么Netty就會在連接建立的時候方法暴露一個回調方法供用戶實現個性邏輯。

例如我們的channel連接被建立時,其底層就會調用invokeChannelActive獲取我們綁定的ChannelInboundHandler并執行其channelActive方法:

private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

于是就會調用到我們服務端ServerHandler 的channelActive方法:

private static class ServerHandler extends ChannelInboundHandlerAdapter {

        //......

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("端口綁定成功,channel被激活,執行channelActive");
        }
  //......
}

Future異步監聽

為保證網絡服務器執行的效率,Netty大部分網絡IO操作都采用異步的,以筆者建立連接設置的監聽器為例,當前連接成功后,就會返回給監聽器一個java.util.concurrent.Future,我們就可以通過這個f獲取連接的結果是否成功:

//綁定8080端口并設置回調監聽結果
        serverBootstrap.bind("127.0.0.1", 8080)
                .addListener(f -> {
                    if (f.isSuccess()) {
                        System.out.println("連接成功");
                    }
                });

我們步入DefaultPromise的addListener即可發現其內部就是添加監聽后判斷這個連接的異步任務Future是否完成,如果完成調用notifyListeners回調我們的監聽器的邏輯:

@Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        //......
  //添加監聽
        synchronized (this) {
            addListener0(listener);
        }
  //連接任務完成,通知監聽器
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2022-09-30 10:44:47

Netty組件數據

2023-09-11 10:53:32

2024-04-10 10:09:07

2024-05-30 08:04:20

Netty核心組件架構

2016-03-18 09:04:42

swift服務端

2023-07-26 10:21:26

服務端組件客戶端

2021-10-14 08:39:17

Java Netty Java 基礎

2012-03-02 10:38:33

MySQL

2013-03-25 10:08:44

PHPWeb

2023-06-30 09:46:00

服務物理機自動化

2022-10-08 00:01:00

ssrvuereact

2024-01-16 08:05:53

2011-04-22 10:13:35

SimpleFrame

2010-08-03 09:59:30

NFS服務

2016-11-03 09:59:38

kotlinjavaspring

2021-05-25 08:20:37

編程技能開發

2010-02-24 15:42:03

WCF服務端安全

2010-03-19 18:17:17

Java Server

2022-12-29 08:56:30

監控服務平臺

2009-08-21 15:22:56

端口偵聽
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品电影网 | 中文字幕国产精品视频 | 免费精品久久久久久中文字幕 | 视频一二三区 | 97国产一区二区精品久久呦 | 国产日韩一区二区三区 | www亚洲一区 | 九色综合网 | 国产清纯白嫩初高生视频在线观看 | 日韩高清一区 | 成人h电影在线观看 | 欧美一级片在线观看 | 精品国产乱码久久久久久蜜退臀 | 国产美女视频黄a视频免费 国产精品福利视频 | 日本一区二区三区免费观看 | 狠狠狠色丁香婷婷综合久久五月 | 欧美国产91 | 国产精品18久久久久久久 | 天天搞夜夜操 | 羞羞视频在线观看免费观看 | 精品三级在线观看 | 国产精品一区在线 | 国产精品视频在线播放 | 欧洲毛片 | 国产91成人 | 一级aaaaaa毛片免费同男同女 | 91av视频在线播放 | 国产成人精品久久二区二区91 | 国产999精品久久久久久 | 欧美日韩精品一区二区三区四区 | 一区二区三区在线电影 | 国产精品成人在线观看 | 国内自拍视频在线观看 | 国产精品视频免费观看 | 中文字幕在线一区二区三区 | 国产亚洲精品久久久久久牛牛 | 久久av资源网 | www.国产精品 | 精品一区二区三区电影 | 九九亚洲精品 | 国产高清久久久 |