成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Nodejs-Ipc的設計與實現

網絡 通信技術
對于有繼承關系的進程,nodejs本身為我們提供了進程間通信的方式,但是對于沒有繼承關系的進程,比如兄弟進程,想要通信最簡單的方式就是通過主進程中轉,類似前端框架中子組件通過更新父組件的數據,然后父通知其他子組件。

 [[347927]]

本文轉載自微信公眾號「編程雜技」,作者theanarkh。轉載本文請聯系編程雜技公眾號。    

對于有繼承關系的進程,nodejs本身為我們提供了進程間通信的方式,但是對于沒有繼承關系的進程,比如兄弟進程,想要通信最簡單的方式就是通過主進程中轉,類似前端框架中子組件通過更新父組件的數據,然后父通知其他子組件。因為nodejs的進程間通信需要經過序列化和反序列化,所以這種方式可能會帶來一定的性能損耗,而且在實現上也比較麻煩。今天介紹的是兄弟進程直接通信的方式。在windows上使用tcp,在非windows上使用unix域,本機通信,unix域性能上會更好,因為tcp通信需要經過協議棧的封包和解包。下面具體介紹一下這個ipc庫的設計和實現。

設計思想主要是一個進程啟動一個服務,然后其他客戶端進程可以通過地址信息去和服務器建立長連接。這里沒有使用短連接,短連接雖然在實現上會變得容易,但是頻繁通信的進程,不斷地創建和銷毀數據結構會帶來一定的開銷,長連接雖然會一直占用內存,但是這是非常小的,而長連接帶來的效率明顯會更好。不過長連接會帶來一個難點,那就是對數據的解析,比如對于tcp來說,我們從中拿到的是一串字節流,這一串字節流中可能有幾個數據包的數據,我們需要從這一串字節流中解析出一個個數據包。這就涉及到協議的設計。所以首先我們要定義一個應用層協議。

1 應用層協議的設計和實現

null應用層協議的設計非常簡單

  1. 總長度是除了開頭標記之外的其他數據長度。因為數據部分是變長的,所以我們需要一個總長度來判斷后續的數據長度是多少。
  2. 序列號是用于關聯請求和響應,因為我們在一個連接上可能會串行發送多個數據包,當我們收到一個回包的時候,我們不知道是來自哪個請求的響應,通過響應體中的seq,我們就知道是來自哪個請求的響應。設計了通信協議后,我們就需要對協議進行封包解包。首先我們看一下封包邏輯。
  1. function seq() { 
  2.    return ~~(Math.random() * Math.pow(2, 31)) 
  3. function packet(data, sequnce) { 
  4.     // 轉成buffer 
  5.     const bufferData = Buffer.from(data, 'utf-8'); 
  6.     // 開始標記長度 
  7.     const startFlagLength = Buffer.from([PACKET_START]).byteLength; 
  8.     // 序列號 
  9.     const seq = sequnce || seq(); 
  10.     // 分配一個buffer存儲數據 
  11.     let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN); 
  12.     // 設計開始標記 
  13.     buffer[0] = 0x3; 
  14.     // 寫入總長度字段的值 
  15.     buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, 1, TOTAL_LENGTH); 
  16.     // 寫入序列號的值 
  17.     buffer.writeUIntBE(seq, startFlagLength + TOTAL_LENGTH, SEQ_LEN); 
  18.     // 把協議元數據和數據組裝到一起 
  19.     buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength); 
  20.     return buffer; 

接著我們看一下解包的邏輯,因為數據的傳輸是字節流,所以有可能多個數據包的數據會粘在一起,所以我們第一步首先要根據協議解析出一個個數據包,然后再解析每一個數據包。我們通過有限狀態機實現數據的解析。下面是狀態機的狀態集。

  1. const PARSE_STATE = { 
  2.   PARSE_INIT: 0, 
  3.   PARSE_HEADER: 1, 
  4.   PARSE_DATA: 2, 
  5.   PARSE_END: 3, 
  6. }; 

