Node.js Stream 背壓 — 消費端數據積壓來不及處理會怎么樣?
Stream 在 Node.js 中是一個被廣泛應用的模塊,流的兩端可讀流、可寫流之間通過管道鏈接,通常寫入磁盤速度是低于讀取磁盤速度的,這樣管道的兩端就會產生壓力差,就需要一種平衡的機制,使得平滑順暢的從一個端流向另一個端。
背壓是一個術語,表示向流中寫入數據的速度超過了它所能處理的最大能力限制。例如,基于 Stream 寫一個文件時,當寫入端處理不過來時,會通知到讀取端,你可以先等等,我這里忙不過來了...,等到一定時機后再次讀取寫入。
問題來源
“數據是以流的形式從可讀流流向可寫流的,不會全部讀入內存,我想說的是上游流速過快下游來不及消費造成數據積壓 即“背壓” 問題會怎樣” 這個問題來自于「Nodejs技術棧-交流群」一位朋友的疑問,當時沒有給出答案,沒有做過類似的實際數據測試,出現這種情況一般都會導致數據流兩端不平衡,另一端數據不斷積壓,持續消耗系統內存,其它服務也必然受到影響。
本文,通過修改編譯 Node.js 源碼,在禁用掉 “背壓” 之后,做了一些測試,可以明顯看到兩者之間的一個效果對比。
流數據讀取->寫入示例
先構造一個大文件,我在本地創建了一個 2.2GB 大小的文件,通過大文件能夠顯著看到處理積壓與不處理積壓之間的差別。
下面例子實現的功能是讀取文件、經過 gzip 壓縮處理之后寫入到一個新的目標文件,也可寫成 readable.pipe(gzip).pipe(writable) 不過這樣沒有任何的錯誤處理機制,可借助一些工具 https://github.com/mafintosh/pump 處理。
對于處理這樣的任務,Stream 模塊還提供了一個實用的方法 pipeline,管道中可以處理不同的數據流,當其中某個數據流發生錯誤,它會自動處理并釋放掉相應的資源。
- // stream-back-pressure-test.js
- const gzip = require('zlib').createGzip();
- const fs = require('fs');
- const { pipeline } = require('stream/promises');
- const readable = fs.createReadStream('2.2GB-file.zip');
- const writable = fs.createWriteStream('2.2GB-file.zip.gz');
- (async () => {
- try {
- await pipeline(
- readable,
- gzip,
- writable
- );
- console.log('Pipeline succeeded.');
- } catch (err) {
- console.error('Pipeline failed.', err);
- }
- })();
write() 源碼修改與編譯
write(chunk) 方法介紹
可寫流對象的 write(chunk) 方法接收一些數據寫入流,當內部緩沖區小于創建可寫流對象時配置的 highWaterMark 則返回 true,否則返回 false 表示內部緩沖區已滿或溢出,此時就是背壓的一種表現。
向流寫入數據的速度已超出了其能處理的能力,若此時還是不斷調用 write() 方法,可以想象內部的緩沖區也會不斷增加,當前進程占用的系統內存就會不斷增加。
當使用 pipe() 或 pipeline 在內部處理時,還是調用的 stream.write(chunk) 方法。
- stream.write(chunk)
如果要測試數據積壓帶來的一些消耗問題,我們需要修改 Node.js 源碼,將 stream.write(chunk) 方法的返回值改為 true 禁止積壓處理。
源碼修改
我直接拉取的 Master 代碼,剛開始忘記切換 Node.js 版本...,各版本大同小異,大致差不多,主要是找到 Writable.prototype.write() 方法,該方法最終的返回值是一個布爾值,找到 return ret && !state.errored && !state.destroyed 直接改為 return true; 禁用掉背壓處理。
- // https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L334
- Writable.prototype.write = function(chunk, encoding, cb) {
- return _write(this, chunk, encoding, cb) === true;
- };
- // https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L396
- // If we're already writing something, then just put this
- // in the queue, and wait our turn. Otherwise, call _write
- // If we return false, then we need a drain event, so set that flag.
- function writeOrBuffer(stream, state, chunk, encoding, callback) {
- ...
- // stream._write resets state.length
- const ret = state.length < state.highWaterMark;
- ...
- // Return false if errored or destroyed in order to break
- // any synchronous while(stream.write(data)) loops.
- // return ret && !state.errored && !state.destroyed;
- return true;
- }
編譯
源碼編譯對電腦的環境有一些要求,參考 Node.js 給出的這份文檔 Building Node.js。
先執行 ./configure 生成當前環境編譯需要的默認配置,然后執行 make 命令編譯,第一次編譯時間有點略長,差不多夠吃個飯了...
- $ ./configure
- $ make -j4
之后每次修改后也還需要重新編譯,為了方便起見,在當前目錄下創建一個 shell 腳本文件。
- 創建腳本文件 vim compile.sh 輸入以下內容。
- 使腳本具有可執行權限 chmod +x ./test.sh。
- 運行腳本編譯 sh compile.sh。
- #!/bin/bash
- ./configure --debug
- make -j4
- echo "Compiled successfully"
編譯成功后,最后幾行日志輸出如下所示,當前目錄下會生成一個 node 的可執行命令,或者 out/Release/node 也可執行。
- if [ ! -r node ] || [ ! -L node ]; then \
- ln -fs out/Release/node node; fi
現在可以在當前目錄下創建一個測試文件,用剛剛編譯好的 node 運行。
- ./node ./test.js
內存消耗測試
再推薦一個 Linux 命令 /usr/bin/time,能夠測量命令的使用時間并給出系統資源的消耗情況。可以參考這篇文章介紹 http://c.biancheng.net/linux/time.html。
沒有處理積壓的測試結果
運行命令 sudo /usr/bin/time -lp ./node ./stream-back-pressure-test.js 測試沒有積壓處理的情況。
980713472 是執行程序所占用內存的最大值,大約消耗 0.9GB。
- real 188.25
- user 179.72
- sys 28.77
- 980713472 maximum resident set size
- 0 average shared memory size
- 0 average unshared data size
- 0 average unshared stack size
- 3348430 page reclaims
- 3864 page faults
- 0 swaps
- 0 block input operations
- 3 block output operations
- 0 messages sent
- 0 messages received
- 0 signals received
- 21341 voluntary context switches
- 2934500 involuntary context switches
如果是 Mac 電腦,同時打開活動監視器也能看到程序處理過程中的一些內存消耗信息,可以看到內存的占用還是很高的,另外我的電腦上的其它服務也受到了影響,一些應用變得異??D。
image.png
正常程序積壓處理的測試結果
59215872 是執行程序所占用內存的最大值,大約消耗 56 MB。
- real 184.67
- user 176.22
- sys 20.68
- 59215872 maximum resident set size
- 0 average shared memory size
- 0 average unshared data size
- 0 average unshared stack size
- 1486628 page reclaims
- 3971 page faults
- 0 swaps
- 0 block input operations
- 0 block output operations
- 0 messages sent
- 0 messages received
- 1 signals received
- 4843 voluntary context switches
- 2551476 involuntary context switches
通過 Mac 活動監視器看到內存的占用,是沒什么壓力的,電腦上其它服務此時也沒受到影響。
為什么背壓我沒聽說過?
經過上面的測試,可以看到沒有正確處理積壓的結果和正常的經過處理的存在極大的差別,但是你可能又有疑問:“為什么我沒有聽說過背壓?也沒遇到過類似問題?”。
這是因為 Node.js 的 Stream 模塊提供的一些方法 pipe()、pipeline() 已經為我們做了這些處理,使用了這些 API 方法我們是不需要自己考慮去處理 “背壓” 這一問題的**。因為一旦緩沖區中的數據超過了 highWaterMark 限制,可寫流的 write() 方法就會返回 false,處理數據積壓的這一機制也會被觸發。
如果你直接使用的 write() 方法寫入數據,而沒有正確的處理背壓,就要小心了,如果有攻擊者多次發起請求,也會導致你的進程不斷的消耗服務器系統內存,從而會拖垮服務器上的其它應用。
總結
可寫流在消費數據時,內部有一個緩沖區,一旦緩沖區的數據滿了之后,也沒做任何 “背壓” 處理,會導致緩沖區數據溢出,后面來不及消費的數據不得不駐留在內存中,直到程序處理完畢,才會被清除。整個數據積壓的過程中當前進程會不斷的消耗系統內存,對其它進程任務也會產生很大的影響。
最后,留一個問題:“如何用 Node.js 實現從可讀流到可寫流的數據復制?類似于 pipe()”,實現過程要考慮 “背壓” 處理,最好是基于 Promise 方便之后使用 Async/Await 來使用,做一點提示可以考慮結合異步迭代器實現,歡迎在留言討論,下一節揭曉這個問題。
本文轉載自微信公眾號「Nodejs技術?!?,可以通過以下二維碼關注。轉載本文請聯系Nodejs技術棧公眾號。