Nodejs v14源碼分析之Event模塊
本文轉載自微信公眾號「編程雜技」,作者theanarkh。轉載本文請聯系編程雜技公眾號。
events模塊是Node.js中比較簡單但是卻非常核心的模塊,Node.js中,很多模塊都繼承于events模塊,events模塊是發布、訂閱模式的實現。我們首先看一下如何使用events模塊。
- const { EventEmitter } = require('events');
- class Events extends EventEmitter {}
- const events = new Events();
- events.on('demo', () => {
- console.log('emit demo event');
- });
- events.emit('demo');
接下來我們看一下events模塊的具體實現。
1 初始化 當new一個EventEmitter或者它的子類時,就會進入EventEmitter的邏輯。
- function EventEmitter(opts) {
- EventEmitter.init.call(this, opts);
- }
- EventEmitter.init = function(opts) {
- // 如果是未初始化或者沒有自定義_events,則初始化
- if (this._events === undefined ||
- this._events === ObjectGetPrototypeOf(this)._events) {
- this._events = ObjectCreate(null);
- this._eventsCount = 0;
- }
- /*
- 初始化一類事件的處理函數個數的閾值
- 我們可以通過setMaxListeners接口設置,
- 如果沒有顯示設置,閾值則為defaultMaxListeners的值(10),
- 可通過getMaxListeners接口獲取
- */
- this._maxListeners = this._maxListeners || undefined;
- // 是否開啟捕獲promise reject,默認false
- if (opts && opts.captureRejections) {
- this[kCapture] = Boolean(opts.captureRejections);
- } else {
- this[kCapture] = EventEmitter.prototype[kCapture];
- }
- };
EventEmitter的初始化主要是初始化了一些數據結構和屬性。唯一支持的一個參數就是captureRejections,captureRejections表示當觸發事件,執行處理函數時,EventEmitter是否捕獲處理函數中的異常。后面我們會詳細講解。
2 訂閱事件 初始化完EventEmitter之后,我們就可以開始使用訂閱、發布的功能。我們可以通過addListener、prependListener、on、once訂閱事件。addListener和on是等價的,prependListener的區別在于處理函數會被插入到隊首,而默認是追加到隊尾。once注冊的處理函數,最多被執行一次。四個api都是通過_addListener函數實現的。下面我們看一下具體實現。
- function _addListener(target, type, listener, prepend) {
- let m;
- let events;
- let existing;
- events = target._events;
- // 還沒有初始化_events則初始化,_eventsCount為事件類型個數
- if (events === undefined) {
- events = target._events = ObjectCreate(null);
- target._eventsCount = 0;
- } else {
- /*
- 已經注冊過事件,則判斷是否定義了newListener事件,
- 是的話先觸發,如果監聽了newListener事件,每次注冊
- 其它事件時都會觸發newListener,相當于鉤子
- */
- if (events.newListener !== undefined) {
- target.emit('newListener',
- type,
- listener.listener ?
- listener.listener :
- listener);
- // newListener處理函數可能會修改_events,這里重新賦值
- events = target._events;
- }
- // 判斷是否已經存在處理函數
- existing = events[type];
- }
- // 不存在則以函數的形式存儲,否則以數組形式存儲
- if (existing === undefined) {
- events[type] = listener;
- // 新增一個事件類型,事件類型個數加一
- ++target._eventsCount;
- } else {
- /*
- existing是函數說明之前注冊過該事件一次,
- 否則說明existing為數組,則直接插入相應位置
- */
- if (typeof existing === 'function') {
- existing = events[type] =
- prepend ? [listener, existing] : [existing, listener];
- } else if (prepend) {
- existing.unshift(listener);
- } else {
- existing.push(listener);
- }
- // 處理告警,處理函數過多可能是因為之前的沒有刪除,造成內存泄漏
- m = _getMaxListeners(target);
- // 該事件處理函數達到閾值并且還沒有提示過警告信息則提示
- if (m > 0 && existing.length > m && !existing.warned) {
- existing.warned = true;
- const w = new Error('錯誤信息…');
- w.name = 'MaxListenersExceededWarning';
- w.emitter = target;
- w.type = type;
- w.count = existing.length;
- process.emitWarning(w);
- }
- }
- return target;
- }
接下來我們看一下once的實現,對比其它幾種api,once的實現相對比較復雜,因為我們要控制處理函數最多執行一次,所以我們需要保證在事件觸發的時候,執行用戶定義函數的同時,還需要刪除注冊的事件。
- ventEmitter.prototype.once = function once(type, listener) {
- this.on(type, _onceWrap(this, type, listener));
- return this;
- ;
- unction onceWrapper() {
- // 還沒有觸發過
- if (!this.fired) {
- // 刪除它
- this.target.removeListener(this.type, this.wrapFn);
- // 觸發了
- this.fired = true;
- // 執行
- if (arguments.length === 0)
- return this.listener.call(this.target);
- return this.listener.apply(this.target, arguments);
- }
- }
- // 支持once api
- function _onceWrap(target, type, listener) {
- // fired是否已執行處理函數,wrapFn包裹listener的函數
- const state = { fired: false, wrapFn: undefined, target, type, listener };
- // 生成一個包裹listener的函數
- const wrapped = onceWrapper.bind(state);
- /*
- 把原函數listener也掛到包裹函數中,用于事件沒有觸發前,
- 用戶主動刪除,見removeListener
- */
- wrapped.listener = listener;
- // 保存包裹函數,用于執行完后刪除,見onceWrapper
- state.wrapFn = wrapped;
- return wrapped;
- }
Once函數構造一個上下文(state)保存用戶處理函數和執行狀態等信息,然后通過bind返回一個帶有該上下文(state)的函數wrapped注冊到事件系統。當事件觸發時,在wrapped函數中首先移除wrapped,然后執行用戶的函數。Wrapped起到了劫持的作用。另外還需要在wrapped上保存用戶傳進來的函數,當用戶在事件觸發前刪除該事件時或解除該函數時,在遍歷該類事件的處理函數過程中,可以通過wrapped.listener找到對應的項進行刪除。
3 觸發事件 分析完事件的訂閱,接著我們看一下事件的觸發。
- EventEmitter.prototype.emit = function emit(type, ...args) {
- // 觸發的事件是否是error,error事件需要特殊處理
- let doError = (type === 'error');
- const events = this._events;
- // 定義了處理函數(不一定是type事件的處理函數)
- if (events !== undefined) {
- /*
- 如果觸發的事件是error,并且監聽了kErrorMonitor
- 事件則觸發kErrorMonitor事件
- */
- if (doError && events[kErrorMonitor] !== undefined)
- this.emit(kErrorMonitor, ...args);
- // 觸發的是error事件但是沒有定義處理函數
- doError = (doError && events.error === undefined);
- } else if (!doError)
- // 沒有定義處理函數并且觸發的不是error事件則不需要處理,
- return false;
- // If there is no 'error' event listener then throw.
- // 觸發的是error事件,但是沒有定義處理error事件的函數,則報錯
- if (doError) {
- let er;
- if (args.length > 0)
- er = args[0];
- // 第一個入參是Error的實例
- if (er instanceof Error) {
- try {
- const capture = {};
- /*
- 給capture對象注入stack屬性,stack的值是執行
- Error.captureStackTrace語句的當前棧信息,但是
- 不包括emit的部分
- */
- Error.captureStackTrace(capture, EventEmitter.prototype.emit);
- ObjectDefineProperty(er, kEnhanceStackBeforeInspector, {
- value: enhanceStackTrace.bind(this, er, capture),
- configurable: true
- });
- } catch {}
- throw er; // Unhandled 'error' event
- }
- let stringifiedEr;
- const { inspect } = require('internal/util/inspect');
- try {
- stringifiedEr = inspect(er);
- } catch {
- stringifiedEr = er;
- }
- const err = new ERR_UNHANDLED_ERROR(stringifiedEr);
- err.context = er;
- throw err; // Unhandled 'error' event
- }
- // 獲取type事件對應的處理函數
- const handler = events[type];
- // 沒有則不處理
- if (handler === undefined)
- return false;
- // 等于函數說明只有一個
- if (typeof handler === 'function') {
- // 直接執行
- const result = ReflectApply(handler, this, args);
- // 非空判斷是不是promise并且是否需要處理,見addCatch
- if (result !== undefined && result !== null) {
- addCatch(this, result, type, args);
- }
- } else {
- // 多個處理函數,同上
- const len = handler.length;
- const listeners = arrayClone(handler, len);
- for (let i = 0; i < len; ++i) {
- const result = ReflectApply(listeners[i], this, args);
- if (result !== undefined && result !== null) {
- addCatch(this, result, type, args);
- }
- }
- }
- return true;
- }
我們看到在Node.js中,對于error事件是特殊處理的,如果用戶沒有注冊error事件的處理函數,可能會導致程序掛掉,另外我們看到有一個addCatch的邏輯,addCatch是為了支持事件處理函數為異步模式的情況,比如async函數或者返回Promise的函數。
- function addCatch(that, promise, type, args) {
- // 沒有開啟捕獲則不需要處理
- if (!that[kCapture]) {
- return;
- }
- // that throws on second use.
- try {
- const then = promise.then;
- if (typeof then === 'function') {
- // 注冊reject的處理函數
- then.call(promise, undefined, function(err) {
- process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);
- });
- }
- } catch (err) {
- that.emit('error', err);
- }
- }
- function emitUnhandledRejectionOrErr(ee, err, type, args) {
- // 用戶實現了kRejection則執行
- if (typeof ee[kRejection] === 'function') {
- ee[kRejection](err, type, ...args);
- } else {
- // 保存當前值
- const prev = ee[kCapture];
- try {
- /*
- 關閉然后觸發error事件,意義
- 1 防止error事件處理函數也拋出error,導致死循環
- 2 如果用戶處理了error,則進程不會退出,所以需要恢復
- kCapture的值如果用戶沒有處理error,則Node.js會觸發
- uncaughtException,如果用戶處理了uncaughtException
- 則需要恢復kCapture的值
- */
- ee[kCapture] = false;
- ee.emit('error', err);
- } finally {
- ee[kCapture] = prev;
- }
- }
- }
4 取消訂閱 我們接著看一下刪除事件處理函數的邏輯。
- function removeAllListeners(type) {
- const events = this._events;
- if (events === undefined)
- return this;
- /*
- 沒有注冊removeListener事件,則只需要刪除數據,
- 否則還需要觸發removeListener事件
- */
- if (events.removeListener === undefined) {
- // 等于0說明是刪除全部
- if (arguments.length === 0) {
- this._events = ObjectCreate(null);
- this._eventsCount = 0;
- } else if (events[type] !== undefined) {
- /*
- 否則是刪除某個類型的事件,是唯一一個處理函數,
- 則重置_events,否則刪除對應的事件類型
- */
- if (--this._eventsCount === 0)
- this._events = ObjectCreate(null);
- else
- delete events[type];
- }
- return this;
- }
- /*
- 說明注冊了removeListener事件,arguments.length === 0
- 說明刪除所有類型的事件
- */
- if (arguments.length === 0) {
- /*
- 逐個刪除,除了removeListener事件,
- 這里刪除了非removeListener事件
- */
- for (const key of ObjectKeys(events)) {
- if (key === 'removeListener') continue;
- this.removeAllListeners(key);
- }
- // 這里刪除removeListener事件,見下面的邏輯
- this.removeAllListeners('removeListener');
- // 重置數據結構
- this._events = ObjectCreate(null);
- this._eventsCount = 0;
- return this;
- }
- // 刪除某類型事件
- const listeners = events[type];
- if (typeof listeners === 'function') {
- this.removeListener(type, listeners);
- } else if (listeners !== undefined) {
- // LIFO order
- for (let i = listeners.length - 1; i >= 0; i--) {
- this.removeListener(type, listeners[i]);
- }
- }
- return this;
- }
removeAllListeners函數主要的邏輯有兩點,第一個是removeListener事件需要特殊處理,這類似一個鉤子,每次用戶刪除事件處理函數的時候都會觸發該事件。第二是removeListener函數。removeListener是真正刪除事件處理函數的實現。removeAllListeners是封裝了removeListener的邏輯。
- function removeListener(type, listener) {
- let originalListener;
- const events = this._events;
- // 沒有東西可刪除
- if (events === undefined)
- return this;
- const list = events[type];
- // 同上
- if (list === undefined)
- return this;
- // list是函數說明只有一個處理函數,否則是數組,如果list.listener === listener說明是once注冊的
- if (list === listener || list.listener === listener) {
- // type類型的處理函數就一個,并且也沒有注冊其它類型的事件,則初始化_events
- if (--this._eventsCount === 0)
- this._events = ObjectCreate(null);
- else {
- // 就一個執行完刪除type對應的屬性
- delete events[type];
- // 注冊了removeListener事件,則先注冊removeListener事件
- if (events.removeListener)
- this.emit('removeListener',
- type,
- list.listener || listener);
- }
- } else if (typeof list !== 'function') {
- // 多個處理函數
- let position = -1;
- // 找出需要刪除的函數
- for (let i = list.length - 1; i >= 0; i--) {
- if (list[i] === listener ||
- list[i].listener === listener) {
- // 保存原處理函數,如果有的話
- originalListener = list[i].listener;
- position = i;
- break;
- }
- }
- if (position < 0)
- return this;
- // 第一個則出隊,否則刪除一個
- if (position === 0)
- list.shift();
- else {
- if (spliceOne === undefined)
- spliceOne = require('internal/util').spliceOne;
- spliceOne(list, position);
- }
- // 如果只剩下一個,則值改成函數類型
- if (list.length === 1)
- events[type] = list[0];
- // 觸發removeListener
- if (events.removeListener !== undefined)
- this.emit('removeListener',
- type,
- originalListener || listener);
- }
- return this;
- };
以上就是events模塊的核心邏輯,另外還有一些工具函數就不一一分析。