最近在项目开发中,遇到需要一个个调用接口,保存数据的需求,但是一个个调太慢,思索着可以并发同时几个一起调,做个并发池,提高保存数据效率。
方式一(推荐)
假设最大支持的工作线为5个,先开启5个工作线,每执行完一个任务,从任务池取下一个任务,开始执行。保持五个工作线一直在执行任务-取任务-执行任务,直到任务池中的任务都取完,则直接返回,停止工作线。

实现代码
function startLimitPool(tasks, max, callback) {
const result = []
Promise.all(Array.from({ length: max }).map(() => {
return new Promise(resolve => {
function runTask() {
if (tasks.length <= 0) {
resolve()
return
}
const task = tasks.shift()
task().then((res) => {
result.push(res)
runTask()
})
}
runTask()
})
})).then(() => callback(result))
}
- 支持传入任务数组
- 支持全部执行完成后,返回结果,并执行回调函数
- 代码简洁
方式二
先投放任务,直到工作区满了,当某个任务执行结束后,通知继续存放任务,直到任务区满了,一直循环操作,直到工作区正在执行的任务数为0,表示全部执行完毕。然后返回全部执行结果,执行回调函数。

实现代码
function startLimitPool(tasks, max, callback) {
class TaskQueue {
constructor(maxNum) {
this.maxNum = maxNum
this.running = 0
this.queue = []
this.results = []
this.callback = null
}
pushTask(task) {
this.queue.push(task)
this.next()
}
next() {
while (this.running < this.maxNum && this.queue.length) {
const task = this.queue.shift()
task().then((res) => {
this.results.push(res)
}).finally(() => {
this.running--
this.next()
})
this.running++
}
if (typeof this.callback === "function" && this.running == 0) {
this.callback.call(null, this.results)
}
}
}
const queue = new TaskQueue(max)
queue.callback = callback
tasks.forEach(task => queue.pushTask(task));
}
方式三
npm中有挺多第三方包,比如 async-pool、es-promise-pool、p-limit等,但是实际使用起来还挺多问题,挑了使用比较多的async-pool
进行重写。
其中,具体实现原理可以查看Promise.all并发限制文章(这边文章提供的代码是存在问题的,但是原理讲得挺清楚的)。
基于这篇文章提供的思路,对代码进行改写,具体如下
/**
* promise并发限制调用
* @param {object[]} data - 调用的数据列表
* @param {number} maxLimit - 并发调用限制个数
* @param {function} iteratee - 处理单个节点的方法
* @returns {promise}
*/
export function promiseLimitPool({ data = [], maxLimit = 3, iteratee = () => {} } = {}, callback=()=>{}) {
const executing = [];
const enqueue = (index = 0) => {
// 边界处理
if (index === data.length) {
return Promise.all(executing);
}
// 每次调用enqueue, 初始化一个promise
const item = data[index];
function itemPromise(index) {
const promise = new Promise(async (resolve) => {
// 处理单个节点
await iteratee({ index, item: cloneDeep(item), data: cloneDeep(data) });
resolve(index);
}).then(() => {
// 执行结束,从executing删除自身
const delIndex = executing.indexOf(promise);
delIndex > -1 && executing.splice(delIndex, 1);
});
return promise;
}
// 插入executing数字,表示正在执行的promise
executing.push(itemPromise(index));
// 使用Promise.rece,每当executing数组中promise数量低于maxLimit,就实例化新的promise并执行
let race = Promise.resolve();
if (executing.length >= maxLimit) {
race = Promise.race(executing);
}
// 递归,直到遍历完
return race.then(() => enqueue(index + 1));
};
return enqueue();
}
// 示例
promiseLimitPool({
data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
maxLimit: 2,
iteratee: async ({ item }) => {
console.log('onClick -> item', item);
await Axios({
method: 'get',
url: `API接口地址`,
params: { page: 0, size: 9 },
});
},
});
缺点:没有提供全部成功后的回调函数(当然,这个也支持扩展);代码逻辑不是很简约
网友评论