Nodejs線程池的設計與實現
本文轉載自微信公眾號「編程雜技」,作者theanarkh。轉載本文請聯系編程雜技公眾號。
前言:之前的版本不方便開放,重新設計了一版nodejs的線程池庫,本文介紹該庫的一些設計和實現。
nodejs雖然提供了線程的能力,但是很多時候,往往不能直接使用線程或者無限制地創建線程,比如我們有一個功能是cpu密集型的,如果一個請求就開一個線程,這很明顯不是最好的實踐,這時候,我們需要使用池化的技術,本文介紹在nodejs線程模塊的基礎上,如何設計和實現一個線程池庫(https://github.com/theanarkh/nodejs-threadpool或npm i nodejs-threadpool )。下面是線程池的總體架構。
設計一個線程池,在真正寫代碼之前,有很多設計需要考慮,大概如下:
1任務隊列的設計,一個隊列,多個線程互斥訪問,或者每個線程一個隊列,不需要互斥訪問。
2 線程退出的設計,可以由主線程檢測空閑線程,然后使子線程退出?;蛘咦泳€程退出,通知主線程??臻e不一定是沒有任務就退出,可以設計空閑時間達到閾值后退出,因為創建線程是有時間開銷的。
3 任務數的設計,每個線程可以有個任務數,還可以增加一個總任務數,即全部線程任務數加起來
4 選擇線程的設計,選擇任務數最少的線程。
5 線程類型的設計,可以區分核心線程和預備線程,任務少的時候,核心線程處理就行。任務多也創建預備線程幫忙處理。
6 線程池類型的設計,cpu密集型的,線程數等于核數,否則自定義線程數就行。
7 支持任務的取消和超時機制,防止一個任務時間過長或者死循環。
本文介紹的線程池具體設計思想如下(參考java):
1 主線程維護一個隊列,子線程的任務由子線程負責分發,不需要互斥訪問,子線程也不需要維護自己的隊列。
2 線程退出的設計,主線程負責檢查子線程空閑時間是否達到閾值,是則使子線程退出。
3 任務數的設計,主線程負責管理任務個數并應有相應的策略。
4 選擇線程的設計,選擇任務數最少的線程。
5 線程類型的設計,區分核心線程和預備線程,任務少的時候,核心線程處理就行。任務多也創建預備線程幫忙處理。
6 線程池類型的設計,cpu密集型的,線程數等于核數,否則自定義線程數就行。
7 支持任務的取消和超時機制,超時或者取消的時候,主線程判斷任務是待執行還是正在執行,如果是待執行則從任務隊列中刪除,如果是正在執行則殺死對應的子線程。下面我們看一下具體的設計。
1 主線程和子線程通信的數據結構
- // 任務類,一個任務對應一個id
- class Work {
- constructor({workId, filename, options}) {
- // 任務id
- this.workId = workId;
- // 任務邏輯,字符串或者js文件路徑
- this.filename = filename;
- // 任務返回的結果
- this.data = null;
- // 任務返回的錯誤
- this.error = null;
- // 執行任務時傳入的參數,用戶定義
- this.options = options;
- }
- }
主線程給子線程分派一個任務的時候,就給子線程發送一個Work對象。在nodejs中線程間通信需要經過序列化和反序列化,所以通信的數據結構包括的信息不能過多。
2 子線程處理任務邏輯
- const { parentPort } = require('worker_threads');
- const vm = require('vm');
- const { isFunction, isJSFile } = require('./utils');
- // 監聽主線程提交過來的任務
- parentPort.on('message', async (work) => {
- try {
- const { filename, options } = work;
- let aFunction;
- if (isJSFile(filename)) {
- aFunction = require(filename);
- } else {
- aFunction = vm.runInThisContext(`(${filename})`);
- }
- if (!isFunction(aFunction)) {
- throw new Error('work type error: js file or string');
- }
- work.data = await aFunction(options);
- parentPort.postMessage({event: 'done', work});
- } catch (error) {
- work.error = error.toString();
- parentPort.postMessage({event: 'error', work});
- }
- });
- process.on('uncaughtException', (...rest) => {
- console.error(...rest);
- });
- process.on('unhandledRejection', (...rest) => {
- console.error(...rest);
- });
子線程的邏輯比較簡單,就是監聽主線程分派過來的任務,然后執行任務,執行完之后通知主線程。任務支持js文件和字符串代碼的形式。需要返回一個Promise或者async函數。用于用于通知主線程任務已經完成。
3 線程池和業務的通信
- // 提供給用戶側的接口
- class UserWork extends EventEmitter {
- constructor({ workId }) {
- super();
- // 任務id
- this.workId = workId;
- // 支持超時取消任務
- this.timer = null;
- // 任務狀態
- this.state = WORK_STATE.PENDDING;
- }
- // 超時后取消任務
- setTimeout(timeout) {
- this.timer = setTimeout(() => {
- this.timer && this.cancel() && this.emit('timeout');
- }, ~~timeout);
- }
- // 取消之前設置的定時器
- clearTimeout() {
- clearTimeout(this.timer);
- this.timer = null;
- }
- // 直接取消任務,如果執行完了就不能取消了,this.terminate是動態設置的
- cancel() {
- if (this.state === WORK_STATE.END || this.state === WORK_STATE.CANCELED) {
- return false;
- } else {
- this.terminate();
- return true;
- }
- }
- // 修改任務狀態
- setState(state) {
- this.state = state;
- }
- }
業務提交一個任務給線程池的時候,線程池會返回一個UserWork類,業務側通過UserWork類和線程池通信。
4 管理子線程的數據結構
- // 管理子線程的數據結構
- class Thread {
- constructor({ worker }) {
- // nodejs的Worker對象,nodejs的worker_threads模塊的Worker
- this.worker = worker;
- // 線程狀態
- this.state = THREAD_STATE.IDLE;
- // 上次工作的時間
- this.lastWorkTime = Date.now();
- }
- // 修改線程狀態
- setState(state) {
- this.state = state;
- }
- // 修改線程最后工作時間
- setLastWorkTime(time) {
- this.lastWorkTime = time;
- }
- }
線程池中維護了多個子線程,Thread類用于管理子線程的信息。
5 線程池 線程池的實現是核心,我們分為幾個部分講。
5.1 支持的配置
- constructor(options = {}) {
- this.options = options;
- // 子線程隊列
- this.workerQueue = [];
- // 核心線程數
- this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;
- // 線程池最大線程數,如果不支持動態擴容則最大線程數等于核心線程數
- this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;
- // 超過任務隊列長度時的處理策略
- this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;
- // 是否預創建子線程
- this.preCreate = options.preCreate === true;
- // 線程最大空閑時間,達到后自動退出
- this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;
- // 是否預創建線程池
- this.preCreate && this.preCreateThreads();
- // 保存線程池中任務對應的UserWork
- this.workPool = {};
- // 線程池中當前可用的任務id,每次有新任務時自增1
- this.workId = 0;
- // 線程池中的任務隊列
- this.queue = [];
- // 線程池總任務數
- this.totalWork = 0;
- // 支持的最大任務數
- this.maxWork = ~~options.maxWork || config.MAX_WORK;
- // 處理任務的超時時間,全局配置
- this.timeout = ~~options.timeout;
- this.pollIdle();
- }
上面的代碼列出了線程池所支持的能力。
5.2 創建線程
- newThread() {
- const worker = new Worker(workerPath);
- const thread = new Thread({worker});
- this.workerQueue.push(thread);
- const threadId = worker.threadId;
- worker.on('exit', () => {
- // 找到該線程對應的數據結構,然后刪除該線程的數據結構
- const position = this.workerQueue.findIndex(({worker}) => {
- return worker.threadId === threadId;
- });
- const exitedThread = this.workerQueue.splice(position, 1);
- // 退出時狀態是BUSY說明還在處理任務(非正常退出)
- this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0;
- });
- // 和子線程通信
- worker.on('message', (result) => {
- const {
- work,
- event,
- } = result;
- const { data, error, workId } = work;
- // 通過workId拿到對應的userWork
- const userWork = this.workPool[workId];
- // 不存在說明任務被取消了
- if (!userWork) {
- return;
- }
- // 修改線程池數據結構
- this.endWork(userWork);
- // 修改線程數據結構
- thread.setLastWorkTime(Date.now());
- // 還有任務則通知子線程處理,否則修改子線程狀態為空閑
- if (this.queue.length) {
- // 從任務隊列拿到一個任務交給子線程
- this.submitWorkToThread(thread, this.queue.shift());
- } else {
- thread.setState(THREAD_STATE.IDLE);
- }
- switch(event) {
- case 'done':
- // 通知用戶,任務完成
- userWork.emit('done', data);
- break;
- case 'error':
- // 通知用戶,任務出錯
- if (EventEmitter.listenerCount(userWork, 'error')) {
- userWork.emit('error', error);
- }
- break;
- default: break;
- }
- });
- worker.on('error', (...rest) => {
- console.error(...rest);
- });
- return thread;
- }
創建線程,并保持線程對應的數據結構、退出、通信管理、任務分派。子線程執行完任務后,會通知線程池,主線程通知用戶。
5.3 選擇線程
- selectThead() {
- // 找出空閑的線程,把任務交給他
- for (let i = 0; i < this.workerQueue.length; i++) {
- if (this.workerQueue[i].state === THREAD_STATE.IDLE) {
- return this.workerQueue[i];
- }
- }
- // 沒有空閑的則隨機選擇一個
- return this.workerQueue[~~(Math.random() * this.workerQueue.length)];
- }
當用戶給線程池提交一個任務時,線程池會選擇一個空閑的線程處理該任務。如果沒有可用線程則任務插入待處理隊列等待處理。
5.4 提交任務
- // 給線程池提交一個任務
- submit(filename, options = {}) {
- return new Promise(async (resolve, reject) => {
- let thread;
- // 沒有線程則創建一個
- if (this.workerQueue.length) {
- thread = this.selectThead();
- // 該線程還有任務需要處理
- if (thread.state === THREAD_STATE.BUSY) {
- // 子線程個數還沒有達到核心線程數,則新建線程處理
- if (this.workerQueue.length < this.coreThreads) {
- thread = this.newThread();
- } else if (this.totalWork + 1 > this.maxWork){
- // 總任務數已達到閾值,還沒有達到線程數閾值,則創建
- if(this.workerQueue.length < this.maxThreads) {
- thread = this.newThread();
- } else {
- // 處理溢出的任務
- switch(this.discardPolicy) {
- case DISCARD_POLICY.ABORT:
- return reject(new Error('queue overflow'));
- case DISCARD_POLICY.CALLER_RUN:
- const workId = this.generateWorkId();
- const userWork = new UserWork({workId});
- userWork.setState(WORK_STATE.RUNNING);
- userWork.terminate = () => {
- userWork.setState(WORK_STATE.CANCELED);
- };
- this.timeout && userWork.setTimeout(this.timeout);
- resolve(userWork);
- try {
- let aFunction;
- if (isJSFile(filename)) {
- aFunction = require(filename);
- } else {
- aFunction = vm.runInThisContext(`(${filename})`);
- }
- if (!isFunction(aFunction)) {
- throw new Error('work type error: js file or string');
- }
- const result = await aFunction(options);
- // 延遲通知,讓用戶有機會取消或者注冊事件
- setImmediate(() => {
- if (userWork.state !== WORK_STATE.CANCELED) {
- userWork.setState(WORK_STATE.END);
- userWork.emit('done', result);
- }
- });
- } catch (error) {
- setImmediate(() => {
- if (userWork.state !== WORK_STATE.CANCELED) {
- userWork.setState(WORK_STATE.END);
- userWork.emit('error', error.toString());
- }
- });
- }
- return;
- case DISCARD_POLICY.OLDEST_DISCARD:
- const work = this.queue.shift();
- // maxWork為1時,work會為空
- if (work && this.workPool[work.workId]) {
- this.cancelWork(this.workPool[work.workId]);
- } else {
- return reject(new Error('no work can be discarded'));
- }
- break;
- case DISCARD_POLICY.DISCARD:
- return reject(new Error('discard'));
- case DISCARD_POLICY.NOT_DISCARD:
- break;
- default:
- break;
- }
- }
- }
- }
- } else {
- thread = this.newThread();
- }
- // 生成一個任務id
- const workId = this.generateWorkId();
- // 新建一個UserWork
- const userWork = new UserWork({workId});
- this.timeout && userWork.setTimeout(this.timeout);
- // 新建一個work
- const work = new Work({ workId, filename, options });
- // 修改線程池數據結構,把UserWork和Work關聯起來
- this.addWork(userWork);
- // 選中的線程正在處理任務,則先緩存到任務隊列
- if (thread.state === THREAD_STATE.BUSY) {
- this.queue.push(work);
- userWork.terminate = () => {
- this.cancelWork(userWork);
- this.queue = this.queue.filter((node) => {
- return node.workId !== work.workId;
- });
- }
- } else {
- this.submitWorkToThread(thread, work);
- }
- resolve(userWork);
- })
- }
- submitWorkToThread(thread, work) {
- const userWork = this.workPool[work.workId];
- userWork.setState(WORK_STATE.RUNNING);
- // 否則交給線程處理,并修改狀態和記錄該線程當前處理的任務id
- thread.setState(THREAD_STATE.BUSY);
- thread.worker.postMessage(work);
- userWork.terminate = () => {
- this.cancelWork(userWork);
- thread.setState(THREAD_STATE.DEAD);
- thread.worker.terminate();
- }
- }
- addWork(userWork) {
- userWork.setState(WORK_STATE.PENDDING);
- this.workPool[userWork.workId] = userWork;
- this.totalWork++;
- }
- endWork(userWork) {
- delete this.workPool[userWork.workId];
- this.totalWork--;
- userWork.setState(WORK_STATE.END);
- userWork.clearTimeout();
- }
- cancelWork(userWork) {
- delete this.workPool[userWork.workId];
- this.totalWork--;
- userWork.setState(WORK_STATE.CANCELED);
- userWork.emit('cancel');
- }
提交任務是線程池暴露給用戶側的接口,主要處理的邏輯包括,根據當前的策略判斷是否需要新建線程、選擇線程處理任務、排隊任務等,如果任務數達到閾值,則根據丟棄策略處理該任務。
5.5 空閑處理
- pollIdle() {
- setTimeout(() => {
- for (let i = 0; i < this.workerQueue.length; i++) {
- const node = this.workerQueue[i];
- if (node.state === THREAD_STATE.IDLE && Date.now() - node.lastWorkTime > this.maxIdleTime) {
- node.worker.terminate();
- }
- }
- this.pollIdle();
- }, 1000);
- }
當子線程空閑時間達到閾值后,主線程會殺死子線程,避免浪費系統資源。總結,這就是線程池具體的設計和實現,另外創建線程失敗會導致主線程掛掉,所以使用線程的時候,最后新開一個子進程來管理該線程池。