Netty源碼之Reactor模式
學習目標
- 什么是Reactor模式?
- Reactor模式由什么組成的?
- Reactor模式解決什么問題?
- Reactor模式線程模型有哪些?演進過程?
web處理請求架構
大多數web請求處理流程可以抽象成這幾個步驟:讀取(read),解碼(decode),處理(process),編碼(encode),發送(send),如下圖所示:
同時,處理web請求通常有兩種架構:傳統基于線程架構和事件驅動架構。
傳統基于線程架構
概念
每個新連接創建一個線程來處理。對于長連接服務,如果一個client和server保持一個連接的話,有多少個client接入,server就需要創建同等的線程來處理。線程上下文切換,數據同步和內存消耗,對server來說,將是非常大的開銷。
代碼實現
傳統基于線程架構通常采用BIO的方式來實現,代碼如下:
- public class Server implements Runnable {
- int port;
- public Server(int port) {
- this.port = port;
- }
- @Override
- public void run() {
- try {
- ServerSocket serverSocket = new ServerSocket(port);
- while (true){
- System.out.println("等待新連接...");
- new Thread(new Handler(serverSocket.accept())).start();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- static class Handler implements Runnable{
- private Socket socket;
- public Handler(Socket socket){
- this.socket = socket;
- }
- @Override
- public void run() {
- try {
- byte[] input = new byte[1024];
- this.socket.getInputStream().read(input);
- byte[] output = process(input);
- this.socket.getOutputStream().write(output);
- this.socket.getOutputStream().flush();
- this.socket.close();
- System.out.println("響應完成!");
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private byte[] process(byte[] input) {
- System.out.println("讀取內容:" + new String(input));
- return input;
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread thread = new Thread(new Server(2021));
- thread.setDaemon(true);
- thread.start();
- synchronized (Server.class) {
- Server.class.wait();
- }
- }
- }
為了避免線程創建銷毀的開銷,我們通常會采用線程池,但是同樣也有很大的弊端:
- 同步阻塞IO,讀寫阻塞,線程等待時間過長
- 在制定線程策略的時候,只能根據CPU的數目來限定可用線程資源,不能根據連接并發數目來制定,也就是連接有限制。否則很難保證對客戶端請求的高效和公平。
- 多線程之間的上下文切換,造成線程使用效率并不高,并且不易擴展
- 狀態數據以及其他需要保持一致的數據,需要采用并發同步控制
應用場景
既然傳統基于線程架構弊端這么多,它存在還有什么價值?它的應用場景是什么?
傳統基于線程架構適用于連接數目比較小且一次傳輸大量數據的場景,比如上傳,下載。
事件驅動架構
事件驅動架構:可以把線程和連接解耦,線程只用于執行事件注冊的回調函數。事件驅動架構由事件生產者和事件消費者組成,前者是事件的來源,它只負責監聽哪些事件發生;后者是直接處理事件或者事件發生時,響應事件的實體。
Reactor模式
什么是Reactor模式?
Reactor模式是事件驅動架構的一種具體實現方法,簡而言之,就是一個單線程進行循環監聽就緒IO事件,并將就緒IO事件分發給對應的回調函數。
Reactor模式由什么組成的?
Reactor模式分為兩個重要組成部分,Reactor和Handler。 Reactor(反應器):循環監聽就緒IO事件,并分發給回調函數。 Handler(回調函數):執行對應IO事件的實際業務邏輯。
Reactor模式解決什么問題?
反應器模式可以實現同步的多路復用,同步是指按照事件到達的順序分發處理。反應器 接收來自不同的客戶端的消息、請求和連接,盡管客戶端是并發的,但是反應器可以按照事件到達的順序觸發回調函數。因此,Reactor模式將連接和線程解耦,不需要為每個連接創建單獨線程。這個問題和C10K問題相同,提供了一個解決思路。
Reactor模式下的三種模型
單線程模型:IO事件輪詢,分發(accept)和IO事件執行(read,decode,compute,encode,send)都在一個線程中完成,如下圖所示:
在單線程模型下,不僅IO操作在Reactor線程上,而非IO操作(handlder中process()方法)也在Reactor線程上執行了,當非IO操作執行慢的話,這會大大延遲IO請求響應,所以應該把非IO操作拆出來,來加速Reactor線程對IO請求響應,就出現多線程模型。
單線程模型實現:
- // reactor
- public class Reactor implements Runnable {
- int port;
- Selector selector;
- ServerSocketChannel serverSocket;
- public Reactor(int port) throws IOException {
- this.port = port;
- // 創建serverSocket對象
- serverSocket = ServerSocketChannel.open();
- // 綁定端口
- serverSocket.socket().bind(new InetSocketAddress(port));
- // 配置非阻塞
- serverSocket.configureBlocking(false);
- // 創建selector對象
- selector = Selector.open();
- // serversocket注冊到selector上,幫忙監聽accpet事件
- serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocket,selector));
- /** 還可以使用 SPI provider,來創建selector和serversocket對象
- SelectorProvider p = SelectorProvider.provider();
- selector = p.openSelector();
- serverSocket = p.openServerSocketChannel();
- */
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- System.out.println("start select event...");
- selector.select();
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey)it.next());
- }
- selectedKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void dispatch(SelectionKey key) {
- Runnable r = (Runnable) key.attachment();
- if (r != null) {
- r.run();
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- Thread thread = new Thread(new Reactor(2021));
- thread.start();
- synchronized (Reactor.class) {
- Reactor.class.wait();
- }
- }
- }
- // acceptor調度器
- public class Acceptor implements Runnable {
- ServerSocketChannel serverSocket;
- Selector selector;
- public Acceptor(ServerSocketChannel serverSocket,Selector selector) {
- this.serverSocket = serverSocket;
- this.selector = selector;
- }
- @Override
- public void run() {
- try {
- SocketChannel socket = this.serverSocket.accept();
- if (socket != null) {
- new Handler(selector,socket);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- // 回調函數handler
- public class Handler implements Runnable {
- Selector selector;
- SocketChannel socket;
- SelectionKey sk;
- ByteBuffer input = ByteBuffer.allocate(1024);
- ByteBuffer output = ByteBuffer.allocate(1024);
- static final int READING = 0, SENDING = 1;
- int state = READING;
- public Handler(Selector selector, SocketChannel socket) throws IOException {
- this.selector = selector;
- this.socket = socket;
- this.socket.configureBlocking(false);
- sk = this.socket.register(selector,0);
- sk.attach(this);
- sk.interestOps(SelectionKey.OP_READ);
- selector.wakeup();
- }
- @Override
- public void run() {
- try{
- if (state == READING) {
- read();
- } else if (state == SENDING) {
- send();
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- private void read() throws IOException {
- socket.read(input);
- if (inputIsComplete()) {
- // 執行業務邏輯代碼
- process();
- state = SENDING;
- // Normally also do first write now
- sk.interestOps(SelectionKey.OP_WRITE);
- }
- }
- private void send() throws IOException {
- socket.write(output);
- socket.close();
- if (outputIsComplete()) sk.cancel();
- }
- boolean inputIsComplete() { return true;}
- boolean outputIsComplete() {return true;}
- // 處理非IO操作(業務邏輯代碼)
- void process(){
- String msg = new String(input.array());
- System.out.println("讀取內容:" + msg);
- output.put(msg.getBytes());
- output.flip();
- }
- }
- 多線程模型:與單線程模型不同的是添加一個業務線程池,將非IO操作(業務邏輯處理)交給業務線程池來處理,提高Reactor線程的IO響應,如圖所示:
在多線程模型下,雖然將非IO操作拆出去了,但是所有IO操作都在Reactor單線程中完成的。在高負載、高并發場景下,也會成為瓶頸,于是對Reactor單線程進行了優化,出現了主從線程模型。
多線程模型實現:
- public class Reactor implements Runnable {
- int port;
- Selector selector;
- ServerSocketChannel serverSocket;
- public Reactor(int port) throws IOException {
- this.port = port;
- // 創建serverSocket對象
- serverSocket = ServerSocketChannel.open();
- // 綁定端口
- serverSocket.socket().bind(new InetSocketAddress(port));
- // 配置非阻塞
- serverSocket.configureBlocking(false);
- // 創建selector對象
- selector = Selector.open();
- // serversocket注冊到selector上,幫忙監聽accpet事件
- serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,selector));
- /** 還可以使用 SPI provider,來創建selector和serversocket對象
- SelectorProvider p = SelectorProvider.provider();
- selector = p.openSelector();
- serverSocket = p.openServerSocketChannel();
- */
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- System.out.println("start select event...");
- selector.select();
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey)it.next());
- }
- selectedKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void dispatch(SelectionKey key) {
- SelfRunable r = (SelfRunable) key.attachment();
- if (r != null) {
- System.out.println("dispatch to " + r.getName() + "====");
- r.run();
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- Thread thread = new Thread(new Reactor(2021));
- thread.start();
- synchronized (Reactor.class) {
- Reactor.class.wait();
- }
- }
- }
- public class Acceptor implements SelfRunable {
- ServerSocketChannel serverSocket;
- Selector selector;
- String name;
- public Acceptor(String name, ServerSocketChannel serverSocket,Selector selector) {
- this.name = name;
- this.serverSocket = serverSocket;
- this.selector = selector;
- }
- @Override
- public void run() {
- try {
- SocketChannel socket = this.serverSocket.accept();
- if (socket != null) {
- new Handler("handler_" + ((InetSocketAddress)socket.getLocalAddress()).getPort(), selector,socket);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public String getName() {
- return this.name;
- }
- }
- public class Handler implements SelfRunable {
- String name;
- Selector selector;
- SocketChannel socket;
- SelectionKey sk;
- ByteBuffer input = ByteBuffer.allocate(1024);
- ByteBuffer output = ByteBuffer.allocate(1024);
- static final int READING = 0, SENDING = 1, PROCESSING = 3;
- volatile int state = READING;
- static ExecutorService poolExecutor = Executors.newFixedThreadPool(5);
- public Handler(String name, Selector selector, SocketChannel socket) throws IOException {
- this.selector = selector;
- this.socket = socket;
- this.name = name;
- this.socket.configureBlocking(false);
- sk = this.socket.register(selector,0);
- sk.attach(this);
- sk.interestOps(SelectionKey.OP_READ);
- selector.wakeup();
- }
- @Override
- public void run() {
- try{
- System.out.println("state:" + state);
- if (state == READING) {
- read();
- } else if (state == SENDING) {
- send();
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- synchronized void read() throws IOException {
- socket.read(input);
- if (inputIsComplete()) {
- state = PROCESSING;
- poolExecutor.execute(new Processer());
- }
- }
- synchronized void processAndHandOff() {
- System.out.println("processAndHandOff=========");
- process();
- state = SENDING; // or rebind attachment
- sk.interestOps(SelectionKey.OP_WRITE);
- selector.wakeup();
- System.out.println("processAndHandOff finish ! =========");
- }
- private void send() throws IOException {
- System.out.println("start send ...");
- socket.write(output);
- socket.close();
- System.out.println("start send finish!");
- if (outputIsComplete()) sk.cancel();
- }
- boolean inputIsComplete() { return true;}
- boolean outputIsComplete() {return true;}
- void process(){
- String msg = new String(input.array());
- System.out.println("讀取內容:" + msg);
- output.put(msg.getBytes());
- output.flip();
- }
- @Override
- public String getName() {
- return this.name;
- }
- class Processer implements Runnable {
- public void run() { processAndHandOff(); }
- }
- }
- 主從線程模型: 相比多線程模型而言,對于多核cpu,為了充分利用資源,將Reactor拆分成了mainReactor 和 subReactor,但是,主從線程模型也有弊端,不適合大量數據傳輸。 mainReactor:負責監聽接收(accpet)新連接,將新連接后續操作交給subReactor來處理,通常由一個線程處理。 subReactor: 負責處理IO的讀寫操作,通常由多個線程處理。 非IO操作依然由業務線程池來處理。
主從線程模型實現:
- public class Reactor implements Runnable {
- int port;
- Selector selector;
- ServerSocketChannel serverSocket;
- int SUBREACTOR_SIZE = 1;
- SubReactor[] subReactorPool = new SubReactor[SUBREACTOR_SIZE];
- public Reactor(int port) throws IOException {
- this.port = port;
- // 創建serverSocket對象
- serverSocket = ServerSocketChannel.open();
- // 綁定端口
- serverSocket.socket().bind(new InetSocketAddress(port));
- // 配置非阻塞
- serverSocket.configureBlocking(false);
- // 創建selector對象
- selector = Selector.open();
- // serversocket注冊到selector上,幫忙監聽accpet事件
- serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,subReactorPool));
- // 初始化subreactor pool
- initSubReactorPool();
- /** 還可以使用 SPI provider,來創建selector和serversocket對象
- SelectorProvider p = SelectorProvider.provider();
- selector = p.openSelector();
- serverSocket = p.openServerSocketChannel();
- */
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- System.out.println("mainReactor start select event...");
- selector.select();
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey)it.next());
- }
- selectedKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- void initSubReactorPool() {
- try {
- for (int i = 0; i < SUBREACTOR_SIZE; i++) {
- subReactorPool[i] = new SubReactor("SubReactor" + i);
- }
- } catch (IOException ex) { /* ... */ }
- }
- private void dispatch(SelectionKey key) {
- SelfRunable r = (SelfRunable) key.attachment();
- if (r != null) {
- System.out.println("mainReactor dispatch to " + r.getName() + "====");
- r.run();
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- Thread thread = new Thread(new Reactor(2021));
- thread.start();
- synchronized (Reactor.class) {
- Reactor.class.wait();
- }
- }
- }
- public class SubReactor implements SelfRunable {
- private Selector selector;
- private String name;
- private List<SelfRunable> task = new ArrayList<SelfRunable>();
- public SubReactor(String name) throws IOException {
- this.name = name;
- selector = Selector.open();
- new Thread(this).start();
- }
- @Override
- public String getName() {
- return this.name;
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- System.out.println("subReactor start select event...");
- selector.select(5000);
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey)it.next());
- }
- selectedKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void dispatch(SelectionKey key) {
- SelfRunable r = (SelfRunable) key.attachment();
- if (r != null) {
- System.out.println("subReactor dispatch to " + r.getName() + "====");
- r.run();
- }
- }
- public Selector getSelector(){
- return this.selector;
- }
- public void submit(SelfRunable runnable) {
- task.add(runnable);
- }
- }
- public class Acceptor implements SelfRunable {
- int next = 0;
- String name;
- SubReactor[] subReactorPool;
- ServerSocketChannel serverSocket;
- public Acceptor(String name, ServerSocketChannel serverSocket,SubReactor[] subReactorPool) {
- this.name = name;
- this.serverSocket = serverSocket;
- this.subReactorPool = subReactorPool;
- }
- @Override
- public void run() {
- try {
- SocketChannel socket = this.serverSocket.accept();
- if (socket != null) {
- new Handler("handler", subReactorPool[next].getSelector(),socket);
- }
- if (++next == subReactorPool.length) {next=0;}
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public String getName() {
- return this.name;
- }
- }
- public class Handler implements SelfRunable {
- String name;
- Selector selector;
- SocketChannel socket;
- SelectionKey sk;
- ByteBuffer input = ByteBuffer.allocate(1024);
- ByteBuffer output = ByteBuffer.allocate(1024);
- static final int READING = 0, SENDING = 1, PROCESSING = 3;
- volatile int state = READING;
- static ExecutorService poolExecutor = Executors.newFixedThreadPool(5);
- public Handler(String name, Selector selector, SocketChannel socket) throws IOException {
- this.selector = selector;
- this.socket = socket;
- this.name = name;
- this.socket.configureBlocking(false);
- sk = this.socket.register(this.selector,0);
- sk.attach(this);
- sk.interestOps(SelectionKey.OP_READ);
- selector.wakeup();
- }
- @Override
- public void run() {
- try{
- System.out.println("state:" + state);
- if (state == READING) {
- read();
- } else if (state == SENDING) {
- send();
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- synchronized void read() throws IOException {
- socket.read(input);
- if (inputIsComplete()) {
- state = PROCESSING;
- poolExecutor.execute(new Processer());
- }
- }
- synchronized void processAndHandOff() {
- System.out.println("processAndHandOff=========");
- process();
- state = SENDING; // or rebind attachment
- sk.interestOps(SelectionKey.OP_WRITE);
- selector.wakeup();
- System.out.println("processAndHandOff finish ! =========");
- }
- private void send() throws IOException {
- System.out.println("start send ...");
- socket.write(output);
- socket.close();
- System.out.println("start send finish!");
- if (outputIsComplete()) sk.cancel();
- }
- boolean inputIsComplete() { return true;}
- boolean outputIsComplete() {return true;}
- void process(){
- String msg = new String(input.array());
- System.out.println("讀取內容:" + msg);
- output.put(msg.getBytes());
- output.flip();
- }
- @Override
- public String getName() {
- return this.name;
- }
- class Processer implements Runnable {
- public void run() { processAndHandOff(); }
- }
- }
Reactor線程模型演進
模型 |
簡介 |
弊端 |
單線程模型 |
IO/非IO操作都在Reactor單線程中完成 |
非IO操作執行慢,影響IO操作響應延遲 |
多線程模型 |
拆分非IO操作交給業務線程池執行,IO操作由Reator單線程執行 |
高并發,高負載場景下,Reactor單線程會成為瓶頸 |
主從線程模型 |
Reactor單線程拆分為mainReactor和subReactor |
不適合大量數據傳輸 |
Netty線程模型
Reactor主從線程模型-抽象模型
- 創建ServerSocketChannel過程(創建channel,配置非阻塞)
- ServerSocketChannel注冊到mainReactor的selector對象上,監聽accept事件
- mainReactor的selector監聽到新連接SocketChannel,將SocketChannel注冊到subReactor的selector對象上,監聽read/write事件
- subReactor的selector監聽到read/write事件,移交給業務線程池(對應netty的pipeline)
Netty線程模型
我們再好好看看mainReactor和subReactor,其實這兩個類功能非常相似,所以Netty將mainReactor和subReactor統一成了EventLoop。對于Netty零基礎的,請參考這個Reactor主從線程模型-抽象模型和下面這張圖來理解EventLoop。