注釋掉 on('data') 請求為什么一直掛著?— 了解 Node.js Stream 的兩種模式
這是來自「Nodejs技術棧」交流群一位讀者朋友提的一個問題,“如果注釋掉 req.on('data') 事件監聽,end 事件就收不到了,進而永遠也不會執行 res.end(),請求會被一直掛著,為什么?”。
如果你讀到這里,也可以先思考下這個問題!
- const http = require('http');
- http.createServer((req, res) => {
- let data = '';
- // req.on('data', chunk => {
- // data += chunk.toString();
- // });
- req.on('end', () => {
- res.end(data);
- });
- }).listen(3000);
Node.js 的可讀流對象提供了兩種模式:流動模式(flowing)、暫停模式(paused),如果你使用管道 pipe() 或異步迭代可能不會關注到這個問題,在它們的內部實現中已經處理好了,如果你是基于事件的 API 來處理流,可能會有這些疑問。
流動模式(flowing)
流動模式下數據自動從底層系統獲取,并通過 EventEmitter 提供的事件接口,盡可能快的提供給應用程序。需要注意的是所有的可讀流一開始都處于暫停模式,要切換為流動模式,可通過以下幾種方式實現:
一:注冊 'data' 事件
為可讀流對象注冊一個 'data' 事件,傳入事件處理函數,會把流切換為流動模式,在數據可用時會立即把數據塊傳送給注冊的事件處理函數。
這也是上面的疑問,為什么注釋掉 'data' 事件,請求就會一直被掛起。
- req.on('data', chunk => {
- data += chunk.toString();
- });
二:stream.pipe() 方法
調用 pipe() 方法將數據發送到可寫流。
- readable.pipe(writeable)
可讀流的 pipe() 方法實現中也是注冊了 'data' 事件,一邊讀取數據一邊寫入數據至可寫流。可以參見筆者之前的這篇文章 Node.js Stream 模塊 pipe 方法使用與實現原理分析。
- Readable.prototype.pipe = function(dest, options) {
- const src = this;
- src.on('data', ondata);
- function ondata(chunk) {
- const ret = dest.write(chunk);
- if (ret === false) {
- ...
- src.pause();
- }
- }
- ...
- };
三:stream.resume() 方法
stream.resume() 將處于暫停模式的可讀流,恢復觸發 'data' 事件,切換為流動模式。
對一開始的示例做一個改造,先調用 stream.resume() 用來耗盡流中的數據,但此時沒有做任何的數據處理,之后會收到 end 事件。
- const http = require('http');
- http.createServer((req, res) => {
- req.resume();
- req.on('end', () => {
- res.end('Ok!');
- });
- }).listen(3000);
四:異步迭代
無需注冊事件監聽函數,使用 for...await of 遍歷可讀流,寫法上也很簡單。下例,因為用到**頂級 await 特性,**需要在 ES Modules 規范中使用。
- // app.mjs
- import { createServer as server } from 'http';
- import { on } from 'events';
- const ee = on(server().listen(3000), 'request');
- for await (const [{ url }, res] of ee) {
- res.end('OK!');
- }
暫停模式
暫停模式也是流一開始時所處的模式,該模式下會觸發 'readable' 事件,表示流中有可讀取的數據,我們需要不斷調用 read() 方法拉取數據,直到返回 null,表示緩沖區中的數據已被耗盡,在 read() 返回 null 后,會再次觸發 'readable' 事件,表示仍有可讀取的數據,如果此時停止 read() 方法調用,同樣的請求也會被掛起。
stream.read(size) 方法從流緩沖區拉取數據,每次返回指定 size 大小的數據,如果不指定 size 則返回內部所有緩沖的數據。
- const http = require('http');
- http.createServer((req, res) => {
- let data = '';
- let chunk;
- req.on('readable', () => {
- while (null !== (chunk = req.read())) {
- data += chunk.toString();
- }
- })
- req.on('end', () => {
- res.end(data);
- });
- }).listen(3000);
背壓問題思考??
以流的形式從可讀流拉取數據到可寫流,通常**從磁盤讀取數據的速度比磁盤寫入的速度是快的,如果可寫流來不及消費數據造成數據積壓(專業術語會稱呼這個問題為 “背壓”)會怎么樣?**也是來自「Nodejs技術棧」交流群讀者朋友的疑問,可以思考下,答案可以寫在評論區,感興趣的關注下「Nodejs技術棧」下一次講解。
總結
流剛開始處于暫停模式,所以注釋掉 req.on('data') 事件監聽,請求才會一直掛起。在基于流的方式讀取文件時,之前通常使用注冊 'data' 事件處理函數的方式從可讀流中拉取數據,現在 Node.js 支持了異步迭代,更推薦你使用 for...await of 這種方式來讀取數據,代碼看起來也會更簡潔,同步編碼思維讓人也能更好的理解。
本文轉載自微信公眾號「Nodejs技術棧」,可以通過以下二維碼關注。轉載本文請聯系Nodejs技術棧公眾號。