Node.js中的stream模塊詳解
什么是stream
定義
流的英文stream,流(Stream)是一個抽象的數據接口,Node.js中很多對象都實現了流,流是EventEmitter對象的一個實例,總之它是會冒數據(以 Buffer 為單位),或者能夠吸收數據的東西,它的本質就是讓數據流動起來。可能看一張圖會更直觀:
水桶管道流轉圖
注意:stream不是node.js獨有的概念,而是一個操作系統最基本的操作方式,只不過node.js有API支持這種操作方式。linux命令的|就是stream。
為什么要學習stream
視頻播放例子
小伙伴們肯定都在線看過電影,對比定義中的圖-水桶管道流轉圖,source就是服務器端的視頻,dest就是你自己的播放器(或者瀏覽器中的flash和h5 video)。大家想一下,看電影的方式就如同上面的圖管道換水一樣,一點點從服務端將視頻流動到本地播放器,一邊流動一邊播放,最后流動完了也就播放完了。
說明:視頻播放的這個例子,如果我們不使用管道和流動的方式,直接先從服務端加載完視頻文件,然后再播放。會造成很多問題
- 因內存占有太多而導致系統卡頓或者崩潰
- 因為我們的網速 內存 cpu運算速度都是有限的,而且還要有多個程序共享使用,一個視頻文件加載完可能有幾個g那么大。
讀取大文件data的例子
有一個這樣的需求,想要讀取大文件data的例子
使用文件讀取
- const http = require('http');
- const fs = require('fs');
- const path = require('path');
- const server = http.createServer(function (req, res) {
- const fileName = path.resolve(__dirname, 'data.txt');
- fs.readFile(fileName, function (err, data) {
- res.end(data);
- });
- });
- server.listen(8000);
使用文件讀取這段代碼語法上并沒有什么問題,但是如果data.txt文件非常大的話,到了幾百M,在響應大量用戶并發請求的時候,程序可能會消耗大量的內存,這樣可能造成用戶連接緩慢的問題。而且并發請求過大的話,服務器內存開銷也會很大。這時候我們來看一下用stream實現。
- const http = require('http');
- const fs = require('fs');
- const path = require('path');
- const server = http.createServer(function (req, res) {
- const fileName = path.resolve(__dirname, 'data.txt');
- let stream = fs.createReadStream(fileName); // 這一行有改動
- stream.pipe(res); // 這一行有改動
- });
- server.listen(8000);
使用stream就可以不需要把文件全部讀取了再返回,而是一邊讀取一邊返回,數據通過管道流動給客戶端,真的減輕了服務器的壓力。
看了兩個例子我想小伙伴們應該知道為什么要使用stream了吧!因為一次性讀取,操作大文件,內存和網絡是吃不消的,因此要讓數據流動起來,一點點的進行操作。
stream流轉過程
再次看這張水桶管道流轉圖
圖中可以看出,stream整個流轉過程包括source,dest,還有連接二者的管道pipe(stream的核心),分別介紹三者來帶領大家搞懂stream流轉過程。
stream從哪里來-soucre
stream的常見來源方式有三種:
- 從控制臺輸入
- http請求中的request
- 讀取文件
這里先說一下從控制臺輸入這種方式,2和3兩種方式stream應用場景章節會有詳細的講解。
看一段process.stdin的代碼
- process.stdin.on('data', function (chunk) {
- console.log('stream by stdin', chunk)
- console.log('stream by stdin', chunk.toString())
- })
- //控制臺輸入koalakoala后輸出結果
- stream by stdin <Buffer 6b 6f 616c 616b 6f 616c 610a>
- stream by stdin koalakoala
運行上面代碼:然后從控制臺輸入任何內容都會被data 事件監聽到,process.stdin就是一個stream對象,data 是stream對象用來監聽數據傳入的一個自定義函數,通過輸出結果可看出process.stdin是一個stream對象。
說明:stream對象可以監聽"data","end","opne","close","error"等事件。node.js中監聽自定義事件使用.on方法,例如process.stdin.on(‘data’,…), req.on(‘data’,…),通過這種方式,能很直觀的監聽到stream數據的傳入和結束
連接水桶的管道-pipe
從水桶管道流轉圖中可以看到,在source和dest之間有一個連接的管道pipe,它的基本語法是source.pipe(dest),source和dest就是通過pipe連接,讓數據從source流向了dest。
stream到哪里去-dest
stream的常見輸出方式有三種:
- 輸出控制臺
- http請求中的response
- 寫入文件
stream應用場景
stream的應用場景主要就是處理IO操作,而http請求和文件操作都屬于IO操作。這里再提一下stream的本質——由于一次性IO操作過大,硬件開銷太多,影響軟件運行效率,因此將IO分批分段進行操作,讓數據像水管一樣流動起來,直到流動完成,也就是操作完成。下面對幾個常用的應用場景分別進行介紹
介紹一個壓力測試的小工具
一個對網絡請求做壓力測試的工具ab,ab 全稱 Apache bench ,是 Apache 自帶的一個工具,因此使用 ab 必須要安裝 Apache 。mac os 系統自帶 Apache ,windows用戶視自己的情況進行安裝。運行ab 之前先啟動 Apache ,mac os 啟動方式是 sudo apachectl start 。
Apache bench對應參數的詳細學習地址,有興趣的可以看一下Apache bench對應參數的詳細學習地址
介紹這個小工具的目的是對下面幾個場景可以進行直觀的測試,看出使用stream帶來了哪些性能的提升。
get請求中應用stream
這樣一個需求:
使用node.js實現一個http請求,讀取data.txt文件,創建一個服務,監聽8000端口,讀取文件后返回給客戶端,講get請求的時候用一個常規文件讀取與其做對比,請看下面的例子。
- 常規使用文件讀取返回給客戶端response例子 ,文件命名為getTest1.js
- // getTest.js
- const http = require('http');
- const fs = require('fs');
- const path = require('path');
- const server = http.createServer(function (req, res) {
- const method = req.method; // 獲取請求方法
- if (method === 'GET') { // get 請求方法判斷
- const fileName = path.resolve(__dirname, 'data.txt');
- fs.readFile(fileName, function (err, data) {
- res.end(data);
- });
- }
- });
- server.listen(8000);
- 使用stream返回給客戶端response 將上面代碼做部分修改,文件命名為getTest2.js
- // getTest2.js
- // 主要展示改動的部分
- const server = http.createServer(function (req, res) {
- const method = req.method; // 獲取請求方法
- if (method === 'GET') { // get 請求
- const fileName = path.resolve(__dirname, 'data.txt');
- let stream = fs.createReadStream(fileName);
- stream.pipe(res); // 將 res 作為 stream 的 dest
- }
- });
- server.listen(8000);
對于下面get請求中使用stream的例子,會不會有些小伙伴提出質疑,難道response也是一個stream對象,是的沒錯,對于那張水桶管道流轉圖,response就是一個dest。
雖然get請求中可以使用stream,但是相比直接file文件讀取·res.end(data)有什么好處呢?這時候我們剛才推薦的壓力測試小工具就用到了。getTest1和getTest2兩段代碼,將data.txt內容增加大一些,使用ab工具進行測試,運行命令ab -n 100 -c 100 http://localhost:8000/,其中-n 100表示先后發送100次請求,-c 100表示一次性發送的請求數目為100個。對比結果分析使用stream后,有非常大的性能提升,小伙伴們可以自己實際操作看一下。
post中使用stream
一個通過post請求微信小程序的地址生成二維碼的需求。
- /*
- * 微信生成二維碼接口
- * params src 微信url / 其他圖片請求鏈接
- * params localFilePath: 本地路徑
- * params data: 微信請求參數
- * */
- const downloadFile=async (src, localFilePath, data)=> {
- try{
- const ws = fs.createWriteStream(localFilePath);
- returnnewPromise((resolve, reject) => {
- ws.on('finish', () => {
- resolve(localFilePath);
- });
- if (data) {
- request({
- method: 'POST',
- uri: src,
- json: true,
- body: data
- }).pipe(ws);
- } else {
- request(src).pipe(ws);
- }
- });
- }catch (e){
- logger.error('wxdownloadFile error: ',e);
- throw e;
- }
- }
看這段使用了stream的代碼,為本地文件對應的路徑創建一個stream對象,然后直接.pipe(ws),將post請求的數據流轉到這個本地文件中,這種stream的應用在node后端開發過程中還是比較常用的。
post與get使用stream總結
request和reponse一樣,都是stream對象,可以使用stream的特性,二者的區別在于,我們再看一下水桶管道流轉圖,
request是source類型,是圖中的源頭,而response是dest類型,是圖中的目的地。
在文件操作中使用stream
一個文件拷貝的例子
- const fs = require('fs')
- const path = require('path')
- // 兩個文件名
- const fileName1 = path.resolve(__dirname, 'data.txt')
- const fileName2 = path.resolve(__dirname, 'data-bak.txt')
- // 讀取文件的 stream 對象
- const readStream = fs.createReadStream(fileName1)
- // 寫入文件的 stream 對象
- const writeStream = fs.createWriteStream(fileName2)
- // 通過 pipe執行拷貝,數據流轉
- readStream.pipe(writeStream)
- // 數據讀取完成監聽,即拷貝完成
- readStream.on('end', function () {
- console.log('拷貝完成')
- })
看了這段代碼,發現是不是拷貝好像很簡單,創建一個可讀數據流readStream,一個可寫數據流writeStream,然后直接通過pipe管道把數據流轉過去。這種使用stream的拷貝相比存文件的讀寫實現拷貝,性能要增加很多,所以小伙伴們在遇到文件操作的需求的時候,盡量先評估一下是否需要使用stream實現。
前端一些打包工具的底層實現
目前一些比較火的前端打包構建工具,都是通過node.js編寫的,打包和構建的過程肯定是文件頻繁操作的過程,離不來stream,例如現在比較火的gulp,有興趣的小伙伴可以去看一下源碼。
stream的種類
- Readable Stream 可讀數據流
- Writeable Stream 可寫數據流
- Duplex Stream 雙向數據流,可以同時讀和寫
- Transform Stream 轉換數據流,可讀可寫,同時可以轉換(處理)數據(不常用)
之前的文章都是圍繞前兩種可讀數據流和可寫數據流,第四種流不太常用,需要的小伙伴網上搜索一下,接下來對第三種數據流Duplex Stream 說明一下。
Duplex Stream 雙向的,既可讀,又可寫。Duplex streams同時實現了 Readable和Writable 接口。Duplex streams的例子包括
- tcp sockets
- zlib streams
- crypto streams我在項目中還未使用過雙工流,一些Duplex Stream的內容可以參考這篇文章NodeJS Stream 雙工流
stream有什么弊端
- 用 rs.pipe(ws) 的方式來寫文件并不是把 rs 的內容 append 到 ws 后面,而是直接用 rs 的內容覆蓋 ws 原有的內容
- 已結束/關閉的流不能重復使用,必須重新創建數據流
- pipe 方法返回的是目標數據流,如 a.pipe(b) 返回的是 b,因此監聽事件的時候請注意你監聽的對象是否正確
- 如果你要監聽多個數據流,同時你又使用了 pipe 方法來串聯數據流的話,你就要寫成:代碼實例:
- data
- .on('end', function() {
- console.log('data end');
- })
- .pipe(a)
- .on('end', function() {
- console.log('a end');
- })
- .pipe(b)
- .on('end', function() {
- console.log('b end');
- });
stream的常見類庫
- event-stream 用起來有函數式編程的感覺
- awesome-nodejs#streams也是一個不錯的第三方stream庫,有興趣的小伙伴可以github看一下
總結
本篇文章屬于進階路線【Node必知必會系列】,看完了這篇文章是不是對stream有了一定的了解,并且知道了node對于文件處理還是有完美的解決方案的。本文中三次展示了水桶管道流轉圖,總要的事情說三遍希望小伙伴們記住它,除了以上內容小伙伴們會不會有一些思考,比如
- stream數據流轉具體內容是什么呢?二進制還是string類型還是其他類型,該類型為stream帶來了什么好處?
- 水桶管道流轉圖中的水管,也就是pipe函數什么時候觸發的呢?在什么情況下觸流轉發?底層機制是什么?上面的疑問(由于篇幅過長拆分為兩篇)會在我stream的第二篇文章為大家詳細講解