美文网首页
async异步工具

async异步工具

作者: joyer_li | 来源:发表于2022-06-18 21:17 被阅读0次

    在es6中的async的语法中,可以参照java并发包实现一些有意思的异步工具,辅助在异步场景(一般指请求)下的开发。
    由于js是单线程,下面的实现都比java中实现简单 (抛除线程概念)。同时涉及到js的执行机制,宏任务,微任务,asyncpromise相关内容,需要提前具备这些知识。

    wait(等待)

    异步函数中,等待(相当于java线程中的阻塞)一段时间。

    实现代码:

    async function wait(time = 0) {
      await new Promise(resolve => setTimeout(resolve, time));
      // 避免转译成return await, 会在一些safari版本里面报错
      return undefined;
    }
    

    模拟使用代码:

    (async () => {
      console.time();
      await wait(1000);
      console.timeEnd(); // 输出: default: 1.002s
    })();
    

    Lock(锁)

    模拟java并发包中的Lock类实现锁操作。 保证同一个锁包围的异步代码执行过程中,同一时刻只有一段代码在执行。

    该锁不符合html5的的异步锁接口,而是提供一个java异步包中Lock接口的简单实现

    推荐使用场景:多个请求执行过程前,保证同一时刻只有一个token失效验证转换操作。

    实现代码:

    export type Resolve<T = any> = (value: T | PromiseLike<T>) => void;
    
    export type Reject = (reason?: any) => void;
    
    export interface FlatPromise<T = any> {
      promise: Promise<T>;
      resolve: Resolve<T>;
      reject: Reject;
    };
    
    interface Key {
      key: number,
      resolve: Resolve,
    };
    
    /**
    * 创建一个扁平的promise
    * @returns Prmise
    */
    function flatPromise<T = any>(): FlatPromise<T> {
      const result: any = {};
      const promise = new Promise<T>((resolve, reject) => {
        result.resolve = resolve;
        result.reject = reject;
      });
      result.promise = promise;
      return result as FlatPromise<T>;
    }
    
    class Lock {
      
      keys: Key[] = [];
      hasLock: boolean = false;
      idCount: number = 0;
      
      constructor() {
        this.keys = [];
        this.hasLock = false;
        this.idCount = 0;
      }
      
      _pushKey(resolve: Resolve) {
        this.idCount += 1;
        const key: Key = {
          key: this.idCount,
          resolve,
        };
        this.keys.push(key);
        return key;
      }
      
      _removeKey(key: Key) {
        const index = this.keys.findIndex(item => item.key === key.key);
        if (index >= 0) {
          this.keys.splice(index, 1);
        }
      }
      
      /**
      * 获取锁.
      * 如果当前锁已经锁定,那么就阻塞当前操作
      */
      async lock() {
        if (this.keys.length || this.hasLock) {
          const { promise, resolve } = flatPromise();
          this._pushKey(resolve);
          await promise;
          return null;
        }
        
        this.hasLock = true;
        return null;
      }
      
      /**
      * 尝试获取锁.
      * 该函数如果没有指定一个有效的time,则立马返回一个结果:如果获取到锁则为true,反之为false.
      * 如果指定一个有效的time(time=0有效),则返回一个promise对象,改对象返回的结果为是否获取到锁
      * @param time 最长等待时间
      */
      tryLock(time?: number) {
        if (time === undefined ||
            Number.isNaN(Math.floor(time)) || time < 0) {
          if (this.hasLock) {
            return false;
          }
          this.lock();
          return Promise.resolve(true);
        }
        
        if (!this.hasLock && !this.keys.length) {
          this.hasLock = true;
          return Promise.resolve(true);
        }
        
        const asyncFn = async () => {
          const { promise, resolve: res } = flatPromise();
          const key = this._pushKey(res);
          
          setTimeout(() => {
            this._removeKey(key);
            key.resolve(false);
          }, time);
          
          const isTimeout = await promise;
          
          return isTimeout !== false;
        };
        
        return asyncFn();
      }
      
      async lockFn(asyncFn: () => Promise<void>) {
        await this.lock();
        try {
          await asyncFn();
        } finally {
          this.unlock();
        }
      }
      
      /**
      * 释放锁
      */
      unlock() {
        if (this.keys.length === 0 && this.hasLock === true) {
          this.hasLock = false;
          return;
        }
        
        if (this.keys.length === 0) {
          return;
        }
        
        const index = Math.floor(Math.random() * this.keys.length);
        const key = this.keys[index];
        this._removeKey(key);
        key.resolve(undefined);
      }
      
      toString() {
        return `${this.keys.length}-${this.hasLock}`;
      }
    }
    

    模拟使用代码:

    function delay(callback: () => void, time: number) {
      return new Promise<void>((resolve) => setTimeout(() => {
        callback();
        resolve(undefined);
      }, time));
    }
    
    (async () => {
      const lock = new Lock();
      const syncResult: string[] = [];
      const unSyncResult: string[] = [];
      const withLockAsync = async () => {
        await lock.lock();
        
        await delay(() => {
          syncResult.push('1');
        }, Math.random() * 10);
        
        await delay(() => {
          syncResult.push('2');
        }, Math.random() * 10);
        
        await delay(() => {
          syncResult.push('3');
        }, Math.random() * 10);
        
        lock.unlock();
      };
      
      const withoutLockAsync = async () => {
        await delay(() => {
          unSyncResult.push('1');
        }, Math.random() * 3);
        
        await delay(() => {
          unSyncResult.push('2');
        }, Math.random() * 3);
        
        await delay(() => {
          unSyncResult.push('3');
        }, Math.random() * 3);
      };
      
      const taskList = [];
      for (let i = 0; i < 10; i += 1) {
        taskList.push(withLockAsync(), withoutLockAsync());
      }
      await Promise.all(taskList);
      
      // 输出1,2,3,1,2,3...
      // 证明withLockAsync函数中的代码同一时刻只有一个执行,不会被打算
      console.log(syncResult);
      // 输出的值不一定按照1,2,3,1,2,3...
      // 证明在普通的async函数中,await后的代码会被打乱
      console.log(unSyncResult);
    })();
    

    Semaphore(信号量)

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

    推荐使用场景:一般用于流量的控制,特别是公共资源有限的应用场景。例如控制同一时刻请求的连接数量,假设浏览器的请求连接数上限为10个,多个异步并发请求可以使用Semaphore来控制请求的异步执行个数最多为10个。

    简单实现代码:

    class Semaphore {
      constructor(permits) {
        this.permits = permits;
        this.execCount = 0;
        this.waitTaskList = [];
      }
      
      async acquire() {
        const that = this;
        this.execCount += 1;
        if (this.execCount <= this.permits) {
          // 为了保证按照调用顺序执行
          // 如果有等待的,那么先执行等待的,当前的挂起
          // 没有则快速通过
          if (that.waitTaskList.length !== 0) {
            const waitTask = this.waitTaskList.pop();
            waitTask();
            await new Promise((resolve) => {
              that.waitTaskList.push(resolve);
            });
          }
          return;
        }
        await new Promise((resolve) => {
          that.waitTaskList.push(resolve);
        });
      }
      
      release() {
        this.execCount -= 1;
        if (this.execCount < 0) {
          this.execCount = 0;
        }
        if (this.waitTaskList.length === 0) {
          return;
        }
        const waitTask = this.waitTaskList.pop();
        waitTask();
      }
    }
    

    模拟一个复杂的页面中复杂的请求场景:

    (async () => {
      const semaphore = new Semaphore(5);
      
      let doCount = 0;
      let currntCount = 0;
      const request = async (id) => {
        await semaphore.acquire();
        currntCount++;
        console.log(`执行请求${id}, 正在执行的个数${currntCount}`);
        await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
        semaphore.release();
        currntCount--;
        doCount++;
        console.log(`执行请求${id}结束,已经执行${doCount}次`);
      };
      const arr = new Array(10).fill(1);
      setTimeout(() => {
        // 依次执行10个请求
        arr.forEach((_, index) => {
          request(`1-${index}`);
        });
      }, 300)
      // 随机触发10个请求
      let index = 0;
      const timerId = setInterval(() => {
        if (index > 9) {
          clearInterval(timerId);
          return;
        }
        request(`2-${index}`);
        index++;
      }, Math.random() * 300 + 300);
      // 同时执行10个请求
      await Promise.all(arr.map(async (_, index) => {
        await request(`3-${index}`);
      }));
      // 等待上面的内容全部执行, 资源释放后,在执行4个请求
      setTimeout(() => {
        const lastArr = new Array(4).fill(1);
        lastArr.forEach((_, index) => {
          request(`4-${index}`);
        });
      }, 5000);
    })();
    

    执行结果:

    执行请求3-0, 正在执行的个数1
    执行请求3-1, 正在执行的个数2
    执行请求3-2, 正在执行的个数3
    执行请求3-3, 正在执行的个数4
    执行请求3-4, 正在执行的个数5
    执行请求3-2结束,已经执行1次
    执行请求2-1, 正在执行的个数5
    执行请求3-3结束,已经执行2次
    执行请求2-0, 正在执行的个数5
    ...
    执行请求3-8结束,已经执行29次
    执行请求3-6结束,已经执行30次
    执行请求4-0, 正在执行的个数1
    执行请求4-1, 正在执行的个数2
    执行请求4-2, 正在执行的个数3
    执行请求4-3, 正在执行的个数4
    执行请求4-3结束,已经执行31次
    执行请求4-0结束,已经执行32次
    执行请求4-2结束,已经执行33次
    执行请求4-1结束,已经执行34次
    

    CountDownLatch(倒计时闭锁)和CyclicBarrier(循环栅栏)

    在es的async中,一般情景的CountDownLatch可以直接用Promise.all替代。

    使用场景:复杂的Promise.all需求场景,支持在离散的多个异步函数中灵活使用(本人还没有碰到过),从而摆脱Promise.all在使用时需要一个promiseiterable类型的输入。

    代码实现:

    class CountDownLatch {
      
      constructor(count) {
        this.count = count;
        this.waitTaskList = [];
      }
      
      countDown() {
        this.count--;
        if (this.count <= 0) {
          this.waitTaskList.forEach(task => task());
        }
      }
      
      // 避免使用关键字,所以命名跟java中不一样
      async awaitExec() {
        if (this.count <= 0) {
          return;
        }
        const that = this;
        await new Promise((resolve) => {
          that.waitTaskList.push(resolve);
        });
      }
    }
    

    模拟使用代码:

    (async () => {
      const countDownLatch = new CountDownLatch(10);
      
      const request = async (id) => {
        console.log(`执行请求${id}`);
        await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
        console.log(`执行请求${id}结束`);
        countDownLatch.countDown();
      };
      
      const task = new Array(10).fill(1);
      // 后续的代码等同于
      // await Promise.all(task.map(async (_, index) => {
      //   await request(index);
      // }));
      task.forEach((_1, index) => {
        request(index);
      });
      await countDownLatch.awaitExec();
      console.log('执行完毕');
    })();
    

    CyclicBarrier抛除线程相关概念后,核心功能就是一个可以重复使用的CountDownLatch,这里就不实现了。

    相关文章

      网友评论

          本文标题:async异步工具

          本文链接:https://www.haomeiwen.com/subject/fxdlvrtx.html