美文网首页
实现一个简单的并发请求队列

实现一个简单的并发请求队列

作者: 这名字真不对 | 来源:发表于2019-04-15 21:09 被阅读0次

    前言

    最近维护一个老项目中的微信公众号h5的新需求,项目是Node.js服务,Node层负责前端路由、api聚合以及用户信息校验等工作,项目较大,上一次维护已经是3年前,现在不方便重构整个项目,新需求也依赖Node层做路由和api管理,因此需要在原项目内做开发。

    原项目的前端资源由fis3输出到指定目录,其中js、css、img资源会上传至cdn,由另一台服务器nginx代理(文件上传服务会由另一个Node服务处理),Node层只负责解析html的解析,以减少Node层业务的压力。新需求基于Vue开发,需要webpack打包,因此需要通过其他方法上传静态资源到nginx的服务器。

    文件上传请求是一个异步操作,那么是否可以通过并发请求,加快上传的速度呢?新需求是多页面,资源有上百个文件,同时并发上百个请求会对服务器造成过大压力,我们需要一定的机制让请求排队,除此以外,当上传失败时,还需要一定的重试能力。面对这些问题,接下来研究一下如何实现上述的需求。

    尝试与学习

    这里我参考了一篇并发请求的实践,可以先了解一下原文:

    不到50行代码实现一个能对请求并发数做限制的通用RequestDecorator - 作者:陈纪庚

    在大佬的基础上做了一些简单的改造,增加了重试的功能,同时项目开发中也遇到了一些小问题,以下是具体的实现,有注释说明:

    // 任务队列
    class RequestQueue {
      constructor(maxLimit = 5, retry = 2) {
        // 最大并发量
        this.maxLimit = maxLimit
        // 重试次数
        this.retry = retry
    
        // blocking queue 若当前请求并发量已经超过maxLimit,则将请求延迟到下某个任务完成,再执行该队列任务
        this.requestQueue = []
        // 当前并发量数目
        this.currentConcurrent = 0
        
        // 说明1:
        // 实际请求中,可能会异步的抛出多个error
        // 任务重试过程中,当catch到 error且 重试已到上限,会执行 next() 执行下一个任务,
        // 此时,如果有异常抛出前一个异步任务的,会无法捕获 
        // 因此通过全局时间捕获剩余的异步异常
        process.on("unhandledRejection", function(e){
          console.log(e);
        })
      }
    
      async run(request) {
        // 并发限制
        if (this.currentConcurrent >= this.maxLimit) {
          await this.startBlocking() // 等待执行,直到某个任务执行this.next()
        }
        // 队列+1
        this.currentConcurrent++
    
        // 设置队列中同一个任务尝试次数
        for (let retryCount = this.retry; retryCount > 0; retryCount--) {
          let done = false
          console.log('[ retryCount ]:' + retryCount)
          try {
            // 这里与大佬的方法有不同,这里需要传入一个包装好请求的Promise实例,如有需要也可以用pify将请求转成promise
            const result = await request() 
            // 执行成功则结束尝试
            done = true
            return Promise.resolve(result)
            // 如果有错误,会被捕获,不会执行resolve
          } catch (error) {
            console.log('[ request error ] - ' + error)
            // 最后一次重试失败时停止重试,返回报错
            if (retryCount === 1) {
              done = true
              return Promise.reject(error) // 错误只会抛出一次
            }
          } finally {
            // 如果已经结束重试,执行请求队列的下一个任务
            if (done) {
              this.currentConcurrent--
              this.next()
              break;
            }
          }
        }
      }
    
      next() {
        if (this.requestQueue.length <= 0) return
        const resolve = this.requestQueue.shift()
        resolve()  // 取出block promise 的resolve 执行
      }
    
      startBlocking() {
        let _resolve
        let promise2 = new Promise((resolve) => (_resolve = resolve))
        this.requestQueue.push(_resolve)
        return promise2 // 返回block promise 用于暂停队列的执行
      }
    }
    

    使用方式:

    const request = () => {
        return new Promise((resolve, reject) => {
         setTimeout(() => { resolve() }, 1000)
       })
    }
    
    const instance = new RequestQueue()
    
    const promises = []
    for (let i = 0; i < 100; i++ ) {
      promises.push(instance.run(request)
        .catch(err => {
            // 这里是否catch(err)取决于是否允许某个任务失败时,其他任务继续执行
           console.log(err)
        })
      )
    }
    
    Promise.all(promises)
      .catch(err => console.log(err)) 
      // 如果前面的push过程中不catch,则一旦有任务抛出错误,剩余的任务不再执行
    

    整个实现如上,与大佬的实践略有不同,仅供学习。实际生产中,更推荐使用开源社区成熟的库,async,这个库提供更全面的异步流控制,便于我们进行开发。

    相关文章

      网友评论

          本文标题:实现一个简单的并发请求队列

          本文链接:https://www.haomeiwen.com/subject/upipwqtx.html