p-limit源码阅读

作者: 拜仁的月饼 | 来源:发表于2023-07-27 12:59 被阅读0次

    p-limit源码

    开篇

    源码统共68行,硬要背也是可以背下来的,但更多时候要“知其然”。因此写一篇源码阅读。

    正文

    该库的用法: (源码摘自: 手写 p-limit,40 行代码实现并发控制--神光的编程秘籍)

    import pLimit from 'p-limit';
    
    const generator = pLimit(2);
    const input = [
        generator(() => fetchSomething('foo')),
        generator(() => fetchSomething('bar')),
        generator(() => doSomething())
    ];
    
    const result = await Promise.all(input);
    console.log(result);
    

    上面代码意义是多个异步逻辑并行执行,最大并发数为2. 那么这一逻辑是怎样实现呢,这就要翻看源码了。

    源码第一行引用了第三方队列实现yocto-queue,在看源码中把它当一个数组就可以。

    import Queue from 'yocto-queue';
    

    快速翻看一遍源码,发现源码最终导出的只是一个pLimit函数,参数concurrencynumber类型,返回结果为一个generator函数。那么,我们可以按图索骥,看一下generator函数是如何实现的?

    const generator = (fn, ...args) => new Promise(resolve => {
        // 从字面理解上来看,该函数的意思是"入队"
        enqueue(fn, resolve, args);
    });
    

    仅有一行,逻辑是返回一个Promise,初始化时将要执行的函数、resolve、参数都推到队列中去。继续按图索骥,看一下队列定义和入队都做了什么。

    源码中除却前三行的错误处理逻辑外,马上就是队列定义和activeCount定义。

    // 队列定义, 从yocto-queue引入的队列类
    const queue = new Queue();
    
    let activeCount = 0;
    

    接下来跳到enqueue函数的实现:

    // 直接copy源码
    const enqueue = (fn, resolve, args) => {
        // run函数执行结果入队。
        // run函数传入fn, Promise.resov, fn的参数args,并绑定this为undefined
        queue.enqueue(run.bind(undefined, fn, resolve, args));
    
        (async () => {
            // This function needs to wait until the next microtask before comparing
            // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
            // when the run function is dequeued and called. The comparison in the if-statement
            // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
    
            // 上述为英文注释,英文好的同学可以一读。下面是我的翻译
            // 该IIFE (立即执行函数) 在activeCount与concurrency作比较之前须等待下一个微任务执行
            // 因为activeCount在run函数出队和调用时是异步更新的
            // 在if语句中 activeCount与concurrency作比较这一过程需要异步进行,以便获取最新的activeCount
            await Promise.resolve();
    
            if (activeCount < concurrency && queue.size > 0) {
                queue.dequeue()();
            }
        })();
    };
    

    这段代码看的是不是感觉不知所云?即使是读了一遍英文+中文注释后也感觉到这样?即使如此,还是让我的抽丝剥茧一下,直接看代码。enqueue函数只作了两件事:

    1. 将一个异步任务推到队列中
    2. 立即执行一个IIFE。

    由此引出的两个问题是:

    1. run函数做了什么?
    2. IIFE中做了什么?

    首先回答第一个问题。还是去看run函数源码。由于run函数与next函数密切相关,所以源码一并粘贴过来:

    const next = () => {
        // activecount减1
        activeCount--;
        // 如果还有任务,让任务出队并执行
        if (queue.size > 0) {
            queue.dequeue()();
        }
    };
    
    const run = async (fn, resolve, args) => {
        // activecount加1
        activeCount++;
                    
        // 异步任务执行结果
        const result = (async () => fn(...args))();
      
        // resolve获取执行结果
        resolve(result);
    
        try {
            await result;
        } catch {}
          
        // 当前任务执行完毕后下一个
        next();
    };
    

    第二个问题,IIFE中的逻辑其实与next函数大同小异, 都是只要activeCountconcurrency小,并且队列有任务,那么就让任务出队并执行。

    以上是p-limit的核心实现。简单综述一下流程:

    1. 维护一个队列,队列任务数量上限是指定的参数;
    2. 待异步函数完毕后,立即让执行完的函数出队,然后下一个异步函数入队。但注意不要超过并发限制。

    相关文章

      网友评论

        本文标题:p-limit源码阅读

        本文链接:https://www.haomeiwen.com/subject/rzmlpdtx.html