成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

徹底掌握 Node.js 四大流,解決爆緩沖區的“背壓”問題

開發 前端
各種語言基本都實現了 stream 的 api,Node.js 也是,stream api 是比較常用的,下面我們就來探究一下 stream。

[[419477]]

把一個東西從 A 搬到 B 該怎么搬呢?

抬起來,移動到目的地,放下不就行了么。

那如果這個東西有一噸重呢?

那就一部分一部分的搬。

其實 IO 也就是搬東西,包括網絡的 IO、文件的 IO,如果數據量少,那么直接傳送全部內容就行了,但如果內容特別多,一次性加載到內存會崩潰,而且速度也慢,這時候就可以一部分一部分的處理,這就是流的思想。

各種語言基本都實現了 stream 的 api,Node.js 也是,stream api 是比較常用的,下面我們就來探究一下 stream。

本文會回答以下問題:

  • Node.js 的 4 種 stream 是什么
  • 生成器如何與 Readable Stream 結合
  • stream 的暫停和流動
  • 什么是背壓問題,如何解決

Node.js 的 4種 stream

流的直觀感受

從一個地方流到另一個地方,顯然有流出的一方和流入的一方,流出的一方就是可讀流(readable),而流入的一方就是可寫流(writable)。

當然,也有的流既可以流入又可以流出,這種叫做雙工流(duplex)

既然可以流入又可以流出,那么是不是可以對流入的內容做下轉換再流出呢,這種流叫做轉換流(transform)

duplex 流的流入和流出內容不需要相關,而 transform 流的流入和流出是相關的,這是兩者的區別。

流的 api

Node.js 提供的 stream 就是上面介紹的那 4 種:

  1. const stream = require('stream'); 
  2.  
  3. // 可讀流 
  4. const Readable = stream.Readable; 
  5. // 可寫流 
  6. const Writable = stream.Writable; 
  7. // 雙工流 
  8. const Duplex = stream.Duplex; 
  9. // 轉換流 
  10. const Transform = stream.Transform; 

它們都有要實現的方法:

  • Readable 需要實現 _read 方法來返回內容
  • Writable 需要實現 _write 方法來接受內容
  • Duplex 需要實現 _read 和 _write 方法來接受和返回內容
  • Transform 需要實現 _transform 方法來把接受的內容轉換之后返回

我們分別來看一下:

Readable

