成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

探索異步迭代器在 Node.js 中的使用

開發 前端
Node.js v12.16.0 中新增了 events.on(emitter, eventName) 方法,返回一個迭代 eventName 事件的異步迭代器。

[[356203]]

目錄

在 Events 中使用 asyncIterator

  • events.on() 示例 1
  • events.on() 示例 2
  • events.on() 開啟一個 Node.js 服務器
  • 解析 Node.js 源碼對 events.on 異步迭代器的實現

在 Stream 中使用 asyncIterator

  • 異步迭代器 與 Readable
  • 從 Node.js 源碼看 readable 是如何實現的 asyncIterator
  • 異步迭代器與 Writeable

在 MongoDB 中使用 asyncIterator

  • MongoDB 中的 cursor
  • MongoDB 異步迭代器實現源碼分析
  • 使用 for await...of 遍歷可迭代對象 cursor
  • 傳送 cursor 到可寫流

在 Events 中使用 asyncIterator

Node.js v12.16.0 中新增了 events.on(emitter, eventName) 方法,返回一個迭代 eventName 事件的異步迭代器。

events.on() 示例 1

如下例所示, for await...of 循環只會輸出 Hello 當觸發 error 事件時會被 try catch 所捕獲。

  1. const { on, EventEmitter } = require('events'); 
  2.  
  3. (async () => { 
  4.   const ee = new EventEmitter(); 
  5.   const ite = on(ee, 'foo'); 
  6.  
  7.   process.nextTick(() => { 
  8.     ee.emit('foo''Hello'); 
  9.     ee.emit('error', new Error('unknown mistake.')) 
  10.     ee.emit('foo''Node.js'); 
  11.   }); 
  12.  
  13.   try { 
  14.     for await (const event of ite) { 
  15.       console.log(event); // prints ['Hello'
  16.     } 
  17.   } catch (err) { 
  18.     console.log(err.message); // unknown mistake. 
  19.   } 
  20. })(); 

上述示例,如果 EventEmitter 對象實例 ee 觸發了 error 事件,錯誤信息會被拋出并且退出循環,該實例注冊的所有事件偵聽器也會一并移除。

events.on() 示例 2

for await...of 內部塊的執行是同步的,每次只能處理一個事件,即使你接下來還有會立即執行的事件,也是如此。如果是需要并發執行的則不建議使用,這個原因會在下面解析 events.on() 源碼時給出答案。

