成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

手寫 p-limit,40 行代碼實現并發控制

開發 前端
實現并發控制只要 40 多行代碼,其實這就是 p-limit 的源碼了,大家感興趣也可以自己實現一下。

前端代碼經常要處理各種異步邏輯。

有的是串行的:

const promise1 = new Promise(function(resolve) {
// 異步邏輯 1...
resolve();
});
const promise2 = new Promise(function(resolve) {
// 異步邏輯 2...
resolve();
});

promise1.then(() => promise2);
await promise1;
await promise2;

有的是并行的:

await Promise.all([promise1, promise2]);
await Promise.race([promise1, promise2]);

并行的異步邏輯有時還要做并發控制。

并發控制是常見的需求,也是面試常考的面試題。

一般我們會用 p-limit 來做:

import pLimit from 'p-limit';

const limit = pLimit(2);

const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];

const result = await Promise.all(input);
console.log(result);

比如上面這段邏輯,就是幾個異步邏輯并行執行,并且最大并發是 2。

那如何實現這樣的并發控制呢?

我們自己來寫一個:

首先,要傳入并發數量,返回一個添加并發任務的函數,我們把它叫做 generator:

const pLimit = (concurrency) => {
const generator = (fn, ...args) =>
new Promise((resolve) => {
//...
});

return generator;
}

這里添加的并發任務要進行排隊,所以我們準備一個 queue,并記錄當前在進行中的異步任務。

const queue = [];
let activeCount = 0;

const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});

添加的異步任務就入隊,也就是 enqueue。

enqueue 做的事情就是把一個異步任務添加到 queue 中,并且只要沒達到并發上限就再執行一批任務:

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};

具體運行的邏輯是這樣的:

const run = async (fn, resolve, ...args) => {
activeCount++;

const result = (async () => fn(...args))();

resolve(result);

try {
await result;
} catch {}

next();
};

計數,運行這個函數,改變最后返回的那個 promise 的狀態,然后執行完之后進行下一步處理:

下一步處理自然就是把活躍任務數量減一,然后再跑一個任務:

const next = () => {
activeCount--;

if (queue.length > 0) {
queue.shift()();
}
};

這樣就保證了并發的數量限制。

現在的全部代碼如下,只有 40 行代碼:

const pLimit = (concurrency) => {  
const queue = [];
let activeCount = 0;

const next = () => {
activeCount--;

if (queue.length > 0) {
queue.shift()();
}
};

const run = async (fn, resolve, ...args) => {
activeCount++;

const result = (async () => fn(...args))();

resolve(result);

try {
await result;
} catch {}

next();
};

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};

const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});

return generator;
};

這就已經實現了并發控制。

不信我們跑跑看:

準備這樣一段測試代碼:

const limit = pLimit(2);

function asyncFun(value, delay) {
return new Promise((resolve) => {
console.log('start ' + value);
setTimeout(() => resolve(value), delay);
});
}

(async function () {
const arr = [
limit(() => asyncFun('aaa', 2000)),
limit(() => asyncFun('bbb', 3000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000))
];

const result = await Promise.all(arr);
console.log(result);
})();

沒啥好說的,就是 setTimeout + promise,設置不同的 delay 時間。

并發數量為 2。

我們試下:

圖片

先并發執行前兩個任務,2s 的時候一個任務執行完,又執行了一個任務,然后再過一秒,都執行完了,有同時執行了兩個任務。

經過測試,我們已經實現了并發控制!

回顧一下我們實現的過程,其實就是一個隊列來保存任務,開始的時候一次性執行最大并發數的任務,然后每執行完一個啟動一個新的。

還是比較簡單的。

上面的 40 行代碼是最簡化的版本,其實還有一些可以完善的地方,我們繼續完善一下。

首先,我們要把并發數暴露出去,還要讓開發者可以手動清理任務隊列。

我們這樣寫:

Object.defineProperties(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () => queue.length
},
clearQueue: {
value: () => {
queue.length = 0;
}
}
});

用 Object.defineProperties 只定義 get 函數,這樣 activeCount、pendingCount 就是只能讀不能改的。

同時還提供了一個清空任務隊列的函數。

然后傳入的參數也加個校驗邏輯:

if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}

不是整數或者小于 0 就報錯,當然,Infinity 也是可以的。

最后,其實還有一個特別需要完善的點,就是這里:

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};

應該改成這樣:

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

