Node.js中的異步Generator函數和Websockets
異步 generator 函數是 ES2018 中新增的特性。Node.js 從 v10 版本增加了對異步 generator 函數的支持。異步 generator 函數看似一個相當小眾特性特性,但是卻為 node.js websocket 框架提供了一個靈巧的使用機會。
在這篇文章中,我將說明 Node.js websocket 框架將如何使用異步 generator 函數。
HTTP 框架分類
首先,想一下 Express 或 Hapi 之類的 HTTP 服務器框架。一般來說,大多數 HTTP 服務器框架都屬于以下三種之一:
1. 顯式響應。 在 Express 中發送一個 HTTP 響應,你必須調用 res.end(),res.json() 或者 res 對象上的一些其他方法。換句話說,你必須顯式調用一個方法來發送一個響應。
2. 使用 return 隱式響應。 另一方面,Hapi 在 v17 中明確地刪除了 reply() 函數,也就是說 Hapi 沒有等同于 res 的方式。如果需要發送一個響應。你只需在請求的處理方法中 return 一個返回值。之后 Hapi 就會將 return 的值封裝進一個 HTTP 響應中。
3. 在適當的位置修改響應。 Koa 使用了一種混合了以上兩種實現的獨特處理方式。你將以修改 ctx 對象的方式,替代調用 res 對象的方法來構建響應。
換句話說,一些 HTTP 框架要求你顯式調用方法來發送 HTTP 響應,另一些框架會提供給你一個可更改的 HTTP 響應對象,還有一些框架僅需要處理函數中 return 一個值。
Websockets 和 HTTP 的區別在于,Websockets 服務器可以在任何時間向 socket 推送消息,不管是不是基于某條消息的響應。也就是說,初級的 websocket 框架,例如 ws, 看起來很像 “顯式響應” 模式:你需要顯式調用一個方法用于發送一條消息。
然而,是否可以在保持允許消息多發這個優點的同時,使 websockets 可以實現隱式響應?這就是異步 generator 產生的原因。
從服務器上讀取大塊數據
假設你有一個一次讀取一堆文檔的 Mongoose 指針,并且你希望用 websocket 在每一個文檔讀出時盡快將它發送出去。這種方式有助于在任何時刻都使服務器的內存使用量保持在最小:客戶端可以獲取所有的數據,而服務器卻不用為此在內存中一次保存所有的數據。舉個例子,這是使用 async/await 方式讀取一個指針的實現:
- const User = mongoose.model('User', mongoose.Schema({ name: String }));
- const cursor = Model.find().cursor();
- for await (const doc of cursor) {
- console.log(doc.name); // Print user names 1 by 1.
- }
使 generator 函數變得有趣的地方在于,在一個函數中 yield 方法可以被調用多次,并且在上次停止的地方繼續運行,除了這點以外,yield 方法和 return 方法類似。
- const User = mongoose.model('User', mongoose.Schema({ name: String }));
- async function* streamUsers() {
- const cursor = Model.find().cursor();
- for await (const doc of cursor) {
- // Yielding each doc behaves like multiple implicit responses, if you have
- // a framework that supports it.
- yield doc;
- }
- }
以下是如何使用 Node.js 編寫一個 Websocket 服務器:
- const WebSocket = require('ws');
- const server = new WebSocket.Server({
- port: 8080
- });
- server.on('connection', function(socket) {
- socket.on('message', function(msg) {
- // Handle message
- });
- });
至此,接下來要做的是為 websocket 服務器添加 streamUsers() 方法。假設收到的每條消息都是有效的 JSON,并且都有屬性 action 和 id。當 action === 'streamUsers'時,streamUsers() 就會被執行,并且基于 socket 向外發送每個被 Mongoose cursor 查詢出來的用戶。
- const WebSocket = require('ws');
- const server = new WebSocket.Server({
- port: 8080
- });
- server.on('connection', function(socket) {
- socket.on('message', function(msg) {
- msg = JSON.parse(msg);
- if (msg.action === 'streamUsers') {
- void async function() {
- // Send 1 message per user, as opposed to loading all users and then
- // sending them all in 1 message.
- for await (const doc of streamUsers()) {
- socket.send(JSON.stringify({ id: msg.id, doc }));
- }
- }().catch(err => socket.send(JSON.stringify({ id: msg.id, error: err.message })));
- }
- });
- });
以下是如何通過 websocket 客戶端調用 streamUsers() 方法:
- const client = new WebSocket('ws://localhost:8080');
- // Will print each user doc 1 at a time.
- client.on('message', msg => console.log(msg));
- await new Promise(resolve => client.once('open', resolve));
- client.send(JSON.stringify({ action: 'streamUsers', id: 1 }));
后續
異步 generator 函數提供了一種創建更高級的,如同一些 HTTP 框架(例如 Hapi 和 Fastify)那樣,基于隱式響應的 websocket 框架的機會。而隱式響應的主要優勢就在于,你在業務邏輯中不需要關注框架是通過 websocket,HTTP 輪詢或是其他某種方式來發送結果??蚣茏杂墒?Javascript 編程更輕便并且更容易測試。
通過將所有產生的值存放在一個數組中,或者讓客戶端發起多次請求對一個指針進行迭代,streamUsers() 方法就可以很容易的在一個 HTTP 框架,或者是一個使用輪詢的 HTTP 框架中重用。沒有異步 generator 函數,所有這些都是不能實現的。