如下所示,雖然事件是按順序同時觸發了兩次,但是在內部塊模擬了 2s 的延遲,下一次事件的處理也會得到延遲。

  1. const ite = on(ee, 'foo'); 
  2.  
  3. process.nextTick(() => { 
  4.   ee.emit('foo''Hello'); 
  5.   ee.emit('foo''Node.js'); 
  6.   // ite.return(); // 調用后可以結束 for await...of 的遍歷 
  7.   // ite.throw() // 迭代器對象拋出一個錯誤 
  8. }); 
  9.  
  10. try { 
  11.   for await (const event of ite) { 
  12.     console.log(event); // prints ['Hello'] ['Node.js'
  13.     await sleep(2000); 
  14.   } 
  15. } catch (err) { 
  16.   console.log(err.message); 
  17.  
  18. // Unreachable here 
  19. console.log('這里將不會被執行'); 

上例中最后一句代碼是不會執行的,此時的迭代器會一直處于遍歷中,雖然上面兩個事件 emit 都觸發了,但是迭代器并沒有終止,什么時候終止呢?也就是當內部出現一些錯誤或我們手動調用可迭代對象的 return() 或 throw() 方法時迭代器才會終止。

events.on() 開啟一個 Node.js 服務器

之前一篇文章《“Hello Node.js” 這一次是你沒見過的寫法》寫過一段使用 events.on() 開啟一個 HTTP 服務器的代碼,在留言中當時有小伙伴對此提出疑惑,基于本章對異步迭代器在 events.on() 中使用的學習,可以很好的解釋。

相關代碼如下所示:

  1. import { createServer as server } from 'http'
  2. import { on } from 'events'
  3. const ee = on(server().listen(3000), 'request'); 
  4. for await (const [{ url }, res] of ee) 
  5.   if (url === '/hello'
  6.     res.end('Hello Node.js!'); 
  7.   else 
  8.     res.end('OK!'); 

以上代碼看似新穎,其核心實現就是使用 events.on() 返回 createServer() 對象 request 事件的異步可迭代對象,之后用 for await...of 語句遍歷,客戶端每一次請求,就相當于做了一次 ee.emit('request', Req, Res)。

由于內部塊的執行是同步的,下一次事件處理需要依賴上次事件完成才可以執行,對于一個 HTTP 服務器需要考慮并發的,請不要使用上面這種方式!

解析 Node.js 源碼對 events.on 異步迭代器的實現

events 模塊直接導出了 on() 方法,這個 on() 方法主要是將異步迭代器與事件的 EventEmitter 類的實例對象做了結合,實現還是很巧妙的,以下對核心源碼做下解釋,理解之后你完全也可以自己實現一個 events.on()。

  • 行 {1} ObjectSetPrototypeOf 是為對象設置一個新的原型,這個對象包含了 next()、return()、throw() 三個方法。
  • 行 {2} 根據異步可迭代協議,可迭代對象必須要包含一個 Symbol.asyncIterator 屬性,該屬性是一個無參數的函數,返回可迭代對象本身,也就是下面代碼中 SymbolAsyncIterator。
  • 行 {3} 新的原型就是 ObjectSetPrototypeOf 的第二個參數 AsyncIteratorPrototype。
  • 行 {4} eventTargetAgnosticAddListener 是對事件注冊監聽器,里面還是用的事件觸發器對象的 on() 方法 emitter.on(name, listener) 。
  • 行 {5} addErrorHandlerIfEventEmitter 判斷事件名如果不等于 'error' 同時注冊一個 error 事件的監聽器,具體實現同行 {4}。
  • 行 {6} eventHandler() 函數就是上面注冊的監聽器函數 listener 當有事件觸發時執行該監聽器函數,與異步迭代器的結合就在這里,當有新事件觸發時會從 unconsumedPromises 數組里取出第一個元素執行,如果理解異步迭代器實現標準你會發現 PromiseResolve(createIterResult(args, false)) 就是異步迭代器對象 next() 方法返回值的標準定義。

下面繼續看 unconsumedPromises 從何而來。

  1. module.exports = EventEmitter; 
  2. module.exports.on = on
  3.  
  4. function on(emitter, event) { 
  5.   const unconsumedEvents = []; 
  6.   const unconsumedPromises = []; 
  7.   const iterator = ObjectSetPrototypeOf({ // {1} 
  8.     next() { .... }, 
  9.     return() { ... }, 
  10.     throw(err) { ... }, 
  11.     [SymbolAsyncIterator]() { // {2} 
  12.       return this; 
  13.     } 
  14.   }, AsyncIteratorPrototype); // {3} 
  15.   eventTargetAgnosticAddListener(emitter, event, eventHandler); // {4} 
  16.   if (event !== 'error') { 
  17.     addErrorHandlerIfEventEmitter(emitter, errorHandler); // {5} 
  18.   } 
  19.   return iterator; 
  20.                
  21.   function eventHandler(...args) { // {6} 
  22.     const promise =  .shift(); 
  23.     if (promise) { 
  24.       // 以下等價于 promise.resolve({ value: args, done: false }); 
  25.       PromiseResolve(createIterResult(args, false)); 
  26.     } else { 
  27.       // for await...of 遍歷器內部塊的執行是同步的,所以每次只能處理 1 個事件,如果同時觸發多個事件,上次事件未完成剩下的事件會被保存至 unconsumedEvents 中,待上次事件完成后,遍歷器會自動調用 iterator 對象的 next() 方法,消費所有未處理的事件。 
  28.       unconsumedEvents.push(args); 
  29.     } 
  30.   } 
  31.  
  32. function eventTargetAgnosticAddListener(emitter, name, listener, flags) { 
  33.   ... 
  34.   emitter.on(name, listener); 

以下是 iterator 對象的 next() 方法實現:

  • 行 {1} 首先消費未讀消息
  • 行 {2} 判斷如果是發生錯誤則拋出錯誤信息,例如 iterator 對象的 throw() 方法被調用后就會對 error 做賦值待下次遍歷器調用 next() 此處代碼就會被執行。
  • 行 {3} 如果迭代器對象完成,返回的 Promise 對象 done 屬性設置為 true,遍歷器也就結束了,變量 finished 是由 iterator 對象的 return() 方法被調用之后設置的。
  • 行 {4} 這個是上面提到的 unconsumedPromises 數據來源處,例如當我們執行 for await...of 語句遍歷異步迭代器對象時就會自動觸發 iterator 對象的 next() 方法,執行到行 {4} 處會創建一個 Promise 對象但是 resolve 并沒有被立即執行,而是先存放在 unconsumedPromises 數組中,所以在上面 #events.on() 示例 2# 提到一個問題,for await...of 遍歷事件的異步迭代器對象時后面的代碼塊并不會被執行, 當我們觸發一個事件時才會在監聽器函數里執行這個 resolve 函數,此時才會被釋放,之后 for await...of 遍歷器會自動再次執行 next() 方法,然后 new 一個新的 Promise 反復循環,直到事件對象拋出 error 事件或執行 iterator 對象的 return() 方法。
  1. const iterator = ObjectSetPrototypeOf({ 
  2.   next() { 
  3.     // {1} 首先,我們會消費所有未讀消息 
  4.     const value = unconsumedEvents.shift(); 
  5.     if (value) { 
  6.       return PromiseResolve(createIterResult(value, false)); 
  7.     } 
  8.  
  9.     // {2} 如果發生一次 error 就會執行 Promise.reject 拋出一個錯誤,在這個錯誤發生后也會停止事件監聽。 
  10.     if (error) { 
  11.       const p = PromiseReject(error); 
  12.       // Only the first element errors 
  13.       error = null
  14.       return p; 
  15.     } 
  16.  
  17.     // {3} 如果迭代器對象完成,Promise.resolve done 設置為 true 
  18.     if (finished) { 
  19.       return PromiseResolve(createIterResult(undefined, true)); 
  20.     } 
  21.  
  22.     // {4} 等待直到一個事件發生 
  23.     return new Promise(function(resolve, reject) { 
  24.       unconsumedPromises.push({ resolve, reject }); 
  25.     }); 
  26.   } 
  27.   ... 

在 Stream 中使用 asyncIterator

Node.js Stream 模塊的可讀流對象在 v10.0.0 版本試驗性的支持了 [Symbol.asyncIterator] 屬性,可以使用 for await...of 語句遍歷可讀流對象,在 v11.14.0 版本以上已 LTS 支持。

異步迭代器 與 Readable

借助 fs 模塊創建一個可讀流對象 readable。

  1. const fs = require('fs'); 
  2. const readable = fs.createReadStream('./hello.txt', { 
  3.   encoding: 'utf-8'
  4.   highWaterMark: 1 
  5. }); 

以往當我們讀取一個文件時,需要監聽 data 事件,拼接數據,在 end 事件里判斷完成,如下所示:

  1. function readText(readable) { 
  2.   let data = ''
  3.   return new Promise((resolve, reject) => { 
  4.     readable.on('data', chunk => { 
  5.       data += chunk; 
  6.     }) 
  7.     readable.on('end', () => { 
  8.       resolve(data); 
  9.     }); 
  10.     readable.on('error', err => { 
  11.       reject(err); 
  12.     }); 
  13.   }) 

現在通過異步迭代器能以一種更簡單的方式實現,如下所示:

  1. async function readText(readable) { 
  2.   let data = ''
  3.   for await (const chunk of readable) { 
  4.     data += chunk; 
  5.   } 
  6.   return data; 

現在我們可以調用 readText 做測試。

  1. (async () => { 
  2.   try { 
  3.     const res = await readText(readable); 
  4.     console.log(res); // Hello Node.js 
  5.   } catch (err) { 
  6.     console.log(err.message); 
  7.   } 
  8. })(); 

使用 for await...of 語句遍歷 readable,如果循環中因為 break 或 throw 一個錯誤而終止,則這個 Stream 也將被銷毀。

上述示例中 chunk 每次接收的值是根據創建可讀流時 highWaterMark 這個屬性決定的,為了能清晰的看到效果,在創建 readable 對象時我們指定了 highWaterMark 屬性為 1 每次只會讀取一個字符。

從 Node.js 源碼看 readable 是如何實現的 asyncIterator

與同步的迭代器遍歷語句 for...of 類似,用于 asyncIterator 異步迭代器遍歷的 for await...of 語句在循環內部會默認調用可迭代對象 readable 的 Symbol.asyncIterator() 方法得到一個異步迭代器對象,之后調用迭代器對象的 next() 方法獲取結果。

本文以 Node.js 源碼 v14.x 為例來看看源碼是如何實現的。當我們調用 fs.createReadStream() 創建一個可讀流對象時,對應的該方法內部會調用 ReadStream 構造函數

  1. // https://github.com/nodejs/node/blob/v14.x/lib/fs.js#L2001 
  2. function createReadStream(path, options) { 
  3.   lazyLoadStreams(); 
  4.   return new ReadStream(path, options); 

其實在 ReadStream 這個構造函數里沒有我們要找的,重點是它通過原型的方式繼承了 Stream 模塊的 Readable 構造函數。

  1. function ReadStream(path, options) { 
  2.   ... 
  3.   Readable.call(this, options); 

那么現在我們重點來看看 Readable 這個構造函數的實現。

Readable 原型上定義了 SymbolAsyncIterator 屬性,該方法返回了一個由生成器函數創建的迭代器對象。

  1. // for await...of 循環會調用 
  2. Readable.prototype[SymbolAsyncIterator] = function() { 
  3.   let stream = this; 
  4.   ... 
  5.   const iter = createAsyncIterator(stream); 
  6.   iter.stream = stream; 
  7.   return iter; 
  8. }; 
  9.  
  10. // 聲明一個創建異步迭代器對象的生成器函數 
  11. async function* createAsyncIterator(stream) { 
  12.   let callback = nop; 
  13.  
  14.   function next(resolve) { 
  15.     if (this === stream) { 
  16.       callback(); 
  17.       callback = nop; 
  18.     } else { 
  19.       callback = resolve; 
  20.     } 
  21.   } 
  22.  
  23.   const state = stream._readableState; 
  24.  
  25.   let error = state.errored; 
  26.   let errorEmitted = state.errorEmitted; 
  27.   let endEmitted = state.endEmitted; 
  28.   let closeEmitted = state.closeEmitted; 
  29.   
  30.   // error、endclose 事件控制了什么時候結束迭代器遍歷。 
  31.   stream 
  32.     .on('readable'next
  33.     .on('error'function(err) { 
  34.       error = err; 
  35.       errorEmitted = true
  36.       next.call(this); 
  37.     }) 
  38.     .on('end'function() { 
  39.       endEmitted = true
  40.       next.call(this); 
  41.     }) 
  42.     .on('close'function() { 
  43.       closeEmitted = true
  44.       next.call(this); 
  45.     }); 
  46.  
  47.   try { 
  48.     while (true) { 
  49.       // stream.read() 從內部緩沖拉取并返回數據。如果沒有可讀的數據,則返回 null 
  50.       // readable 的 destroy() 方法被調用后 readable.destroyed 為 true,readable 即為下面的 stream 對象 
  51.       const chunk = stream.destroyed ? null : stream.read(); 
  52.       if (chunk !== null) { 
  53.         yield chunk; // 這里是關鍵,根據迭代器協議定義,迭代器對象要返回一個 next() 方法,使用 yield 返回了每一次的值 
  54.       } else if (errorEmitted) { 
  55.         throw error; 
  56.       } else if (endEmitted) { 
  57.         break; 
  58.       } else if (closeEmitted) { 
  59.         break; 
  60.       } else { 
  61.         await new Promise(next); 
  62.       } 
  63.     } 
  64.   } catch (err) { 
  65.     destroyImpl.destroyer(stream, err); 
  66.     throw err; 
  67.   } finally { 
  68.     if (state.autoDestroy || !endEmitted) { 
  69.       // TODO(ronag): ERR_PREMATURE_CLOSE? 
  70.       destroyImpl.destroyer(stream, null); 
  71.     } 
  72.   } 

通過上面源碼可以看到可讀流的異步迭代器實現使用了生成器函數 Generator yield,那么對于 readable 對象遍歷除了 for await...of 遍歷之外,其實也是可以直接使用調用生成器函數的 next() 方法也是可以的。

  1. const ret = readable[Symbol.asyncIterator]() 
  2. console.log(await ret.next()); // { value: 'H', done: false } 
  3. console.log(await ret.next()); // { value: 'e', done: false } 

異步迭代器與 Writeable

通過上面講解,我們知道了如何遍歷異步迭代器從 readable 對象獲取數據,但是你有沒有想過如何將一個異步迭代器對象傳送給可寫流?正是此處要講的。

從迭代器中創建可讀流

Node.js 流對象提供了一個實用方法 stream.Readable.from(),對于符合 Symbol.asyncIterator 或 Symbol.iterator 協議的可迭代對象(Iterable)會先創建一個可讀流對象 readable 之后從迭代器中構建 Node.js 可讀流。

以下是 從理解到實現輕松掌握 ES6 中的迭代器 一文中曾講解過的例子,r1 就是我們創建的可迭代對象。使用 stream.Readable.from() 方法則可以將可迭代對象構造為一個可讀流對象 readable。

  1. function Range(start, end) { 
  2.   this.id = start; 
  3.   this.end = end
  4. Range.prototype[Symbol.asyncIterator] = async function* () { 
  5.   while (this.id <= this.end) { 
  6.     yield this.id++; 
  7.   } 
  8. const r1 = new Range(0, 3); 
  9. const readable = stream.Readable.from(r1); 
  10. readable.on('data', chunk => { 
  11.   console.log(chunk); // 0 1 2 3 
  12. }); 

傳送異步迭代器到可寫流

使用 pipeline 可以將一系列的流和生成器函數通過管道一起傳送,并在管道完成時獲取通知。

使用 util.promisify 將 pipeline 轉化為 promise 形式。

  1. const util = require('util'); 
  2. const pipeline = util.promisify(stream.pipeline); // 轉為 promise 形式 
  3.  
  4. (async () => { 
  5.   try { 
  6.     const readable = stream.Readable.from(r1); 
  7.     const writeable = fs.createWriteStream('range.txt'); 
  8.     await pipeline( 
  9.       readable, 
  10.       async function* (source) { 
  11.         for await (const chunk of source) { 
  12.           yield chunk.toString(); 
  13.         } 
  14.       }, 
  15.       writeable 
  16.     ); 
  17.     console.log('Pipeline 成功'); 
  18.   } catch (err) { 
  19.     console.log(err.message); 
  20.   } 
  21. })() 

在寫入數據時,傳入的 chunk 需是 String、Buffer、Uint8Array 類型,否則 writeable 對象在寫入數據時會報錯。由于我們自定義的可迭代對象 r1 里最終返回的值類型為 Number 在這里需要做次轉換,管道中間的生成器函數就是將每次接收到的值轉為字符串。

在 MongoDB 中使用 asyncIterator

除了上面我們講解的 Node.js 官方提供的幾個模塊之外,在 MongoDB 中也是支持異步迭代的,不過介紹這點的點資料很少,MongoDB 是通過一個游標的概念來實現的。

MongoDB 中的 cursor

本處以 Node.js 驅動 mongodb 模塊來介紹,當我們調用 db.collection.find() 這個方法返回的是一個 cursor(游標),如果想要訪問文檔那么我們需要迭代這個游標對象來完成,但是通常我們會直接使用 toArray() 這個方法來完成。

下面讓我們通過一段示例來看,現在我們有一個數據庫 example,一個集合 books,表里面有兩條記錄,如下所示:

image.png

查詢 books 集合的所有數據,以下代碼中定義的 myCursor 變量就是游標對象,它不會自動進行迭代,可以使用游標對象的 hasNext() 方法檢測是否還有下一個,如果有則可以使用 next() 方法訪問數據。

通過以下日志記錄可以看到在第三次調用 hasNext() 時返回了 false,如果此時在調用 next() 就會報錯,游標已關閉,也就是已經沒有數據可遍歷了。

  1. const MongoClient = require('mongodb').MongoClient; 
  2. const dbConnectionUrl = 'mongodb://127.0.0.1:27017/example'
  3.  
  4. (async () => { 
  5.   const client = await MongoClient.connect(dbConnectionUrl, { useUnifiedTopology: true }); 
  6.   const bookColl = client.db('example').collection('books'); 
  7.   const myCursor = await bookColl.find(); 
  8.   
  9.   console.log(await myCursor.hasNext()); // true 
  10.   console.log((await myCursor.next()).name); // 深入淺出Node.js 
  11.   console.log(await myCursor.hasNext()); // true 
  12.   console.log((await myCursor.next()).name); // Node.js實戰 
  13.   console.log(await myCursor.hasNext()); // false 
  14.   console.log((await myCursor.next()).name); // MongoError: Cursor is closed 
  15. })() 

直接調用 next() 也可檢測,如果還有值則返回該條記錄,否則 next() 方法返回 null。

  1. console.log((await myCursor.next()).name); 
  2. console.log((await myCursor.next()).name); 
  3. console.log((await myCursor.next())); 

MongoDB 異步迭代器實現源碼分析

MongoDB 中游標是以 hasNext() 返回 false 或 next() 返回為 null 來判斷是否達到游標尾部,與之不同的是在我們的 JavaScript 可迭代協議定義中是要有一個 Symbol.asyncIterator 屬性的迭代器對象,且迭代器對象是 { done, value } 的形式。

幸運的是 MongoDB Node.js 驅動已經幫助我們實現了這一功能,通過一段源碼來看在 MongoDB 中的實現。

  • find 方法

find 方法返回的是一個可迭代游標對象。

  1. // https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/collection.js#L470 
  2.  
  3. Collection.prototype.find = deprecateOptions( 
  4.   { 
  5.     name'collection.find'
  6.     deprecatedOptions: DEPRECATED_FIND_OPTIONS, 
  7.     optionsIndex: 1 
  8.   }, 
  9.   function(query, options, callback) { 
  10.     const cursor = this.s.topology.cursor
  11.       new FindOperation(this, this.s.namespace, findCommand, newOptions), 
  12.       newOptions 
  13.     ); 
  14.  
  15.     return cursor
  16.   } 
  17. ); 
  • CoreCursor

核心實現就在這里,這是一個游標的核心類,MongoDB Node.js 驅動程序中所有游標都是基于此,如果當前支持異步迭代器,則在 CoreCursor 的原型上設置 Symbol.asyncIterator 屬性,返回基于 Promise 實現的異步迭代器對象,這符合 JavaScript 中關于異步可迭代對象的標準定義。

  1. // https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/core/cursor.js#L610 
  2.  
  3. if (SUPPORTS.ASYNC_ITERATOR) { 
  4.   CoreCursor.prototype[Symbol.asyncIterator] = require('../async/async_iterator').asyncIterator; 
  1. // https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/async/async_iterator.js#L16 
  2.  
  3. // async function* asyncIterator() { 
  4. //   while (true) { 
  5. //     const value = await this.next(); 
  6. //     if (!value) { 
  7. //       await this.close(); 
  8. //       return
  9. //     } 
  10.  
  11. //     yield value; 
  12. //   } 
  13. // } 
  14.  
  15. // TODO: change this to the async generator function above 
  16. function asyncIterator() { 
  17.   const cursor = this; 
  18.  
  19.   return { 
  20.     nextfunction() { 
  21.       return Promise.resolve() 
  22.         .then(() => cursor.next()) 
  23.         .then(value => { 
  24.           if (!value) { 
  25.             return cursor.close().then(() => ({ value, done: true })); 
  26.           } 
  27.           return { value, done: false }; 
  28.         }); 
  29.     } 
  30.   }; 

目前是默認使用的 Promise 的形式實現的,上面代碼中有段 TODO, Node.js 驅動關于異步迭代實現這塊可能后期會改為基于生成器函數的實現,這對我們使用是沒變化的.

使用 for await...of 遍歷可迭代對象 cursor

還是基于我們上面的示例,如果換成 for await...of 語句遍歷就簡單的多了。

  1. const myCursor = await bookColl.find(); 
  2. for await (val of myCursor) { 
  3.   console.log(val.name); 

在 MongoDB 中的聚合管道中使用也是如此,就不再做過多分析了,如下所示:

  1. const myCursor = await bookColl.aggregate(); 
  2. for await (val of myCursor) { 
  3.   console.log(val.name); 

對于遍歷龐大的數據集時,使用游標它會批量加載 MongoDB 中的數據,我們也不必擔心一次將所有的數據存在于服務器的內存中,造成內存壓力過大。

傳送 cursor 到可寫流

MongoDB 游標對象本身也是一個可迭代對象(Iterable),結合流模塊的 Readable.from() 則可轉化為可讀流對象,是可以通過流的方式進行寫入文件。

但是要注意 MongoDB 中的游標每次返回的是單條文檔記錄,是一個 Object 類型的,如果直接寫入,可寫流是會報參數類型錯誤的,因為可寫流默認是一個非對象模式(僅接受 String、Buffer、Unit8Array),所以才會看到在 pipeline 傳輸的中間又使用了生成器函數,將每次接收的數據塊處理為可寫流 Buffer 類型。

  1. const myCursor = await bookColl.find(); 
  2. const readable = stream.Readable.from(myCursor); 
  3. await pipeline( 
  4.   readable, 
  5.   async function* (source) { 
  6.     for await (const chunk of source) { 
  7.       yield Buffer.from(JSON.stringify(chunk)); 
  8.     } 
  9.   }, 
  10.   fs.createWriteStream('books.txt'
  11. ); 

Reference

  • https://nodejs.org/dist/latest-v14.x/docs/api/stream.html#stream_readable_symbol_asynciterator
  • https://nodejs.org/dist/latest-v14.x/docs/api/events.html#events_events_on_emitter_eventname
  • https://docs.mongodb.com/manual/tutorial/iterate-a-cursor/index.html

本文轉載自微信公眾號「Nodejs技術棧」,可以通過以下二維碼關注。轉載本文請聯系Nodejs技術棧公眾號。

 

責任編輯:武曉燕 來源: Nodejs技術棧
相關推薦

2021-03-04 23:12:57

Node.js異步迭代器開發

2021-04-06 10:15:29

Node.jsHooks前端

2025-01-13 00:00:00

2021-12-01 00:05:03

Js應用Ebpf

2021-09-07 07:53:43

工具

2021-03-16 16:16:41

GeneratorWebsockets前端

2011-12-23 13:58:57

node.js

2021-01-26 08:07:44

Node.js模塊 Async

2012-03-09 09:11:29

Node.js

2022-01-11 17:23:12

配置Node.jsNode

2021-07-26 05:24:59

Node.js SO_RESUEPORLibuv

2021-10-03 15:02:50

HTTPNodejs

2021-10-22 08:29:14

JavaScript事件循環

2017-04-10 13:28:32

Node.jsJavaScript

2014-03-07 13:43:32

Node.jsNode

2021-08-20 09:00:00

Node.js開發API

2022-08-28 16:30:34

Node.jsDocker指令

2023-06-30 08:05:41

2024-01-05 08:49:15

Node.js異步編程

2021-05-21 09:36:42

開發技能代碼
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲电影在线播放 | 久久综合九色综合欧美狠狠 | 91伊人 | 国产在线精品一区二区三区 | 欧美一区二区三区国产 | 亚洲欧美中文字幕在线观看 | 黄色综合 | 国产精品99久久久久久大便 | 精品久久视频 | 色综合中文 | 一二区视频 | 97高清国语自产拍 | 国产在线精品一区二区 | 在线观看视频91 | 亚洲国产高清在线 | 亚洲一区二区欧美 | 久久综合九色综合欧美狠狠 | 国产一区二区三区在线看 | 亚洲国产高清在线观看 | 天堂一区二区三区 | 国产在线资源 | 欧美在线天堂 | 久久久久久天堂 | 亚洲欧美成人在线 | 国产剧情一区 | 偷拍亚洲色图 | 精品成人免费视频 | 干干干操操操 | 精品久久精品 | 国产日韩欧美激情 | 欧美三级电影在线播放 | 亚洲福利在线视频 | 中文字幕日韩一区 | 韩国欧洲一级毛片 | 欧美久久一区 | 美国av毛片| 91精品国产综合久久婷婷香蕉 | 天堂三级 | 99视频久| 亚洲国产精品区 | 久久国产精品视频 |