Readable 要實現 _read 方法,通過 push 返回具體的數據。

  1. const Stream = require('stream'); 
  2.  
  3. const readableStream = Stream.Readable(); 
  4.  
  5. readableStream._read = function() { 
  6.     this.push('阿門阿前一棵葡萄樹,'); 
  7.     this.push('阿東阿東綠的剛發芽,'); 
  8.     this.push('阿東背著那重重的的殼呀,'); 
  9.     this.push('一步一步地往上爬。'
  10.     this.push(null); 
  11.  
  12. readableStream.on('data', (data)=> { 
  13.     console.log(data.toString()) 
  14. }); 
  15.  
  16. readableStream.on('end', () => { 
  17.     console.log('done~'); 
  18. }); 

當 push 一個 null 時,就代表結束流。

執行效果如下:

創建 Readable 也可以通過繼承的方式:

  1. const Stream = require('stream'); 
  2.  
  3. class ReadableDong extends Stream.Readable { 
  4.  
  5.     constructor() { 
  6.         super(); 
  7.     } 
  8.  
  9.     _read() { 
  10.         this.push('阿門阿前一棵葡萄樹,'); 
  11.         this.push('阿東阿東綠的剛發芽,'); 
  12.         this.push('阿東背著那重重的的殼呀,'); 
  13.         this.push('一步一步地往上爬。'
  14.         this.push(null); 
  15.     } 
  16.  
  17.  
  18. const readableStream = new ReadableDong(); 
  19.  
  20. readableStream.on('data', (data)=> { 
  21.     console.log(data.toString()) 
  22. }); 
  23.  
  24. readableStream.on('end', () => { 
  25.     console.log('done~'); 
  26. }); 

可讀流是生成內容的,那么很自然可以和生成器結合:

  1. const Stream = require('stream'); 
  2.  
  3. class ReadableDong extends Stream.Readable { 
  4.  
  5.     constructor(iterator) { 
  6.         super(); 
  7.         this.iterator = iterator; 
  8.     } 
  9.  
  10.     _read() { 
  11.         const next = this.iterator.next(); 
  12.         if(next.done) { 
  13.             return this.push(null); 
  14.         } else { 
  15.             this.push(next.value) 
  16.         } 
  17.     } 
  18.  
  19.  
  20. function *songGenerator() { 
  21.     yield '阿門阿前一棵葡萄樹,'
  22.     yield '阿東阿東綠的剛發芽,'
  23.     yield '阿東背著那重重的的殼呀,'
  24.     yield '一步一步地往上爬。'
  25.  
  26. const songIterator = songGenerator(); 
  27.  
  28. const readableStream = new ReadableDong(songIterator); 
  29.  
  30. readableStream.on('data', (data)=> { 
  31.     console.log(data.toString()) 
  32. }); 
  33.  
  34. readableStream.on('end', () => { 
  35.     console.log('done~'); 
  36. }); 

這就是可讀流,通過實現 _read 方法來返回內容。

Writable

Writable 要實現 _write 方法,接收寫入的內容。

  1. const Stream = require('stream'); 
  2.  
  3. const writableStream = Stream.Writable(); 
  4.  
  5. writableStream._write = function (data, enc, next) { 
  6.    console.log(data.toString()); 
  7.    // 每秒寫一次 
  8.    setTimeout(() => { 
  9.        next(); 
  10.    }, 1000); 
  11.  
  12. writableStream.on('finish', () => console.log('done~')); 
  13.  
  14. writableStream.write('阿門阿前一棵葡萄樹,'); 
  15. writableStream.write('阿東阿東綠的剛發芽,'); 
  16. writableStream.write('阿東背著那重重的的殼呀,'); 
  17. writableStream.write('一步一步地往上爬。'); 
  18. writableStream.end(); 

接收寫入的內容,打印出來,并且調用 next 來處理下一個寫入的內容,這里調用 next 是異步的,可以控制頻率。

跑了一下,確實可以正常的處理寫入的內容:

這就是可寫流,通過實現 _write 方法來處理寫入的內容。

Duplex

Duplex 是可讀可寫,同時實現 _read 和 _write 就可以了

  1. const Stream = require('stream'); 
  2.  
  3. var duplexStream = Stream.Duplex(); 
  4.  
  5. duplexStream._read = function () { 
  6.     this.push('阿門阿前一棵葡萄樹,'); 
  7.     this.push('阿東阿東綠的剛發芽,'); 
  8.     this.push('阿東背著那重重的的殼呀,'); 
  9.     this.push('一步一步地往上爬。'
  10.     this.push(null); 
  11.  
  12. duplexStream._write = function (data, enc, next) { 
  13.     console.log(data.toString()); 
  14.     next(); 
  15.  
  16. duplexStream.on('data', data => console.log(data.toString())); 
  17. duplexStream.on('end', data => console.log('read done~')); 
  18.  
  19. duplexStream.write('阿門阿前一棵葡萄樹,'); 
  20. duplexStream.write('阿東阿東綠的剛發芽,'); 
  21. duplexStream.write('阿東背著那重重的的殼呀,'); 
  22. duplexStream.write('一步一步地往上爬。'); 
  23. duplexStream.end(); 
  24.  
  25. duplexStream.on('finish', data => console.log('write done~')); 

整合了 Readable 流和 Writable 流的功能,這就是雙工流 Duplex。

Transform

Duplex 流雖然可讀可寫,但是兩者之間沒啥關聯,而有的時候需要對流入的內容做轉換之后流出,這時候就需要轉換流 Transform。

Transform 流要實現 _transform 的 api,我們實現下對內容做反轉的轉換流:

  1. const Stream = require('stream'); 
  2.  
  3. class TransformReverse extends Stream.Transform { 
  4.  
  5.   constructor() { 
  6.     super() 
  7.   } 
  8.  
  9.   _transform(buf, enc, next) { 
  10.     const res = buf.toString().split('').reverse().join(''); 
  11.     this.push(res) 
  12.     next() 
  13.   } 
  14.  
  15. var transformStream = new TransformReverse(); 
  16.  
  17. transformStream.on('data', data => console.log(data.toString())) 
  18. transformStream.on('end', data => console.log('read done~')); 
  19.  
  20. transformStream.write('阿門阿前一棵葡萄樹'); 
  21. transformStream.write('阿東阿東綠的剛發芽'); 
  22. transformStream.write('阿東背著那重重的的殼呀'); 
  23. transformStream.write('一步一步地往上爬'); 
  24. transformStream.end() 
  25.  
  26. transformStream.on('finish', data => console.log('write done~')); 

跑了一下,效果如下:

流的暫停和流動

我們從 Readable 流中獲取內容,然后流入 Writable 流,兩邊分別做 _read 和 _write 的實現,就實現了流動。

背壓

但是 read 和 write 都是異步的,如果兩者速率不一致呢?

如果 Readable 讀入數據的速率大于 Writable 寫入速度的速率,這樣就會積累一些數據在緩沖區,如果緩沖的數據過多,就會爆掉,會丟失數據。

而如果 Readable 讀入數據的速率小于 Writable 寫入速度的速率呢?那沒關系,最多就是中間有段空閑時期。

這種讀入速率大于寫入速率的現象叫做“背壓”,或者“負壓”。也很好理解,寫入段壓力比較大,寫不進去了,會爆緩沖區,導致數據丟失。

這個緩沖區大小可以通過 readableHighWaterMark 和 writableHightWaterMark 來查看,是 16k。

解決背壓

怎么解決這種讀寫速率不一致的問題呢?

當沒寫完的時候,暫停讀就行了。這樣就不會讀入的數據越來越多,駐留在緩沖區。

readable stream 有個 readableFlowing 的屬性,代表是否自動讀入數據,默認為 true,也就是自動讀入數據,然后監聽 data 事件就可以拿到了。

當 readableFlowing 設置為 false 就不會自動讀了,需要手動通過 read 來讀入。

  1. readableStream.readableFlowing = false
  2.  
  3. let data; 
  4. while((data = readableStream.read()) != null) { 
  5.     console.log(data.toString()); 

但自己手動 read 比較麻煩,我們依然可以用自動流入的方式,調用 pause 和 resume 來暫停和恢復就行了。

當調用 writable stream 的 write 方法的時候會返回一個 boolean 值代表是寫入了目標還是放在了緩沖區:

  • true: 數據已經寫入目標
  • false:目標不可寫入,暫時放在緩沖區

我們可以判斷返回 false 的時候就 pause,然后等緩沖區清空了就 resume:

  1. const rs = fs.createReadStream(src); 
  2. const ws = fs.createWriteStream(dst); 
  3.  
  4. rs.on('data'function (chunk) { 
  5.     if (ws.write(chunk) === false) { 
  6.         rs.pause(); 
  7.     } 
  8. }); 
  9.  
  10. rs.on('end'function () { 
  11.     ws.end(); 
  12. }); 
  13.  
  14. ws.on('drain'function () { 
  15.     rs.resume(); 
  16. }); 

這樣就能達到根據寫入速率暫停和恢復讀入速率的功能,解決了背壓問題。

pipe 有背壓問題么?

平時我們經常會用 pipe 來直接把 Readable 流對接到 Writable 流,但是好像也沒遇到過背壓問題,其實是 pipe 內部已經做了讀入速率的動態調節了。

  1. const rs = fs.createReadStream(src); 
  2. const ws = fs.createWriteStream(dst); 
  3.  
  4. rs.pipe(ws); 

總結

流是傳輸數據時常見的思想,就是一部分一部分的傳輸內容,是文件讀寫、網絡通信的基礎概念。

Node.js 也提供了 stream 的 api,包括 Readable 可讀流、Writable 可寫流、Duplex 雙工流、Transform 轉換流。它們分別實現 _read、_write、_read + _write、_transform 方法,來做數據的返回和處理。

創建 Readable 對象既可以直接調用 Readable api 創建,然后重寫 _read 方法,也可以繼承 Readable 實現一個子類,之后實例化。其他流同理。(Readable 可以很容易的和 generator 結合)

當讀入的速率大于寫入速率的時候就會出現“背壓”現象,會爆緩沖區導致數據丟失,解決的方式是根據 write 的速率來動態 pause 和 resume 可讀流的速率。pipe 就沒有這個問題,因為內部做了處理。

流是掌握 IO 繞不過去的一個概念,而背壓問題也是流很常見的問題,遇到了數據丟失可以考慮是否發生了背壓。希望這篇文章能夠幫大家理清思路,真正掌握 stream!

 

責任編輯:武曉燕 來源: 神光的編程秘籍
相關推薦

2020-11-02 11:40:24

Node.jsRequire前端

2020-05-29 15:33:28

Node.js框架JavaScript

2011-12-14 16:30:42

javanio

2019-02-27 13:58:29

漏洞緩沖區溢出系統安全

2017-01-09 17:03:34

2009-06-16 15:33:13

AJAX框架jQueryExt JS

2021-08-10 07:27:42

數據積壓Node

2022-08-16 12:03:40

網絡安全硬件的安全

2009-09-24 18:16:40

2022-01-02 06:55:08

Node.js ObjectWrapAddon

2017-07-04 17:09:10

Map環形緩沖區數據

2014-07-30 11:21:46

2018-01-26 14:52:43

2009-11-16 17:26:17

Oracle優化緩沖區

2009-11-16 17:08:59

Oracle日志緩沖區

2018-11-07 13:00:30

機器學習深度學習集成學習

2020-04-20 16:00:05

Node.js框架JavaScript

2010-05-14 11:38:24

虛擬機備份

2020-05-17 20:38:40

機器物聯網物聯網IOT

2011-07-20 10:54:14

C++
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 天堂久久av | 亚洲精品电影网在线观看 | 久久狠狠 | 日韩精品一区二区三区 | 成人亚洲视频 | 国产91久久久久蜜臀青青天草二 | 91正在播放 | 久久久久久久久久久久91 | 九九伊人sl水蜜桃色推荐 | 国产精品7777777 | 蜜桃视频在线观看免费视频网站www | 亚洲精品高清视频 | 欧美午夜一区二区三区免费大片 | 亚洲高清av | 国产精品性做久久久久久 | 亚洲成a| 国产精品久久久久久久久久了 | 福利视频二区 | 日韩国产中文字幕 | 日韩精品a在线观看图片 | 亚洲 成人 在线 | 91av视频在线观看 | 成人国产精品久久久 | 一区二区三区四区国产精品 | 韩国主播午夜大尺度福利 | 国产成人99久久亚洲综合精品 | 欧州一区二区 | 久久久久国产一级毛片高清网站 | 欧美群妇大交群中文字幕 | 精品在线一区 | 日日干天天干 | 免费看啪啪网站 | 欧美国产大片 | 精品国产乱码久久久久久丨区2区 | 日产精品久久久一区二区福利 | 免费v片在线观看 | 国产伦一区二区三区四区 | 中文字幕在线观看一区 | 久久久精品网站 | 欧美一级免费黄色片 | aaaa一级毛片 |