接著我們定義狀態集的轉換規則。

  1. class StateSwitcher { 
  2.     constructor(options) { 
  3.         this.options = options; 
  4.     } 
  5.  
  6.     [PARSE_STATE.PARSE_INIT](data) { 
  7.         // 數據不符合預期 
  8.         if (data[0] !== PACKET_START) { 
  9.             // 跳過部分數據,找到開始標記 
  10.             const position = data.indexOf(PACKET_START); 
  11.             // 沒有開始標記,說明這部分數據無效,丟棄 
  12.             if (position === -1) { 
  13.                 return [NEED_MORE_DATA, null]; 
  14.             } 
  15.             // 否則返回有效數據部分,繼續解析 
  16.             return [PARSE_STATE.PACKET_START, data.slice(position)]; 
  17.         } 
  18.         // 保存當前正在解析的數據包 
  19.         this.packet = new Packet(); 
  20.         // 跳過開始標記的字節數,進入解析協議頭階段 
  21.         return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer.from([PACKET_START]).byteLength)]; 
  22.     }  
  23.  
  24.     [PARSE_STATE.PARSE_HEADER](data) { 
  25.         // 數據不夠頭部的大小則等待數據到來 
  26.         if (data.length < TOTAL_LENGTH + SEQ_LEN) { 
  27.           return [NEED_MORE_DATA, data]; 
  28.         } 
  29.         // 有效數據包的長度 = 整個數據包長度 - 頭部長度 
  30.         this.packet.set('length', data.readUInt32BE() - (TOTAL_LENGTH + SEQ_LEN)); 
  31.         // 序列號 
  32.         this.packet.set('seq', data.readUInt32BE(TOTAL_LENGTH)); 
  33.         // 解析完頭部了,跳過去 
  34.         data = data.slice(TOTAL_LENGTH + SEQ_LEN); 
  35.         // 進入解析數據階段 
  36.         return [PARSE_STATE.PARSE_DATA, data]; 
  37.     } 
  38.  
  39.     [PARSE_STATE.PARSE_DATA](data) { 
  40.         const len = this.packet.get('length'); 
  41.         // 數據部分的長度小于協議頭中定義的長度,則繼續等待 
  42.         if (data.length < len) { 
  43.             return [NEED_MORE_DATA, data]; 
  44.         } 
  45.         // 截取數據部分 
  46.         this.packet.set('data', data.slice(0, len)); 
  47.         // 解析完數據了,完成一個包的解析,跳過數據部分 
  48.         data = data.slice(len); 
  49.         typeof this.options.cb === 'function' && this.options.cb(this.packet); 
  50.         this.packet = null
  51.         // 解析完一個數據包,進入結束標記階段 
  52.         return [PARSE_STATE.PARSE_INIT, data]; 
  53.     } 

