成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Websocket庫Ws原理分析

開發 前端
我們看到ws監聽了upgrade事件,當有websocket請求到來時就會執行handleUpgrade處理升級請求,升級成功后觸發connection事件。我們先看handleUpgrade。handleUpgrade邏輯不多,主要是處理和校驗升級請求的一些http頭。

 [[394780]]

前言:本文幾基于nodejs的ws模塊分析websocket的原理。

ws服務器邏輯由websocket-server.js的WebSocketServer類實現。該類初始化了一些參數后就執行以下代碼

  1. if (this._server) { 
  2.       // 給server注冊下面事件,返回一個注銷函數(用于注銷下面注冊的事件) 
  3.       this._removeListeners = addListeners(this._server, { 
  4.         // listen成功的回調 
  5.         listening: this.emit.bind(this, 'listening'), 
  6.         error: this.emit.bind(this, 'error'), 
  7.         // 收到協議升級請求的回調 
  8.         upgrade: (req, socket, head) => { 
  9.           this.handleUpgrade(req, socket, head, (ws) => { 
  10.             // 處理成功,觸發鏈接成功事件 
  11.             this.emit('connection', ws, req); 
  12.           }); 
  13.         } 
  14.       }); 

我們看到ws監聽了upgrade事件,當有websocket請求到來時就會執行handleUpgrade處理升級請求,升級成功后觸發connection事件。我們先看handleUpgrade。handleUpgrade邏輯不多,主要是處理和校驗升級請求的一些http頭。ws提供了一個校驗的鉤子。處理完http頭后,會調verifyClient校驗是否允許升級請求。如果成功則執行completeUpgrade。顧名思義,completeUpgrade是完成升級請求的函數,該函數返回同意協議升級并且設置一些http響應頭。另外還有一些重要的邏輯處理。

  1. const ws = new WebSocket(null); 
  2. // 設置管理socket的數據 
  3. ws.setSocket(socket, head, this.options.maxPayload); 
  4. // cb就是this.emit('connection', ws, req); 
  5. cb(ws); 

我們看到這里新建了一個WebSocket對象并且調用了他的setSocket函數。我們來看看他做了什么。setSocket的邏輯非常多,我們慢慢分析。

數據接收者

  1. class Receiver extends Writable {} 

我們看到數據接收者是一個可寫流。這就意味著我們可以往里面寫數據。

  1. const receiver = new Receiver(); 
  2. receiver.write('hello'); 

我們看一下這時候Receiver的邏輯。

  1. _write(chunk, encoding, cb) { 
  2.     if (this._opcode === 0x08 && this._state == GET_INFO) return cb(); 
  3.     this._bufferedBytes += chunk.length; 
  4.     this._buffers.push(chunk); 
  5.     this.startLoop(cb); 
  6.   } 

首先記錄當前數據的大小,然后把數據存起來,最后執行startLoop。

  1. startLoop(cb) { 
  2.     let err; 
  3.     this._loop = true
  4.  
  5.     do { 
  6.       switch (this._state) { 
  7.         // 忽略其他case 
  8.         case GET_DATA: 
  9.           err = this.getData(cb); 
  10.           break; 
  11.         default
  12.           // `INFLATING` 
  13.           this._loop = false
  14.           return
  15.       } 
  16.     } while (this._loop); 
  17.  
  18.     cb(err); 
  19.   } 

我們知道websocket是基于tcp上層的應用層協議,所以我們收到數據時,需要解析出一個個數據包(粘包問題),所以Receiver其實就是一個狀態機,每次收到數據的時候,都會根據當前的狀態進行狀態流轉。比如當前處于GET_DATA狀態,那么就會進行數據的處理。我們接著看一下數據處理的邏輯。

  1. getData(cb) { 
  2.     let data = EMPTY_BUFFER; 
  3.     // 提取數據部分 
  4.     if (this._payloadLength) { 
  5.       data = this.consume(this._payloadLength); 
  6.       if (this._masked) unmask(data, this._mask); 
  7.     } 
  8.     // 是控制報文則執行controlMessage 
  9.     if (this._opcode > 0x07) return this.controlMessage(data); 
  10.     // 做了壓縮,則先解壓 
  11.     if (this._compressed) { 
  12.       this._state = INFLATING; 
  13.       this.decompress(data, cb); 
  14.       return
  15.     } 
  16.     // 沒有壓縮則直接處理(先存到_fragments,然后執行dataMessage) 
  17.     if (data.length) { 
  18.       this._messageLength = this._totalPayloadLength; 
  19.       this._fragments.push(data); 
  20.     } 
  21.  
  22.     return this.dataMessage(); 
  23.   } 

我們執行websocket協議定義了報文的類型,比如控制報文,數據報文。我們分別看一下這兩個的邏輯。

  1. controlMessage(data) { 
  2.     // 連接關閉 
  3.     if (this._opcode === 0x08) { 
  4.       this._loop = false
  5.       if (data.length === 0) { 
  6.         this.emit('conclude', 1005, ''); 
  7.         this.end(); 
  8.       } 
  9.     } else if (this._opcode === 0x09) { 
  10.       this.emit('ping', data); 
  11.     } else { 
  12.       this.emit('pong', data); 
  13.     } 
  14.     this._state = GET_INFO; 
  15.   } 

我們看到控制報文包括三種(conclude、ping、pong)。而數據報文只有this.emit('message', data);一種。這個就是接收者的整體邏輯。

2 數據發送者

數據發送者是對websocket協議的封裝,當用戶調研數據發送者的send接口發送數據時,數據發送者會組裝成一個websocket協議的包再發送出去。

  1. send(data, options, cb) { 
  2.     const buf = toBuffer(data); 
  3.     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 
  4.     let opcode = options.binary ? 2 : 1; 
  5.     let rsv1 = options.compress; 
  6.  
  7.     if (this._firstFragment) { 
  8.       this._firstFragment = false
  9.       if (rsv1 && perMessageDeflate) { 
  10.         rsv1 = buf.length >= perMessageDeflate._threshold; 
  11.       } 
  12.       this._compress = rsv1; 
  13.     } else { 
  14.       rsv1 = false
  15.       opcode = 0; 
  16.     } 
  17.  
  18.     if (options.fin) this._firstFragment = true
  19.     // 需要壓縮 
  20.     if (perMessageDeflate) { 
  21.       const opts = { 
  22.         fin: options.fin, 
  23.         rsv1, 
  24.         opcode, 
  25.         mask: options.mask, 
  26.         readOnly: toBuffer.readOnly 
  27.       }; 
  28.       // 正在壓縮,則排隊等待,否則執行壓縮 
  29.       if (this._deflating) { 
  30.         this.enqueue([this.dispatch, buf, this._compress, opts, cb]); 
  31.       } else { 
  32.         this.dispatch(buf, this._compress, opts, cb); 
  33.       } 
  34.     } else { 
  35.       // 不需要壓縮,直接發送 
  36.       this.sendFrame( 
  37.         Sender.frame(buf, { 
  38.           fin: options.fin, 
  39.           rsv1: false
  40.           opcode, 
  41.           mask: options.mask, 
  42.           readOnly: toBuffer.readOnly 
  43.         }), 
  44.         cb 
  45.       ); 
  46.     } 
  47.   } 

send函數做了一些參數的處理后發送數據,但是如果需要壓縮的話,要壓縮后才能發送。數據處理完成后調用真正的發送函數

  1. sendFrame(list, cb) { 
  2.     if (list.length === 2) { 
  3.       this._socket.cork(); 
  4.       this._socket.write(list[0]); 
  5.       this._socket.write(list[1], cb); 
  6.       this._socket.uncork(); 
  7.     } else { 
  8.       this._socket.write(list[0], cb); 
  9.     } 
  10.   } 

了解了數據接收者和發送者的邏輯后,我們看一下websocket對象和setSocket函數做了什么事情,websocket對象本質是對TCP socket的封裝。它接收來自底層的數據,然后透傳給數據接收者,數據接收者處理完后,觸發websocket對應的對應的事件,比如message事件。發送數據的時候,websocket會調用數據發送者的接口,數據發送者組裝成websocket協議的數據包后再發送出去,架構如下圖所示。

接下來我們看看setSocket的邏輯

  1. setSocket(socket, head, maxPayload) { 
  2.     // 數據接收者,負責處理tcp上收到的數據(socket是tcp層的socket) 
  3.     const receiver = new Receiver(...); 
  4.     // 數據發送者,負責發送數據給對端 
  5.     this._sender = new Sender(socket, this._extensions); 
  6.     // 數據接收者,負責解析數據 
  7.     this._receiver = receiver; 
  8.     // net模塊的tcp socket 
  9.     this._socket = socket; 
  10.     // 關聯起來 
  11.     receiver[kWebSocket] = this; 
  12.     socket[kWebSocket] = this; 
  13.     // 監聽接收者的事件,解析數據的時候會回調 
  14.     receiver.on('conclude', receiverOnConclude); 
  15.     // 下面兩個事件由Writable觸發 
  16.     receiver.on('drain', receiverOnDrain); 
  17.     receiver.on('error', receiverOnError); 
  18.     receiver.on('message', receiverOnMessage); 
  19.     receiver.on('ping', receiverOnPing); 
  20.     receiver.on('pong', receiverOnPong); 
  21.     // 清除定時器 
  22.     socket.setTimeout(0); 
  23.     // 關閉nagle算法 
  24.     socket.setNoDelay(); 
  25.     // 升級請求中,攜帶的http body,通常是空 
  26.     if (head.length > 0) socket.unshift(head); 
  27.     // 監聽tcp底層的事件 
  28.     socket.on('close', socketOnClose); 
  29.     socket.on('data', socketOnData); 
  30.     socket.on('end', socketOnEnd); 
  31.     socket.on('error', socketOnError); 
  32.  
  33.     this.readyState = WebSocket.OPEN
  34.     this.emit('open'); 
  35.   } 

我們看到里面監聽了各種事件,下面以data事件為例,看一下處理過程。當tcp socket收到數據的時候會執行socketOnData函數。

  1. function socketOnData(chunk) { 
  2.   // 會調用receiver里的_write函數,其實就是換成到receiver對象上,如果數據解析出錯,會觸發socket error事件 
  3.   if (!this[kWebSocket]._receiver.write(chunk)) { 
  4.     this.pause(); 
  5.   } 

socketOnData通過接收者的接口把數據傳給接收者,接收者會解析數據,然后觸發對應的事件,比如message。

  1. receiver.on('message', receiverOnMessage); 
  2. function receiverOnMessage(data) { 
  3.   this[kWebSocket].emit('message', data); 

然后ws的socket對象繼續往上層觸發message事件。this[kWebSocket]的值是ws提供的socket對象本身。架構圖如下。

這就是ws實現websocket協議的基本原理,具體細節可以參考源碼。

責任編輯:武曉燕 來源: 編程雜技
相關推薦

2017-07-11 13:58:10

WebSocket

2021-04-21 07:52:39

核心SignalR應用

2010-04-14 14:23:26

2023-01-26 01:41:27

核心全局過濾器

2017-08-17 17:48:06

2024-01-11 08:53:58

2023-06-27 07:09:39

2009-06-14 17:19:09

ibmdwWebSphere

2012-09-18 14:23:54

2012-09-29 13:18:23

分布式數據庫Google Span

2020-10-13 07:35:22

JUC - Count

2023-04-26 08:39:41

Bitmap元素存儲

2023-11-28 08:49:01

短輪詢WebSocket長輪詢

2022-04-13 08:23:31

Golang并發

2021-10-12 17:19:17

Random局限性變量

2021-04-27 18:12:22

WebSocket持久化連接HTTP

2010-04-19 15:29:31

2012-12-03 16:57:37

HDFS

2015-06-15 10:12:36

Java原理分析

2021-08-09 11:15:28

MybatisJavaSpring
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品久久久一区二区三区 | 久久福利网站 | 欧美一区二区免费 | 亚洲 日本 欧美 中文幕 | 欧美日韩国产一区二区三区不卡 | 亚洲精品久久久 | 精品美女久久久 | 久久99久久久久 | 色婷婷综合久久久中字幕精品久久 | 国产国语精品 | 国产视频一二三区 | 欧美二区三区 | 久久精品色视频 | 精品久久久久久久久久 | 午夜影视| 欧美国产视频 | 亚洲日产精品 | 精品久久久久久国产 | 日本一卡精品视频免费 | 国产精品久久久99 | 二区三区av | 一级黄色片在线看 | 国产亚洲一区二区三区 | 久久精品欧美一区二区三区不卡 | 国产成人免费视频 | 99精品在线 | 久久国产精品72免费观看 | yiren22 亚洲综合 | 在线一区二区三区 | 色久影院| 91国内精品久久 | 日本在线视频不卡 | 国产传媒在线观看 | 欧美日韩中文字幕在线 | 色爱综合网 | 欧美aⅴ片| av片网站| 欧美亚洲日本 | 日韩五月天 | 午夜国产精品视频 | 国产精品久久久久婷婷二区次 |