NIO之多線程協作處理數據讀寫
本文轉載自微信公眾號「源碼學徒」,作者皇甫嗷嗷叫 。轉載本文請聯系源碼學徒公眾號。
經過前面幾章的學習,我們已經 能夠掌握了JDK NIO的開發方式,我們來總結一下NIO開發的流程:
- 創建一個服務端通道 ServerSocketChannel
- 創建一個選擇器 Selector
- 將服務端通道注冊到選擇器上,并且關注我們感興趣的事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- 綁定服務管道的地址 serverSocketChannel.bind(new InetSocketAddress(8989));
- 開始進行事件選擇,選擇我們感興趣的事件做對應的操作!
具體的代碼信息請參照第一章:多路復用模型章節,這里不做太多的贅述!
有關多路復用的概念,我們也在第一章進行了分析。多路復用模型能夠最大限度的將一個線程的執行能力榨干,一條線程執行所有的數據,包括新連接的接入、數據的讀取、計算與回寫,但是假設,我們的數據計算及其緩慢,那么該任務的執行就勢必影響下一個新鏈接的接入!
傳統NIO單線程模型
單線程的NIO模型
如圖,我們能了解到,單線程情況下,讀事件因為要做一些業務性操作(數據庫連接、圖片、文件下載)等操作,導致線程阻塞再,讀事件的處理上,此時單線程程序無法進行下一次新鏈接的處理!我們對該線程模型進行優化,select事件處理封裝為任務,提交到線程池!
NIO多線程模型
上面的這種數據結構能夠解決掉因為計算任務耗時過長,導致新鏈接接入阻塞的問題,我們能否再次進行一次優化呢?
我們能否創建多個事件選擇器,每個事件選擇器,負責不同的Socket連接,就像下面這種:
NIO多線程優化模型
這樣我們就可以每一個Select選擇器負責多個客戶端Socket連接,主線程只需要將客戶端新連接選擇一個選擇器注冊到select選擇器上就可以了!所以我們的架構圖,就變成了下圖這樣:
我們在select選擇器內部處理計算任務的時候,也可以將任務封裝為task,提交到線程池里面去,徹底將新連接接入和讀寫事件處理分離開,互不影響!事實上,這也是Netty的核心思想之一,我們可以根據上面的圖例,自己簡單寫一個:
代碼實現
構建一個事件執行器 對應上圖的select選擇器
- /**
- * Nio事件處理器
- *
- * @author huangfu
- * @date
- */
- public class MyNioEventLoop implements Runnable {
- static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128);
- private final Selector selector;
- private final LinkedBlockingQueue<Runnable> linkedBlockingQueue;
- public MyNioEventLoop(Selector selector) {
- this.selector = selector;
- linkedBlockingQueue = new LinkedBlockingQueue<>();
- }
- public Selector getSelector() {
- return selector;
- }
- public LinkedBlockingQueue<Runnable> getLinkedBlockingQueue() {
- return linkedBlockingQueue;
- }
- //忽略 hashCode和eques
- /**
- * 任務處理器
- */
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- //進行事件選擇 這里我們只處理讀事件
- if (selector.select() > 0) {
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
- //處理讀事件
- while (iterator.hasNext()) {
- SelectionKey next = iterator.next();
- iterator.remove();
- if (next.isReadable()) {
- SocketChannel channel = (SocketChannel) next.channel();
- int read = channel.read(ALLOCATE);
- if(read > 0) {
- System.out.printf("線程%s【%s】發來消-息:",Thread.currentThread().getName(), channel.getRemoteAddress());
- System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8));
- }else if(read == -1) {
- System.out.println("連接斷開");
- channel.close();
- }
- ALLOCATE.clear();
- }
- }
- selectionKeys.clear();
- }else {
- //處理異步任務 進行注冊
- while (!linkedBlockingQueue.isEmpty()) {
- Runnable take = linkedBlockingQueue.take();
- //異步事件執行
- take.run();
- }
- }
- } catch (IOException | InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
構建一個選擇器組
- /**
- * 選擇器組
- *
- * @author huangfu
- * @date 2021年3月12日09:44:37
- */
- public class SelectorGroup {
- private final List<MyNioEventLoop> SELECTOR_GROUP = new ArrayList<>(8);
- private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
- private final static AtomicInteger IDX = new AtomicInteger();
- /**
- * 初始化選擇器
- * @param count 處理器數量
- * @throws IOException 異常欣喜
- */
- public SelectorGroup(int count) throws IOException {
- for (int i = 0; i < count; i++) {
- Selector open = Selector.open();
- MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open);
- SELECTOR_GROUP.add(myNioEventLoop);
- }
- }
- public SelectorGroup() throws IOException {
- this(AVAILABLE_PROCESSORS << 1);
- }
- /**
- * 輪詢獲取一個選擇器
- * @return 返回一個選擇器
- */
- public MyNioEventLoop next(){
- int andIncrement = IDX.getAndIncrement();
- int length = SELECTOR_GROUP.size();
- return SELECTOR_GROUP.get(Math.abs(andIncrement % length));
- }
- }
構建一個執行器記錄器
- /**
- * @author huangfu
- * @date
- */
- public class ThreadContext {
- /**
- * 記錄當前使用過的選擇器
- */
- public static final Set<MyNioEventLoop> RUN_SELECT = new HashSet<>();
- }
構建一個新連接接入選擇器
- /**
- * 連接器
- *
- * @author huangfu
- * @date 2021年3月12日10:15:37
- */
- public class Acceptor implements Runnable {
- private final ServerSocketChannel serverSocketChannel;
- private final SelectorGroup selectorGroup;
- public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) {
- this.serverSocketChannel = serverSocketChannel;
- this.selectorGroup = selectorGroup;
- }
- @Override
- public void run() {
- try {
- SocketChannel socketChannel = serverSocketChannel.accept();
- MyNioEventLoop next = selectorGroup.next();
- //向隊列追加一個注冊任務
- next.getLinkedBlockingQueue().offer(() -> {
- try {
- //客戶端注冊為非阻塞
- socketChannel.configureBlocking(false);
- //注冊到選擇器 關注一個讀事件
- socketChannel.register(next.getSelector(), SelectionKey.OP_READ);
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- //喚醒對應的任務,讓其處理異步任務
- next.getSelector().wakeup();
- System.out.println("檢測到連接:" + socketChannel.getRemoteAddress());
- //當當前選擇器已經被使用過了 就不再使用了,直接注冊就行了
- if (ThreadContext.RUN_SELECT.add(next)) {
- //啟動任務
- new Thread(next).start();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
創建啟動器
- /**
- * @author huangfu
- * @date
- */
- public class TestMain {
- public static void main(String[] args) throws IOException {
- //創建一個選擇器組 傳遞選擇器組的大小 決定使用多少選擇器來實現
- SelectorGroup selectorGroup = new SelectorGroup(2);
- //開啟一個服務端管道
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- //開啟一個服務端專用的選擇器
- Selector selector = Selector.open();
- //設置非阻塞
- serverSocketChannel.configureBlocking(false);
- //創建一個連接器
- Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup);
- //將服務端通道注冊到服務端選擇器上 這里會綁定一個新連接接入器
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor);
- //綁定端口
- serverSocketChannel.bind(new InetSocketAddress(8989));
- //啟動處理器
- new Reactor(selector).run();
- }
- }
總結
單線程下的NIO存在性能瓶頸,當某一計算過程緩慢的時候會阻塞住整個線程,導致影響其他事件的處理!
為了解決這一缺陷,我們提出了使用異步線程的方式去操作任務,將耗時較長的業務,封裝為一個異步任務,提交到線程池執行!
為了使業務操作和新連接接入完全分離開,我們做了另外一重優化,我們封裝了一個選擇器組,輪詢的方式獲取選擇器,每一個選擇器都能夠處理多個新連接, socket連接->selector選擇器 = 多 -> 1,在每一個選擇器里面又可以使用線程池來處理任務,進一步提高吞吐量!