美文网首页
bagpipe 源码解析

bagpipe 源码解析

作者: wpzero | 来源:发表于2016-03-22 15:05 被阅读0次

    这个也是朴大的一个解决异步并发
    控制的包。
    抄朴大的一段话,来描述应用场景。
    在node中,我们可以十分方便地异步发起并行的调用。使用下面的代码,我们可以轻松发起100次异步调用

    for(var i = 0;i < 100; i++){
      async();
    }
    

    但是如果并发量过大,我们的下层服务器将会吃不消。如果是对文件系统进行大量并发调用,操作系统的文件描述符数量会瞬间用光,抛出如下,错误

    Error: EMFILE, too many open files
    

    可以看出,同步I/O和异步I/O的显著差距:同步I/O因为每一个I/O都是彼此阻塞的,在循环体中,总是一个接着一个的调用,不会出现消耗文件描述符太多的情况,同时性能也一下, 对于异步I/O,虽然并发容易实现,但是由于太容易实现,依然需要控制。换言之,尽管是要压榨底层系统的性能,但是需要给与一定的过载保护,以防止过犹不及。
    bagpipe的应用呢,就是维护一个队列来控制并发。
    具体应用代码如下:

    var Bagpipe = require('bagpipe');
    var bagpipe = new Bagpipe(10);
    for(var i = 1; i < 10; i++){
      bagpipe.push(async,  function(){
      });
    }
    bagpipe.on('full', function(length){
      console.log('底层系统处理不能及时完成,队列堵塞,目前队列长度' + length);
    });
    

    下面就来看一看bagpipe的源码吧。
    这里朴大的bagpipe是继承了nodejs的EventEmitter

    构造函数

    var Bagpipe = function (limit, options) {
      events.EventEmitter.call(this);
      // 活跃的任务数(并发数)
      this.limit = limit;
      // 当前活跃的任务数量
      this.active = 0;
      // 异步处理的队列
      this.queue = [];
      // 一些配置信息
      this.options = {
        // 是否应用控制并发
        disabled: false,
        // 如果异步事件长度超过了queueLength是否还进入队列
        refuse: false,
        // 根据limit来算队列的长度,用于refuse
        ratio: 1,
        // 超时的时间,如果超过了这个时间任务失败
        timeout: null
      };
      if (typeof options === 'boolean') {
        options = {
          disabled: options
        };
      }
      options = options || {};
      for (var key in this.options) {
        if (options.hasOwnProperty(key)) {
          this.options[key] = options[key];
        }
      }
      // queue length
      this.queueLength = Math.round(this.limit * (this.options.ratio || 1));
    };
    util.inherits(Bagpipe, events.EventEmitter);
    

    push 方法(添加异步任务)

    Bagpipe.prototype.push = function (method) {
      // 处理异步任务method之外的其他参数
      var args = [].slice.call(arguments, 1);
      // 异步任务的回调函数
      var callback = args[args.length - 1];
      if (typeof callback !== 'function') {
        args.push(function () {});
      }
      // 如果不控制,或者limit小于1,那么就立即执行
      if (this.options.disabled || this.limit < 1) {
        method.apply(null, args);
        return this;
      }
    
      // 队列长度也超过限制值时
      if (this.queue.length < this.queueLength || !this.options.refuse) {
        this.queue.push({
          method: method,
          args: args
        });
      } else {
        var err = new Error('Too much async call in queue');
        err.name = 'TooMuchAsyncCallError';
        callback(err);
      }
    
      if (this.queue.length > 1) {
        this.emit('full', this.queue.length);
      }
      // 执行next方法,来检测是否可以执行一个异步任务
      this.next();
      return this;
    };
    

    next方法,用于检测是否可以并且有异步任务执行

    Bagpipe.prototype.next = function () {
      var that = this;
      // 活跃的任务数小于限制数
      if (that.active < that.limit && that.queue.length) {
        var req = that.queue.shift();
        //执行异步任务
        that.run(req.method, req.args);
      }
    };
    
    // 异步任务执行成功结束,之后调用的内部方法
    Bagpipe.prototype._next = function () {
      //活跃数减一
      this.active--;
      this.next();
    };
    

    执行异步任务 run

    Bagpipe.prototype.run = function (method, args) {
      var that = this;
      // 活跃数,并行执行的任务数加一
      that.active++;
      var callback = args[args.length - 1];
      var timer = null;
      var called = false;
    
      // 添加超时逻辑
      args[args.length - 1] = function (err) {
        // anyway, clear the timer
        // 如果有timer,就要clear掉这个timer
        if (timer) {
          clearTimeout(timer);
          timer = null;
        }
        // 没有过期之前
        if (!called) {
          that._next();
          callback.apply(null, arguments);
        // 过期了,所以执行抛错
        } else {
          // pass the outdated error
          if (err) {
            that.emit('outdated', err);
          }
        }
      };
    
      // 设置一个timer,来防止超时
      var timeout = that.options.timeout;
      if (timeout) {
        timer = setTimeout(function () {
          // set called as true
          called = true;
          that._next();
          // pass the exception
          var err = new Error(timeout + 'ms timeout');
          err.name = 'BagpipeTimeoutError';
          err.data = {
            name: method.name,
            method: method.toString(),
            args: args.slice(0, -1)
          };
          callback(err);
        }, timeout);
      }
      // 执行异步任务
      method.apply(null, args);
    };
    

    其实在bagpipe中维护一个异步并发任务的队列,来使得最大的并发数也小于limit的数。

    相关文章

      网友评论

          本文标题:bagpipe 源码解析

          本文链接:https://www.haomeiwen.com/subject/htvklttx.html