美文网首页
(javascript)100行代码实现可重试可并发的异步任务队

(javascript)100行代码实现可重试可并发的异步任务队

作者: Lawrence_4aee | 来源:发表于2020-06-12 12:32 被阅读0次

    背景

    昨天我们团队项目中有需要控制并发数,并且将大文件切片上传的需求,期望是并发的去上传文件切片,于是乎构思了一下,利用闭包实现了一个并发并且可重试的非顺序执行的任务队列

    构思

    首先需要有一个队列栈task_queue,和一个任务池task_pool
    当任务完成时,将完成的任务从task_queue中移除,同时从任务池task_pool中拿到新的任务入栈
    当任务失败的时候,也要将任务从task_queue中移除,同时加入task_pool的栈首,等待前一任务执行后进行重试

    构造函数

    constructor(options) {
        this.retry_counts_map = {} // 记录重试次数
        this.retry_times = options.retry_times || 3 // 默认重试3次
        this.concurrent_size = options.concurrent_size
        this.task_pool = (options.tasks || []).map((task, index) => {
          task.id = index
          return task
        })
        this.handler = options.handler
        this.retry_fun = options.retry_fun
        this.task_quene = []
        this.failed_tasks = []
        this.success_count = 0
      }
    

    利用闭包来记录任务,执行失败后进行重试,并在remove task之后再次执行start

    start () {
        if (this.task_quene.length === 0 && this.task_pool.length === 0) {
          this.handler({
            succees: this.success_count,
            failed: this.failed_tasks.length,
            failed_tasks: this.failed_tasks
          })
          return
        }
        for (let i = this.task_quene.length; i < this.concurrent_size; i++) {
          let task = this.task_pool.shift()
          if (!task) {
            return
          }
          this.push(task)
        }
      }
      push (task) {
        let mission = ((t) => {
          return t().then(res => {
            console.log('完成任务:' + task.id)
            this.success_count = this.success_count + 1
            this.remove(t)
          }).catch(e => {
            this.retry(task)
          })
        })(task)
        this.task_quene.push(mission)
      }
      remove (task) {
        this.task_quene.splice(this.task_quene.findIndex(t => {
          return t.id === task.id
        }), 1)
        this.start()
      }
    

    记录失败的任务,加入任务池的首位

    retry (task) {
        if (!this.retry_counts_map[task.id]) {
          this.retry_counts_map[task.id] = 1
        } else {
          this.retry_counts_map[task.id] = this.retry_counts_map[task.id] + 1
        }
        if (this.retry_counts_map[task.id] > this.retry_times) {
          console.log(`任务${task.id} 重试次数已达到三次`)
          this.failed_tasks.push(task)
        } else {
          if (this.retry_fun) {
            this.retry_fun().then(() => {
              console.log(`异步重试任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
              this.task_pool.unshift(task)
            }).catch(e => {
              this.failed_tasks.push(task)
            })
          } else {
            console.log(`任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
            this.task_pool.unshift(task)
          }
        }
        this.remove(task)
      }
    }
    

    完整代码&DEMO

    class TaskQueue {
      constructor(options) {
        this.retry_counts_map = {} // 记录重试次数
        this.retry_times = options.retry_times || 3 // 默认重试3次
        this.concurrent_size = options.concurrent_size
        this.task_pool = (options.tasks || []).map((task, index) => {
          task.id = index
          return task
        })
        this.handler = options.handler
        this.retry_fun = options.retry_fun
        this.task_quene = []
        this.failed_tasks = []
        this.success_count = 0
      }
    
      start () {
        if (this.task_quene.length === 0 && this.task_pool.length === 0) {
          this.handler({
            succees: this.success_count,
            failed: this.failed_tasks.length,
            failed_tasks: this.failed_tasks
          })
          return
        }
        for (let i = this.task_quene.length; i < this.concurrent_size; i++) {
          let task = this.task_pool.shift()
          if (!task) {
            return
          }
          this.push(task)
        }
      }
      push (task) {
        let mission = ((t) => {
          return t().then(res => {
            console.log('完成任务:' + task.id)
            this.success_count = this.success_count + 1
            this.remove(t)
          }).catch(e => {
            this.retry(task)
          })
        })(task)
        this.task_quene.push(mission)
      }
      remove (task) {
        this.task_quene.splice(this.task_quene.findIndex(t => {
          return t.id === task.id
        }), 1)
        this.start()
      }
      retry (task) {
        if (!this.retry_counts_map[task.id]) {
          this.retry_counts_map[task.id] = 1
        } else {
          this.retry_counts_map[task.id] = this.retry_counts_map[task.id] + 1
        }
        if (this.retry_counts_map[task.id] > this.retry_times) {
          console.log(`任务${task.id} 重试次数已达到三次`)
          this.failed_tasks.push(task)
        } else {
          if (this.retry_fun) {
            this.retry_fun().then(() => {
              console.log(`异步重试任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
              this.task_pool.unshift(task)
            }).catch(e => {
              this.failed_tasks.push(task)
            })
          } else {
            console.log(`任务${task.id} 重试第${this.retry_counts_map[task.id]}次`)
            this.task_pool.unshift(task)
          }
        }
        this.remove(task)
      }
    }
    
    
    const randomNum = function (minNum, maxNum) {
      switch (arguments.length) {
        case 1:
          return parseInt(Math.random() * minNum + 1, 10);
        case 2:
          return parseInt(Math.random() * (maxNum - minNum + 1) + minNum, 10);
        default:
          return 0;
      }
    }
    
    const __main__ = function () {
      let tasks = []
      for (let i = 0; i < 20; i++) {
        tasks.push((() => {
          return new Promise((resolve, reject) => {
            setTimeout(function () {
              let num = randomNum(0, 100)
              if (num > 50) {
                resolve()
              } else {
                reject()
              }
            }, randomNum(300, 800))
          })
        }))
      }
      const task_queue = new TaskQueue({
        retry_times: 3, // 默认重试3次
        concurrent_size: 5,
        tasks: tasks,
        handler (res) {
          console.log(res)
        }
      })
      task_queue.start()
    }
    
    __main__()
    

    相关文章

      网友评论

          本文标题:(javascript)100行代码实现可重试可并发的异步任务队

          本文链接:https://www.haomeiwen.com/subject/gfugtktx.html