我們再看一下狀態機的實現

  1. class FSM { 
  2.     constructor(options) { 
  3.         this.options = options; 
  4.         // 狀態處理機,定義了狀態轉移集合 
  5.         this.stateSwitcher = new StateSwitcher({cb: options.cb}); 
  6.         // 當前狀態 
  7.         this.state = PARSE_STATE.PARSE_INIT; 
  8.         // 結束狀態 
  9.         this.endState = PARSE_STATE.PARSE_END; 
  10.         // 當前待解析的數據 
  11.         this.buffer = null
  12.     } 
  13.  
  14.     run(data) { 
  15.         // 沒有數據或者解析結束了直接返回 
  16.         if (this.state === this.endState || !data || !data.length) { 
  17.             return
  18.         } 
  19.         // 保存待解析的數據 
  20.         this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data; 
  21.         // 還沒結束,并且還有數據可以處理則繼續執行 
  22.         while(this.state !== this.endState && this.buffer && this.buffer.length) { 
  23.             // 執行狀態處理函數,返回[下一個狀態, 剩下的數據] 
  24.             const result = this.stateSwitcher[this.state](this.buffer); 
  25.             // 如果下一個狀態是NEED_MORE_DATA則說明需要更多的數據才能繼續解析,并保持當前狀態 
  26.             if (result[0] === NEED_MORE_DATA) { 
  27.                 return
  28.             } 
  29.             // 記錄下一個狀態和數據 
  30.             [this.state, this.buffer] = result; 
  31.         } 
  32.  
  33.     } 

狀態機就是對開始狀態、結束狀態、狀態轉換集的封裝。實現了協議的封包和解析后我們看一下如何使用。

2 IPC服務器的設計與實現

首先我們實現一個Client類表示和客戶端通信的實例。

  1. // Client代表一個和server建立連接的客戶端 
  2. class Client extends EventEmitter { 
  3.   constructor(options) { 
  4.     super(); 
  5.     this.options = options; 
  6.   } 
  7.   send(data) { 
  8.     this.options.client.write(data); 
  9.   } 
  10.   end(data) { 
  11.     this.options.client.end(data); 
  12.   } 

然后我們開始實現真正的IPC服務器

  1. class Server extends EventEmitter { 
  2.     constructor(options, connectionListener) { 
  3.       super(); 
  4.       this.options = { ...options }; 
  5.       // 根據平臺處理參數 
  6.       if (os.platform() === 'win32') { 
  7.         !~~this.options.port && (this.options.port = port); 
  8.         delete this.options.path; 
  9.       } else { 
  10.         !this.options.path && (this.options.path = path);  
  11.         delete this.options.host; 
  12.         delete this.options.port; 
  13.         fs.existsSync(this.options.path) && fs.unlinkSync(this.options.path); 
  14.         process.on('exit', () => { 
  15.           fs.existsSync(this.options.path) && fs.unlinkSync(this.options.path); 
  16.         }); 
  17.       } 
  18.       this.server = net.createServer({allowHalfOpen: true}, (client) => { 
  19.         const _client = new Client({client}); 
  20.         typeof connectionListener === 'function' && connectionListener(_client); 
  21.         const fsm = new FSM({ 
  22.             cb: function(packet) { 
  23.               _client.emit('message', packet); 
  24.             } 
  25.         }) 
  26.         client.on('data', fsm.run.bind(fsm)); 
  27.         client.on('end', () => { 
  28.           // 觸發end事件 
  29.           _client.emit('end'); 
  30.           // 用戶側沒有關閉寫端,則默認關閉 
  31.           !client.writableEnded && this.options.autoEnd !== false && client.end(); 
  32.         }); 
  33.         client.on('error', (error) => { 
  34.           _client.listenerCount('error') > 0 && _client.emit('error', error); 
  35.         }); 
  36.       }); 
  37.       this.server.listen(this.options, () => { 
  38.         this.emit('listen'); 
  39.       }); 
  40.       this.server.on('error', (error) => { 
  41.         this.listenerCount('error') > 0 && this.emit('error', error); 
  42.       }); 
  43.     } 

服務器是對tcp和unix域服務器的封裝,基于tcp或者unix域傳輸的數據由狀態機進行處理,狀態機解析完數據包后,通知調用方。

3 IPC客戶端的設計與實現

  1. class Client extends EventEmitter { 
  2.   constructor(options) { 
  3.     super(); 
  4.     this.options = { ...options }; 
  5.     this.socket = null
  6.     this.fsm = new FSM({ 
  7.         cb: (packet) => { 
  8.             this.emit('message', packet); 
  9.         } 
  10.     }) 
  11.   } 
  12.   initOnce() { 
  13.     if (!this.socket) { 
  14.       if (os.platform() === 'win32') { 
  15.         !~~this.options.port && (this.options.port = port); 
  16.         delete this.options.path; 
  17.       } else { 
  18.         !this.options.path && (this.options.path = path);  
  19.         delete this.options.host; 
  20.         delete this.options.port; 
  21.       } 
  22.       this.socket = net.connect({allowHalfOpen: true, ...this.options}); 
  23.       this.socket.on('data', this.fsm.run.bind(this.fsm)); 
  24.       this.socket.on('end', () => { 
  25.         // 觸發end事件 
  26.         this.emit('end'); 
  27.         // 用戶側沒有關閉寫端,則默認關閉 
  28.         !this.socket.writableEnded && this.options.autoEnd !== false && this.socket.end(); 
  29.       }); 
  30.       this.socket.on('error', (e) => { 
  31.         this.listenerCount('error') > 0 && this.emit('error', e); 
  32.       }); 
  33.     } 
  34.   } 
  35.   send(data) { 
  36.     this.initOnce(); 
  37.     this.socket.write(data); 
  38.     return this; 
  39.   } 
  40.   end(data) { 
  41.     this.initOnce(); 
  42.     this.socket.end(data); 
  43.   } 

客戶端和服務器類似,也是對tcp和unix域客戶端的封裝。其中數據也是由狀態機處理。

4 使用

接下來我們看一下如何使用。server.js

  1. const { Server, packet, seq } = require('../../'); 
  2. // window下使用tcp,非window使用unix域,即使傳了port  
  3. new Server({port: 80, path: '/tmp/unix.sock'}, function(client) { 
  4.     client.on('message', (data) => { 
  5.         console.log('receive', data); 
  6.         client.send(packet('world', data.seq)); 
  7.         client.send(packet('world', data.seq)); 
  8.     }); 
  9.     client.on('end', (data) => { 
  10.         client.end(); 
  11.     }); 
  12. }); 

client.js

  1. const { Client, packet, seq } = require('../../'); 
  2. const client = new Client({port: 80, path: '/tmp/unix.sock'}) 
  3. client.send(packet('hello', seq())); 
  4. client.send(packet('hello', seq())); 
  5. client.on('message'function(res) { 
  6.   console.log('receive', res); 
  7. }) 

服務器輸出

客戶端輸出

5 拓展

我們實現了數據的傳輸和解析,但是如何我們希望數據的請求和響應是一一對應的怎么辦呢?比如像http在tcp上可以并發發起多個請求一樣,響應是否可以亂序返回,我們又如何知道某個響應對應的是哪個請求?接下來介紹如何解決這個問題。首先我們實現一個請求管理的類。

  1. class RequestManager { 
  2.     constructor(options) { 
  3.         this.options = { timeout: 10000, ...options }; 
  4.         this.map = {}; 
  5.         this.timerId = null
  6.         this.startPollTimeout(); 
  7.     } 
  8.     set(key, context) { 
  9.         if (typeof context.cb !== 'function') { 
  10.             throw new Error('cb is required'); 
  11.         } 
  12.         this.map[key] = { 
  13.             startTime: Date.now(), 
  14.             ...context, 
  15.         }; 
  16.     } 
  17.     get(key) { 
  18.         return this.map[key]; 
  19.     } 
  20.     del(key) { 
  21.         return delete this.map[key]; 
  22.     } 
  23.     // 執行上下文 
  24.     exec(key, data) { 
  25.         const context = this.get(key); 
  26.         if (context) { 
  27.             this.del(key); 
  28.             context.cb(data); 
  29.         } 
  30.     }  
  31.     // 定時輪詢是否超時 
  32.     startPollTimeout() { 
  33.         this.timerId = setTimeout(() => { 
  34.             if (!this.timerId) { 
  35.                 return
  36.             } 
  37.             const nextMap = {}; 
  38.             for (const [key, context] of Object.entries(this.map)) { 
  39.                 if (Date.now() - context.startTime < (context.timeout || this.options.timeout)) { 
  40.                     nextMap[key] = context; 
  41.                 } else { 
  42.                     context.cb(new Error('timeout')); 
  43.                 } 
  44.             } 
  45.             this.map = nextMap; 
  46.             this.startPollTimeout(); 
  47.         }, 1000); 
  48.     } 

該類的邏輯主要是請求的seq保存對應的上下文,然后收到響應的時候,我們根據響應的seq拿到對應的上下文,從而執行對應的回調。我們看看如何使用該類。server.js

  1. const { Server, packet, seq } = require('../../'); 
  2. new Server({port: 80, path: '/tmp/unix.sock'}, function(client) { 
  3.     client.on('message', (data) => { 
  4.         console.log('receive', data); 
  5.         // setTimeout測試超時場景 
  6.         //setTimeout(() => { 
  7.             client.send(packet('world', data.seq)); 
  8.         // }, 2000) 
  9.     }); 
  10.     client.on('end', (data) => { 
  11.         client.end(); 
  12.     }); 
  13. }); 

client.js

  1. const { Client, packet, seq, RequestManager } = require('../../'); 
  2. const requestManager = new RequestManager({timeout: 3000}); 
  3. const client = new Client({port: 80, path: '/tmp/unix.sock'}); 
  4. const _seq = seq();  
  5. // 保存seq對應的上下文 
  6. requestManager.set(_seq, { 
  7.   cb: function() { 
  8.     console.log(...arguments); 
  9.   } 
  10. }) 
  11. // 發送數據包 
  12. client.send(packet('hello', _seq)); 
  13. client.on('message'function(packet) { 
  14.   // 根據響應的seq執行對應的上下文 
  15.   requestManager.exec(packet.seq, packet); 
  16. }) 

 

責任編輯:武曉燕 來源: 編程雜技
相關推薦

2020-10-19 10:01:12

Nodejs線程池設計

2022-12-28 08:31:38

平臺設計應用

2015-11-03 09:28:52

Hybrid技術設計實現

2015-06-30 11:05:11

flexibleWebAPP設計

2022-09-12 07:17:20

redis命令redissynce

2023-05-26 08:24:17

短信渠道模型

2022-09-14 09:37:22

數據系統

2022-05-03 21:18:38

Vue.js組件KeepAlive

2020-07-19 10:26:47

Kubernetes數據結構

2025-03-20 09:54:47

2011-04-21 15:22:27

ArcGIS Engi

2021-11-24 08:55:38

代理網關Netty

2022-10-18 08:28:38

運營活動實現邏輯整體協作

2025-02-25 09:29:34

2017-10-25 14:41:19

UPS遠程監控電源

2023-07-07 10:41:00

Javajar文件

2022-05-31 08:04:30

前端設計模式

2022-07-12 06:05:27

NutUI折疊面板組件開發

2025-05-22 08:15:00

2022-04-25 07:36:21

組件數據函數
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品视频在线观看免费 | 中文字幕在线观看av | 国产精品1区2区 | 亚洲高清在线观看 | 久久综合伊人一区二区三 | 一区观看| 午夜精品一区二区三区免费视频 | 欧美bondage紧缚视频 | 免费在线看黄 | 视频三区| 午夜影院视频在线观看 | 男女免费观看在线爽爽爽视频 | av不卡一区 | 国产96色在线 | 亚洲欧美综合精品久久成人 | 99国产精品99久久久久久 | 一区二区三区精品在线视频 | 欧美一区二区三区在线播放 | 99久久精品国产麻豆演员表 | 精品久久久久久久久久久院品网 | 午夜免费在线 | 国产99热| 日韩中文字幕视频在线 | 精品亚洲永久免费精品 | www性色 | 日韩一区在线播放 | 九九视频在线观看 | av黄色国产| 在线观看成人av | 亚洲精品电影在线 | 成人夜晚看av | av一二三区| 99爱在线视频 | 午夜影院在线观看版 | 狠狠亚洲 | 久久久久久国产精品免费免费狐狸 | 日韩电影免费在线观看中文字幕 | 国产激情一区二区三区 | 欧美日韩国产一区二区三区 | 久久精品一区 | wwwxxx国产|