(async () => {
await Promise.resolve();

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};

因為 activeCount-- 的邏輯是在執行完任務之后才執行的,萬一任務還沒執行完,這時候 activeCount 就是不準的。

所以為了保證并發數量能控制準確,要等全部的微任務執行完再拿 activeCount。

怎么在全部的微任務執行完再執行邏輯呢?

加一個新的微任務不就行了?

所以有這樣的 await Promise.resolve(); 的邏輯。

這樣,就是一個完善的并發控制邏輯了,p-limit 也是這么實現的。

感興趣的同學可以自己試一下:

const pLimit = (concurrency) => {
if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}

const queue = [];
let activeCount = 0;

const next = () => {
activeCount--;

if (queue.length > 0) {
queue.shift()();
}
};

const run = async (fn, resolve, ...args) => {
activeCount++;

const result = (async () => fn(...args))();

resolve(result);

try {
await result;
} catch {}

next();
};

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

(async () => {
await Promise.resolve();

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};

const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});

Object.defineProperties(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () => queue.length
},
clearQueue: {
value: () => {
queue.length = 0;
}
}
});

return generator;
};

const limit = pLimit(2);

function asyncFun(value, delay) {
return new Promise((resolve) => {
console.log('start ' + value);
setTimeout(() => resolve(value), delay);
});
}

(async function () {
const arr = [
limit(() => asyncFun('aaa', 2000)),
limit(() => asyncFun('bbb', 3000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000))
];

const result = await Promise.all(arr);
console.log(result);
})();

總結

js 代碼經常要處理異步邏輯的串行、并行,還可能要做并發控制,這也是面試常考的點。

實現并發控制的核心就是通過一個隊列保存所有的任務,然后最開始批量執行一批任務到最大并發數,然后每執行完一個任務就再執行一個新的。

其中要注意的是為了保證獲取的任務數量是準確的,要在所有微任務執行完之后再獲取數量。

實現并發控制只要 40 多行代碼,其實這就是 p-limit 的源碼了,大家感興趣也可以自己實現一下。

責任編輯:武曉燕 來源: 神光的編程秘籍
相關推薦

2022-04-15 08:07:21

ReactDiff算法

2017-03-28 21:03:35

代碼React.js

2017-07-24 15:06:02

代碼人臉識別實踐

2021-08-08 08:08:20

木馬無文件Cobalt Stri

2021-04-07 06:00:18

JavaScript 前端并發控制

2021-01-12 10:22:45

JavaScript并發控制前端

2009-02-09 10:06:03

并發控制Web應用悲觀鎖

2015-05-08 09:58:26

程序員代碼

2021-07-19 09:25:19

數據庫MySQL技術

2017-08-21 10:56:55

MySQL并發控制

2012-07-23 09:58:50

代碼程序員

2022-09-25 23:10:53

Python數據集機器學習

2020-12-17 08:06:33

CSS 日歷界面

2018-01-23 09:17:22

Python人臉識別

2022-03-26 22:28:06

加密通信Python

2022-04-09 09:11:33

Python

2024-10-07 10:02:28

2010-05-25 15:12:22

MySQL分頁

2024-06-17 08:40:16

2009-09-24 14:43:53

Hibernate樂觀
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: a久久 | 亚洲成人av在线播放 | www.成人.com| 高清欧美性猛交 | 亚洲成人av在线播放 | a免费观看| 欧美综合一区 | 日韩一区二区在线视频 | 国内激情av片 | 一区二区三区日本 | 成人午夜激情 | 日韩成人影院 | 欧美日韩精品 | 中文字幕精品一区久久久久 | 日韩在线综合网 | 亚洲性视频在线 | 色必久久 | 羞羞视频在线观免费观看 | 亚洲欧美视频在线观看 | 日韩av福利在线观看 | 国产高清精品一区二区三区 | 91视频网址 | 成人午夜激情 | 午夜资源| 一区影院| 韩国av一区二区 | 国产99视频精品免费视频7 | 在线视频中文字幕 | 欧美精品久久 | 91日韩在线 | 久久久久久免费观看 | av手机免费在线观看 | 国产欧美精品一区二区色综合朱莉 | 国产精品欧美一区二区三区不卡 | 国产精品久久久久久久久久久免费看 | 精品伊人 | 精品国产欧美一区二区三区不卡 | 日本三级日产三级国产三级 | 91美女视频 | 日日射影院 | 一区二区三区在线播放 |