成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Netty源碼之Reactor模式

開發 前端
每個新連接創建一個線程來處理。對于長連接服務,如果一個client和server保持一個連接的話,有多少個client接入,server就需要創建同等的線程來處理。線程上下文切換,數據同步和內存消耗,對server來說,將是非常大的開銷。

 [[410505]]

學習目標

  • 什么是Reactor模式?
  • Reactor模式由什么組成的?
  • Reactor模式解決什么問題?
  • Reactor模式線程模型有哪些?演進過程?

web處理請求架構

大多數web請求處理流程可以抽象成這幾個步驟:讀取(read),解碼(decode),處理(process),編碼(encode),發送(send),如下圖所示:

 

同時,處理web請求通常有兩種架構:傳統基于線程架構和事件驅動架構。

傳統基于線程架構

概念

每個新連接創建一個線程來處理。對于長連接服務,如果一個client和server保持一個連接的話,有多少個client接入,server就需要創建同等的線程來處理。線程上下文切換,數據同步和內存消耗,對server來說,將是非常大的開銷。

代碼實現

傳統基于線程架構通常采用BIO的方式來實現,代碼如下:

  1. public class Server implements Runnable { 
  2.  
  3.     int port; 
  4.  
  5.     public Server(int port) { 
  6.         this.port = port; 
  7.     } 
  8.  
  9.     @Override 
  10.     public void run() { 
  11.         try { 
  12.             ServerSocket serverSocket = new ServerSocket(port); 
  13.             while (true){ 
  14.                 System.out.println("等待新連接..."); 
  15.                 new Thread(new Handler(serverSocket.accept())).start(); 
  16.             } 
  17.         } catch (IOException e) { 
  18.             e.printStackTrace(); 
  19.         } 
  20.     } 
  21.  
  22.     static class Handler implements Runnable{ 
  23.  
  24.         private Socket socket; 
  25.  
  26.         public Handler(Socket socket){ 
  27.             this.socket = socket; 
  28.         } 
  29.  
  30.         @Override 
  31.         public void run() { 
  32.             try { 
  33.                 byte[] input = new byte[1024]; 
  34.  
  35.                 this.socket.getInputStream().read(input); 
  36.                 byte[] output = process(input); 
  37.                 this.socket.getOutputStream().write(output); 
  38.                 this.socket.getOutputStream().flush(); 
  39.                 this.socket.close(); 
  40.                 System.out.println("響應完成!"); 
  41.             } catch (IOException e) { 
  42.                 e.printStackTrace(); 
  43.             } 
  44.         } 
  45.  
  46.         private byte[] process(byte[] input) { 
  47.             System.out.println("讀取內容:" + new String(input)); 
  48.             return input; 
  49.         } 
  50.     } 
  51.  
  52.     public static void main(String[] args) throws InterruptedException { 
  53.         Thread thread = new Thread(new Server(2021)); 
  54.         thread.setDaemon(true); 
  55.         thread.start(); 
  56.  
  57.         synchronized (Server.class) { 
  58.             Server.class.wait(); 
  59.         } 
  60.     } 

為了避免線程創建銷毀的開銷,我們通常會采用線程池,但是同樣也有很大的弊端:

  • 同步阻塞IO,讀寫阻塞,線程等待時間過長
  • 在制定線程策略的時候,只能根據CPU的數目來限定可用線程資源,不能根據連接并發數目來制定,也就是連接有限制。否則很難保證對客戶端請求的高效和公平。
  • 多線程之間的上下文切換,造成線程使用效率并不高,并且不易擴展
  • 狀態數據以及其他需要保持一致的數據,需要采用并發同步控制

應用場景

既然傳統基于線程架構弊端這么多,它存在還有什么價值?它的應用場景是什么?

傳統基于線程架構適用于連接數目比較小且一次傳輸大量數據的場景,比如上傳,下載。

事件驅動架構

事件驅動架構:可以把線程和連接解耦,線程只用于執行事件注冊的回調函數。事件驅動架構由事件生產者和事件消費者組成,前者是事件的來源,它只負責監聽哪些事件發生;后者是直接處理事件或者事件發生時,響應事件的實體。

Reactor模式

什么是Reactor模式?

Reactor模式是事件驅動架構的一種具體實現方法,簡而言之,就是一個單線程進行循環監聽就緒IO事件,并將就緒IO事件分發給對應的回調函數。

Reactor模式由什么組成的?

Reactor模式分為兩個重要組成部分,Reactor和Handler。 Reactor(反應器):循環監聽就緒IO事件,并分發給回調函數。 Handler(回調函數):執行對應IO事件的實際業務邏輯。

Reactor模式解決什么問題?

反應器模式可以實現同步的多路復用,同步是指按照事件到達的順序分發處理。反應器 接收來自不同的客戶端的消息、請求和連接,盡管客戶端是并發的,但是反應器可以按照事件到達的順序觸發回調函數。因此,Reactor模式將連接和線程解耦,不需要為每個連接創建單獨線程。這個問題和C10K問題相同,提供了一個解決思路。

Reactor模式下的三種模型

單線程模型:IO事件輪詢,分發(accept)和IO事件執行(read,decode,compute,encode,send)都在一個線程中完成,如下圖所示:

在單線程模型下,不僅IO操作在Reactor線程上,而非IO操作(handlder中process()方法)也在Reactor線程上執行了,當非IO操作執行慢的話,這會大大延遲IO請求響應,所以應該把非IO操作拆出來,來加速Reactor線程對IO請求響應,就出現多線程模型。

單線程模型實現:

  1. // reactor 
  2. public class Reactor implements Runnable { 
  3.  
  4.     int port; 
  5.     Selector selector; 
  6.     ServerSocketChannel serverSocket; 
  7.  
  8.     public Reactor(int port) throws IOException { 
  9.         this.port = port; 
  10.         // 創建serverSocket對象 
  11.         serverSocket = ServerSocketChannel.open(); 
  12.         // 綁定端口 
  13.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  14.         // 配置非阻塞 
  15.         serverSocket.configureBlocking(false); 
  16.         // 創建selector對象 
  17.         selector = Selector.open(); 
  18.         // serversocket注冊到selector上,幫忙監聽accpet事件 
  19.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocket,selector)); 
  20.         /** 還可以使用 SPI provider,來創建selector和serversocket對象 
  21.         SelectorProvider p = SelectorProvider.provider(); 
  22.         selector = p.openSelector(); 
  23.         serverSocket = p.openServerSocketChannel(); 
  24.         */ 
  25.     } 
  26.  
  27.     @Override 
  28.     public void run() { 
  29.         try { 
  30.             while (!Thread.interrupted()) { 
  31.                 System.out.println("start select event..."); 
  32.                 selector.select(); 
  33.                 Set selectedKeys = selector.selectedKeys(); 
  34.                 Iterator it = selectedKeys.iterator(); 
  35.                 while (it.hasNext()) { 
  36.                     dispatch((SelectionKey)it.next()); 
  37.                 } 
  38.                 selectedKeys.clear(); 
  39.             } 
  40.         } catch (IOException e) { 
  41.             e.printStackTrace(); 
  42.         } 
  43.     } 
  44.  
  45.     private void dispatch(SelectionKey key) { 
  46.         Runnable r = (Runnable) key.attachment(); 
  47.         if (r != null) { 
  48.             r.run(); 
  49.         } 
  50.     } 
  51.  
  52.  
  53.     public static void main(String[] args) throws IOException, InterruptedException { 
  54.         Thread thread = new Thread(new Reactor(2021)); 
  55.         thread.start(); 
  56.         synchronized (Reactor.class) { 
  57.             Reactor.class.wait(); 
  58.         } 
  59.     } 
  60. // acceptor調度器 
  61. public class Acceptor implements Runnable { 
  62.  
  63.     ServerSocketChannel serverSocket; 
  64.     Selector selector; 
  65.  
  66.     public Acceptor(ServerSocketChannel serverSocket,Selector selector) { 
  67.         this.serverSocket = serverSocket; 
  68.         this.selector = selector; 
  69.     } 
  70.  
  71.     @Override 
  72.     public void run() { 
  73.         try { 
  74.             SocketChannel socket = this.serverSocket.accept(); 
  75.             if (socket != null) { 
  76.                 new Handler(selector,socket); 
  77.             } 
  78.  
  79.         } catch (IOException e) { 
  80.             e.printStackTrace(); 
  81.         } 
  82.     } 
  83. // 回調函數handler 
  84. public class Handler implements Runnable { 
  85.  
  86.     Selector selector; 
  87.     SocketChannel socket; 
  88.     SelectionKey sk; 
  89.  
  90.     ByteBuffer input = ByteBuffer.allocate(1024); 
  91.     ByteBuffer output = ByteBuffer.allocate(1024); 
  92.     static final int READING = 0, SENDING = 1; 
  93.     int state = READING; 
  94.  
  95.  
  96.     public Handler(Selector selector, SocketChannel socket) throws IOException { 
  97.         this.selector = selector; 
  98.         this.socket = socket; 
  99.  
  100.         this.socket.configureBlocking(false); 
  101.         sk = this.socket.register(selector,0); 
  102.         sk.attach(this); 
  103.         sk.interestOps(SelectionKey.OP_READ); 
  104.         selector.wakeup(); 
  105.     } 
  106.  
  107.     @Override 
  108.     public void run() { 
  109.         try{ 
  110.             if (state == READING) { 
  111.                 read(); 
  112.             } else if (state == SENDING) { 
  113.                 send(); 
  114.             } 
  115.         } catch (IOException ex) { 
  116.             ex.printStackTrace(); 
  117.         } 
  118.     } 
  119.  
  120.     private void read() throws IOException { 
  121.         socket.read(input); 
  122.         if (inputIsComplete()) { 
  123.             // 執行業務邏輯代碼 
  124.             process(); 
  125.             state = SENDING; 
  126.             // Normally also do first write now 
  127.             sk.interestOps(SelectionKey.OP_WRITE); 
  128.         } 
  129.     } 
  130.  
  131.     private void send() throws IOException { 
  132.         socket.write(output); 
  133.         socket.close(); 
  134.         if (outputIsComplete()) sk.cancel(); 
  135.     } 
  136.  
  137.     boolean inputIsComplete() { return true;} 
  138.  
  139.     boolean outputIsComplete() {return true;} 
  140.     // 處理非IO操作(業務邏輯代碼) 
  141.     void process(){ 
  142.         String msg = new String(input.array()); 
  143.         System.out.println("讀取內容:" + msg); 
  144.         output.put(msg.getBytes()); 
  145.         output.flip(); 
  146.     } 
  • 多線程模型:與單線程模型不同的是添加一個業務線程池,將非IO操作(業務邏輯處理)交給業務線程池來處理,提高Reactor線程的IO響應,如圖所示:

 

在多線程模型下,雖然將非IO操作拆出去了,但是所有IO操作都在Reactor單線程中完成的。在高負載、高并發場景下,也會成為瓶頸,于是對Reactor單線程進行了優化,出現了主從線程模型。

多線程模型實現:

  1. public class Reactor implements Runnable { 
  2.  
  3.     int port; 
  4.     Selector selector; 
  5.     ServerSocketChannel serverSocket; 
  6.  
  7.  
  8.     public Reactor(int port) throws IOException { 
  9.         this.port = port; 
  10.  
  11.         // 創建serverSocket對象 
  12.         serverSocket = ServerSocketChannel.open(); 
  13.         // 綁定端口 
  14.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  15.         // 配置非阻塞 
  16.         serverSocket.configureBlocking(false); 
  17.  
  18.         // 創建selector對象 
  19.         selector = Selector.open(); 
  20.         // serversocket注冊到selector上,幫忙監聽accpet事件 
  21.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,selector)); 
  22.  
  23.         /** 還可以使用 SPI provider,來創建selector和serversocket對象 
  24.         SelectorProvider p = SelectorProvider.provider(); 
  25.         selector = p.openSelector(); 
  26.         serverSocket = p.openServerSocketChannel(); 
  27.         */ 
  28.     } 
  29.  
  30.     @Override 
  31.     public void run() { 
  32.         try { 
  33.             while (!Thread.interrupted()) { 
  34.                 System.out.println("start select event..."); 
  35.                 selector.select(); 
  36.                 Set selectedKeys = selector.selectedKeys(); 
  37.                 Iterator it = selectedKeys.iterator(); 
  38.                 while (it.hasNext()) { 
  39.                     dispatch((SelectionKey)it.next()); 
  40.                 } 
  41.                 selectedKeys.clear(); 
  42.             } 
  43.         } catch (IOException e) { 
  44.             e.printStackTrace(); 
  45.         } 
  46.     } 
  47.  
  48.     private void dispatch(SelectionKey key) { 
  49.         SelfRunable r = (SelfRunable) key.attachment(); 
  50.         if (r != null) { 
  51.             System.out.println("dispatch to " + r.getName() + "===="); 
  52.             r.run(); 
  53.         } 
  54.     } 
  55.  
  56.  
  57.     public static void main(String[] args) throws IOException, InterruptedException { 
  58.  
  59.         Thread thread = new Thread(new Reactor(2021)); 
  60.         thread.start(); 
  61.  
  62.         synchronized (Reactor.class) { 
  63.             Reactor.class.wait(); 
  64.         } 
  65.  
  66.  
  67.     } 
  68.  
  69. public class Acceptor implements SelfRunable { 
  70.     ServerSocketChannel serverSocket; 
  71.     Selector selector; 
  72.     String name
  73.     public Acceptor(String name, ServerSocketChannel serverSocket,Selector selector) { 
  74.         this.name = name
  75.         this.serverSocket = serverSocket; 
  76.         this.selector = selector; 
  77.     } 
  78.  
  79.     @Override 
  80.     public void run() { 
  81.         try { 
  82.             SocketChannel socket = this.serverSocket.accept(); 
  83.             if (socket != null) { 
  84.                 new Handler("handler_" + ((InetSocketAddress)socket.getLocalAddress()).getPort(), selector,socket); 
  85.             } 
  86.  
  87.         } catch (IOException e) { 
  88.             e.printStackTrace(); 
  89.         } 
  90.     } 
  91.  
  92.     @Override 
  93.     public String getName() { 
  94.         return this.name
  95.     } 
  96.  
  97. public class Handler implements SelfRunable { 
  98.     String name
  99.     Selector selector; 
  100.     SocketChannel socket; 
  101.     SelectionKey sk; 
  102.     ByteBuffer input = ByteBuffer.allocate(1024); 
  103.     ByteBuffer output = ByteBuffer.allocate(1024); 
  104.     static final int READING = 0, SENDING = 1,  PROCESSING = 3; 
  105.     volatile int state = READING; 
  106.     static ExecutorService poolExecutor = Executors.newFixedThreadPool(5); 
  107.  
  108.     public Handler(String name, Selector selector, SocketChannel socket) throws IOException { 
  109.         this.selector = selector; 
  110.         this.socket = socket; 
  111.         this.name = name
  112.  
  113.         this.socket.configureBlocking(false); 
  114.         sk = this.socket.register(selector,0); 
  115.         sk.attach(this); 
  116.         sk.interestOps(SelectionKey.OP_READ); 
  117.         selector.wakeup(); 
  118.     } 
  119.  
  120.     @Override 
  121.     public void run() { 
  122.         try{ 
  123.             System.out.println("state:" + state); 
  124.             if (state == READING) { 
  125.                 read(); 
  126.             } else if (state == SENDING) { 
  127.                 send(); 
  128.             } 
  129.         } catch (IOException ex) { 
  130.             ex.printStackTrace(); 
  131.         } 
  132.     } 
  133.  
  134.     synchronized void read() throws IOException { 
  135.         socket.read(input); 
  136.         if (inputIsComplete()) { 
  137.             state = PROCESSING; 
  138.            poolExecutor.execute(new Processer()); 
  139.         } 
  140.     } 
  141.  
  142.     synchronized void processAndHandOff() { 
  143.         System.out.println("processAndHandOff========="); 
  144.         process(); 
  145.         state = SENDING; // or rebind attachment 
  146.         sk.interestOps(SelectionKey.OP_WRITE); 
  147.         selector.wakeup(); 
  148.         System.out.println("processAndHandOff finish ! ========="); 
  149.     } 
  150.  
  151.     private void send() throws IOException { 
  152.         System.out.println("start send ..."); 
  153.         socket.write(output); 
  154.         socket.close(); 
  155.         System.out.println("start send finish!"); 
  156.         if (outputIsComplete()) sk.cancel(); 
  157.     } 
  158.  
  159.     boolean inputIsComplete() { return true;} 
  160.  
  161.     boolean outputIsComplete() {return true;} 
  162.  
  163.     void process(){ 
  164.         String msg = new String(input.array()); 
  165.         System.out.println("讀取內容:" + msg); 
  166.         output.put(msg.getBytes()); 
  167.         output.flip(); 
  168.     } 
  169.  
  170.     @Override 
  171.     public String getName() { 
  172.         return this.name
  173.     } 
  174.  
  175.     class Processer implements Runnable { 
  176.         public void run() { processAndHandOff(); } 
  177.     } 
  • 主從線程模型: 相比多線程模型而言,對于多核cpu,為了充分利用資源,將Reactor拆分成了mainReactor 和 subReactor,但是,主從線程模型也有弊端,不適合大量數據傳輸。 mainReactor:負責監聽接收(accpet)新連接,將新連接后續操作交給subReactor來處理,通常由一個線程處理。 subReactor: 負責處理IO的讀寫操作,通常由多個線程處理。 非IO操作依然由業務線程池來處理。

主從線程模型實現:

  1. public class Reactor implements Runnable { 
  2.  
  3.     int port; 
  4.     Selector selector; 
  5.     ServerSocketChannel serverSocket; 
  6.     int SUBREACTOR_SIZE = 1; 
  7.     SubReactor[] subReactorPool = new SubReactor[SUBREACTOR_SIZE]; 
  8.  
  9.  
  10.     public Reactor(int port) throws IOException { 
  11.         this.port = port; 
  12.  
  13.         // 創建serverSocket對象 
  14.         serverSocket = ServerSocketChannel.open(); 
  15.         // 綁定端口 
  16.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  17.         // 配置非阻塞 
  18.         serverSocket.configureBlocking(false); 
  19.  
  20.         // 創建selector對象 
  21.         selector = Selector.open(); 
  22.         // serversocket注冊到selector上,幫忙監聽accpet事件 
  23.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,subReactorPool)); 
  24.  
  25.         // 初始化subreactor pool 
  26.         initSubReactorPool(); 
  27.  
  28.  
  29.         /** 還可以使用 SPI provider,來創建selector和serversocket對象 
  30.         SelectorProvider p = SelectorProvider.provider(); 
  31.         selector = p.openSelector(); 
  32.         serverSocket = p.openServerSocketChannel(); 
  33.         */ 
  34.     } 
  35.  
  36.     @Override 
  37.     public void run() { 
  38.         try { 
  39.             while (!Thread.interrupted()) { 
  40.                 System.out.println("mainReactor start select event..."); 
  41.                 selector.select(); 
  42.                 Set selectedKeys = selector.selectedKeys(); 
  43.                 Iterator it = selectedKeys.iterator(); 
  44.                 while (it.hasNext()) { 
  45.                     dispatch((SelectionKey)it.next()); 
  46.                 } 
  47.                 selectedKeys.clear(); 
  48.             } 
  49.         } catch (IOException e) { 
  50.             e.printStackTrace(); 
  51.         } 
  52.     } 
  53.  
  54.     void initSubReactorPool() { 
  55.         try { 
  56.             for (int i = 0; i < SUBREACTOR_SIZE; i++) { 
  57.                 subReactorPool[i] = new SubReactor("SubReactor" + i); 
  58.             } 
  59.         } catch (IOException ex) { /* ... */ } 
  60.     } 
  61.  
  62.     private void dispatch(SelectionKey key) { 
  63.         SelfRunable r = (SelfRunable) key.attachment(); 
  64.         if (r != null) { 
  65.             System.out.println("mainReactor dispatch to " + r.getName() + "===="); 
  66.             r.run(); 
  67.         } 
  68.     } 
  69.  
  70.  
  71.     public static void main(String[] args) throws IOException, InterruptedException { 
  72.  
  73.         Thread thread = new Thread(new Reactor(2021)); 
  74.         thread.start(); 
  75.  
  76.         synchronized (Reactor.class) { 
  77.             Reactor.class.wait(); 
  78.         } 
  79.     } 
  80.  
  81. public class SubReactor implements SelfRunable { 
  82.  
  83.     private Selector selector; 
  84.     private String name
  85.     private List<SelfRunable> task = new ArrayList<SelfRunable>(); 
  86.  
  87.     public SubReactor(String name) throws IOException { 
  88.         this.name = name
  89.         selector = Selector.open(); 
  90.         new Thread(this).start(); 
  91.     } 
  92.  
  93.     @Override 
  94.     public String getName() { 
  95.         return this.name
  96.     } 
  97.  
  98.     @Override 
  99.     public void run() { 
  100.         try { 
  101.             while (!Thread.interrupted()) { 
  102.                 System.out.println("subReactor start select event..."); 
  103.                 selector.select(5000); 
  104.                 Set selectedKeys = selector.selectedKeys(); 
  105.                 Iterator it = selectedKeys.iterator(); 
  106.                 while (it.hasNext()) { 
  107.                     dispatch((SelectionKey)it.next()); 
  108.                 } 
  109.                 selectedKeys.clear(); 
  110.  
  111.             } 
  112.         } catch (IOException e) { 
  113.             e.printStackTrace(); 
  114.         } 
  115.     } 
  116.  
  117.     private void dispatch(SelectionKey key) { 
  118.         SelfRunable r = (SelfRunable) key.attachment(); 
  119.         if (r != null) { 
  120.             System.out.println("subReactor dispatch to " + r.getName() + "===="); 
  121.             r.run(); 
  122.         } 
  123.     } 
  124.  
  125.     public Selector getSelector(){ 
  126.         return this.selector; 
  127.     } 
  128.  
  129.     public void submit(SelfRunable runnable) { 
  130.         task.add(runnable); 
  131.     } 
  132.  
  133.  
  134. public class Acceptor implements SelfRunable { 
  135.  
  136.     int next = 0; 
  137.     String name
  138.     SubReactor[] subReactorPool; 
  139.     ServerSocketChannel serverSocket; 
  140.  
  141.     public Acceptor(String name, ServerSocketChannel serverSocket,SubReactor[] subReactorPool) { 
  142.         this.name = name
  143.         this.serverSocket = serverSocket; 
  144.         this.subReactorPool = subReactorPool; 
  145.     } 
  146.  
  147.     @Override 
  148.     public void run() { 
  149.         try { 
  150.             SocketChannel socket = this.serverSocket.accept(); 
  151.             if (socket != null) { 
  152.                 new Handler("handler", subReactorPool[next].getSelector(),socket); 
  153.             } 
  154.             if (++next == subReactorPool.length) {next=0;} 
  155.  
  156.         } catch (IOException e) { 
  157.             e.printStackTrace(); 
  158.         } 
  159.     } 
  160.  
  161.     @Override 
  162.     public String getName() { 
  163.         return this.name
  164.     } 
  165.  
  166. public class Handler implements SelfRunable { 
  167.  
  168.     String name
  169.     Selector selector; 
  170.     SocketChannel socket; 
  171.     SelectionKey sk; 
  172.  
  173.     ByteBuffer input = ByteBuffer.allocate(1024); 
  174.     ByteBuffer output = ByteBuffer.allocate(1024); 
  175.     static final int READING = 0, SENDING = 1,  PROCESSING = 3; 
  176.     volatile int state = READING; 
  177.  
  178.     static ExecutorService poolExecutor = Executors.newFixedThreadPool(5); 
  179.  
  180.     public Handler(String name, Selector selector, SocketChannel socket) throws IOException { 
  181.         this.selector = selector; 
  182.         this.socket = socket; 
  183.         this.name = name
  184.  
  185.         this.socket.configureBlocking(false); 
  186.         sk = this.socket.register(this.selector,0); 
  187.         sk.attach(this); 
  188.         sk.interestOps(SelectionKey.OP_READ); 
  189.         selector.wakeup(); 
  190.     } 
  191.  
  192.     @Override 
  193.     public void run() { 
  194.         try{ 
  195.             System.out.println("state:" + state); 
  196.             if (state == READING) { 
  197.                 read(); 
  198.             } else if (state == SENDING) { 
  199.                 send(); 
  200.             } 
  201.         } catch (IOException ex) { 
  202.             ex.printStackTrace(); 
  203.         } 
  204.     } 
  205.  
  206.     synchronized void read() throws IOException { 
  207.         socket.read(input); 
  208.         if (inputIsComplete()) { 
  209.             state = PROCESSING; 
  210.            poolExecutor.execute(new Processer()); 
  211.         } 
  212.     } 
  213.  
  214.     synchronized void processAndHandOff() { 
  215.         System.out.println("processAndHandOff========="); 
  216.         process(); 
  217.         state = SENDING; // or rebind attachment 
  218.         sk.interestOps(SelectionKey.OP_WRITE); 
  219.         selector.wakeup(); 
  220.         System.out.println("processAndHandOff finish ! ========="); 
  221.     } 
  222.  
  223.     private void send() throws IOException { 
  224.         System.out.println("start send ..."); 
  225.         socket.write(output); 
  226.         socket.close(); 
  227.         System.out.println("start send finish!"); 
  228.         if (outputIsComplete()) sk.cancel(); 
  229.     } 
  230.  
  231.     boolean inputIsComplete() { return true;} 
  232.  
  233.     boolean outputIsComplete() {return true;} 
  234.  
  235.     void process(){ 
  236.         String msg = new String(input.array()); 
  237.         System.out.println("讀取內容:" + msg); 
  238.         output.put(msg.getBytes()); 
  239.         output.flip(); 
  240.     } 
  241.  
  242.     @Override 
  243.     public String getName() { 
  244.         return this.name
  245.     } 
  246.  
  247.     class Processer implements Runnable { 
  248.         public void run() { processAndHandOff(); } 
  249.     } 

Reactor線程模型演進

模型

簡介

弊端

單線程模型

IO/非IO操作都在Reactor單線程中完成

非IO操作執行慢,影響IO操作響應延遲

多線程模型

拆分非IO操作交給業務線程池執行,IO操作由Reator單線程執行

高并發,高負載場景下,Reactor單線程會成為瓶頸

主從線程模型

Reactor單線程拆分為mainReactor和subReactor

不適合大量數據傳輸

Netty線程模型

Reactor主從線程模型-抽象模型

  • 創建ServerSocketChannel過程(創建channel,配置非阻塞)
  • ServerSocketChannel注冊到mainReactor的selector對象上,監聽accept事件
  • mainReactor的selector監聽到新連接SocketChannel,將SocketChannel注冊到subReactor的selector對象上,監聽read/write事件
  • subReactor的selector監聽到read/write事件,移交給業務線程池(對應netty的pipeline)

Netty線程模型

我們再好好看看mainReactor和subReactor,其實這兩個類功能非常相似,所以Netty將mainReactor和subReactor統一成了EventLoop。對于Netty零基礎的,請參考這個Reactor主從線程模型-抽象模型和下面這張圖來理解EventLoop。

 

 

責任編輯:武曉燕 來源: 今日頭條
相關推薦

2022-03-04 08:10:35

NettyIO模型Reactor

2022-03-06 12:15:38

NettyReactor線程

2025-05-08 10:25:00

Netty網絡編程框架

2021-06-16 14:18:37

NettyReactor線程模型

2022-10-25 08:23:09

Reactor模式I/O

2012-08-24 09:58:09

ReactorDSSC

2022-03-10 07:58:12

ReactorNetty運轉架構

2022-09-29 15:39:10

服務器NettyReactor

2020-12-11 11:04:07

NettyIO

2022-02-09 09:37:54

ReactorNettyI/O

2024-11-22 08:00:00

Netty開發

2019-01-15 10:54:03

高性能ServerReactor

2022-05-24 15:46:51

Wi-FiSTA模式

2024-10-24 20:48:04

Netty線程Java

2020-08-21 07:23:50

工廠模式設計

2021-04-26 17:38:40

ReactorProactor網絡

2021-09-27 08:56:44

NettyChannelHand架構

2015-03-31 18:26:43

陌陌社交

2021-06-09 08:53:34

設計模式策略模式工廠模式

2012-02-29 09:41:14

JavaScript
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 美美女高清毛片视频免费观看 | 亚洲国产精品一区二区三区 | 日韩在线视频网址 | 欧美日韩一区在线 | 久视频在线 | 久久成人精品一区二区三区 | 亚洲一级毛片 | 色狠狠桃花综合 | 日韩精品一区二 | 亚洲欧美在线视频 | 91正在播放 | 色久影院 | 免费一区二区 | 久久国产精品-久久精品 | 91麻豆精品国产91久久久更新资源速度超快 | 久久久久久久久一区 | 亚洲网站在线观看 | 成人免费毛片在线观看 | 精品免费国产一区二区三区 | 日韩喷潮| 一区精品在线观看 | 国产九九精品视频 | 欧美性乱| 国产成人在线一区二区 | 欧美片网站免费 | 一级毛片在线播放 | 中文字幕国产日韩 | 欧美日韩国产不卡 | 亚洲免费在线 | 国产美女一区 | 日日爱av | 色悠悠久 | 久久精品国产99国产精品 | www.成人.com | 日韩伦理一区二区 | 免费精品久久久久久中文字幕 | 日韩精品一区二区三区在线播放 | www久久av| 亚洲第一成年免费网站 | 国产免费一区二区三区 | 亚洲精品国产电影 |