美文网首页
cluster的一些理解

cluster的一些理解

作者: yydounai | 来源:发表于2017-11-02 11:32 被阅读0次

    看了cluster不明白他是怎么搞得。为什么master进程没有监听端口号,就能实现集群。看了下资料和源码,这里做一下简单的总结。。

    1.cluster.isMaster怎么识别的?主进程fork的时候给到子进程一个NODE_UNIQUE_ID。所以只要环境变量有这个参数就是子进程。

    2.主进程没有创建服务器的语句,到底有没有创建?都知道http基于tcp传输层的,既然主进程没有创建服务器的语句,是否是通过子进程的相关操作实现的。这里翻开nodejs源码对于listen的实现:

    Server.prototype.listen=function(...args) {
    
    varnormalized=normalizeArgs(args);
    
    varoptions=normalized[0];
    
    varcb=normalized[1];
    
    if(this._handle) {
    
    thrownewerrors.Error('ERR_SERVER_ALREADY_LISTEN');
    
    }
    
    varhasCallback=(cb!==null);
    
    if(hasCallback) {
    
    this.once('listening', cb);
    
    }
    
    varbacklogFromArgs=
    
    // (handle, backlog) or (path, backlog) or (port, backlog)
    
    toNumber(args.length>1&&args[1])||
    
    toNumber(args.length>2&&args[2]);// (port, host, backlog)
    
    options=options._handle||options.handle||options;
    
    // (handle[, backlog][, cb]) where handle is an object with a handle
    
    if(optionsinstanceofTCP) {
    
    this._handle=options;
    
    this[async_id_symbol]=this._handle.getAsyncId();
    
    listenInCluster(this,null,-1,-1, backlogFromArgs);
    
    return this;
    
    }
    
    // (handle[, backlog][, cb]) where handle is an object with a fd
    
    if(typeofoptions.fd==='number'&&options.fd>=0) {
    
    listenInCluster(this,null,null,null, backlogFromArgs, options.fd);
    
    return this;
    
    }
    
    // ([port][, host][, backlog][, cb]) where port is omitted,
    
    // that is, listen(), listen(null), listen(cb), or listen(null, cb)
    
    // or (options[, cb]) where options.port is explicitly set as undefined or
    
    // null, bind to an arbitrary unused port
    
    if(args.length===0|| typeofargs[0]==='function'||
    
    (typeofoptions.port==='undefined'&&'port'inoptions)||
    
    options.port===null) {
    
    options.port=0;
    
    }
    
    // ([port][, host][, backlog][, cb]) where port is specified
    
    // or (options[, cb]) where options.port is specified
    
    // or if options.port is normalized as 0 before
    
    varbacklog;
    
    if(typeofoptions.port==='number'|| typeofoptions.port==='string') {
    
    if(!isLegalPort(options.port)) {
    
    thrownewRangeError('"port" argument must be >= 0 and < 65536');
    
    }
    
    backlog=options.backlog||backlogFromArgs;
    
    // start TCP server listening on host:port
    
    if(options.host) {
    
    lookupAndListen(this, options.port|0, options.host, backlog,
    
    options.exclusive);
    
    }else{// Undefined host, listens on unspecified address
    
    // Default addressType 4 will be used to search for master server
    
    listenInCluster(this,null, options.port|0,4,
    
    backlog,undefined, options.exclusive);
    
    }
    
    return this;
    
    }
    
    // (path[, backlog][, cb]) or (options[, cb])
    
    // where path or options.path is a UNIX domain socket or Windows pipe
    
    if(options.path&&isPipeName(options.path)) {
    
    varpipeName=this._pipeName=options.path;
    
    backlog=options.backlog||backlogFromArgs;
    
    listenInCluster(this, pipeName,-1,-1,
    
    backlog,undefined, options.exclusive);
    
    return this;
    
    }
    
    thrownewError('Invalid listen argument: '+util.inspect(options));
    
    };
    

    可以看出参数可以是很多类型,这里针对端口进行分析,发现参数是端口时,执行了listenInCluster()函数然,翻开这个listenInCluster函数,发现代码是:

    
    function listenInCluster(server, address, port, addressType,
    
    backlog, fd, exclusive) {
    
    exclusive= !!exclusive;
    
    if(cluster===null) cluster=require('cluster');
    
    if(cluster.isMaster||exclusive) {
    
    // Will create a new handle
    
    // _listen2 sets up the listened handle, it is still named like this
    
    // to avoid breaking code that wraps this method
    
    server._listen2(address, port, addressType, backlog, fd);
    
    return;
    
    }
    
    constserverQuery={
    
    address:address,
    
    port:port,
    
    addressType:addressType,
    
    fd:fd,
    
    flags:0
    
    };
    
    // Get the master's server handle, and listen on it
    
    cluster._getServer(server, serverQuery, listenOnMasterHandle);
    
    function listenOnMasterHandle(err, handle) {
    
    err=checkBindError(err, port, handle);
    
    if(err) {
    
    varex=exceptionWithHostPort(err,'bind', address, port);
    
    returnserver.emit('error', ex);
    
    }
    
    // Reuse master's server handle
    
    server._handle=handle;
    
    // _listen2 sets up the listened handle, it is still named like this
    
    // to avoid breaking code that wraps this method
    
    server._listen2(address, port, addressType, backlog, fd);
    
    }
    
    }
    

    这个代码很有意思,如果是主进程执行server._listen2(address, port, addressType, backlog, fd);然后return

    如果手机子进程发送serverQuery然后执行回调listenOnMasterHandle,这个回调里也是server._listen2(address, port, addressType, backlog, fd);
    cluster_getServer的源码是:

    cluster._getServer = function(obj, options, cb) {
      const indexesKey = [options.address,
                          options.port,
                          options.addressType,
                          options.fd ].join(':');
    
      if (indexes[indexesKey] === undefined)
        indexes[indexesKey] = 0;
      else
        indexes[indexesKey]++;
    
      const message = util._extend({
        act: 'queryServer',
        index: indexes[indexesKey],
        data: null
      }, options);
    
      // Set custom data on handle (i.e. tls tickets key)
      if (obj._getServerData)
        message.data = obj._getServerData();
    
      send(message, (reply, handle) => {
        if (typeof obj._setServerData === 'function')
          obj._setServerData(reply.data);
    
        if (handle)
          shared(reply, handle, indexesKey, cb);  // Shared listen socket.
        else
          rr(reply, indexesKey, cb);              // Round-robin.
      });
    
      obj.once('listening', () => {
        cluster.worker.state = 'listening';
        const address = obj.address();
        message.act = 'listening';
        message.port = address && address.port || options.port;
        send(message);
      });
    };
    

    大概的意思是将这个worker的信息比如端口什么的发给主进程,按照内容起个服务器,然后就是

      if (handle)
          shared(reply, handle, indexesKey, cb);  // Shared listen socket.
        else
          rr(reply, indexesKey, cb);              // Round-robin.
      });
    

    有句柄的话分享这个shock,没有的话执行rr函数:

    function rr(message, indexesKey, cb) {
      if (message.errno)
        return cb(message.errno, null);
    
      var key = message.key;
    
      function listen(backlog) {
        // TODO(bnoordhuis) Send a message to the master that tells it to
        // update the backlog size. The actual backlog should probably be
        // the largest requested size by any worker.
        return 0;
      }
    
      function close() {
        // lib/net.js treats server._handle.close() as effectively synchronous.
        // That means there is a time window between the call to close() and
        // the ack by the master process in which we can still receive handles.
        // onconnection() below handles that by sending those handles back to
        // the master.
        if (key === undefined)
          return;
    
        send({ act: 'close', key });
        delete handles[key];
        delete indexes[indexesKey];
        key = undefined;
      }
    
      function getsockname(out) {
        if (key)
          util._extend(out, message.sockname);
    
        return 0;
      }
    
      // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
      // with it. Fools net.Server into thinking that it's backed by a real
      // handle. Use a noop function for ref() and unref() because the control
      // channel is going to keep the worker alive anyway.
      const handle = { close, listen, ref: noop, unref: noop };
    
      if (message.sockname) {
        handle.getsockname = getsockname;  // TCP handles only.
      }
    
      assert(handles[key] === undefined);
      handles[key] = handle;
      cb(0, handle);
    }
    

    发现这个listen函数 return了并没有操作,将函数给到handles,然后callback出去,由上面的server._handle=handle接收。所以其实工作进程的监听被hack了,并没有操作。。

    这里再看下server._listen2,源码中Server.prototype._listen2=setupListenHandle;那么看下setupListenHandle的实现吧:

    
    function setupListenHandle(address, port, addressType, backlog, fd) {
    
    debug('setupListenHandle', address, port, addressType, backlog, fd);
    
    // If there is not yet a handle, we need to create one and bind.
    
    // In the case of a server sent via IPC, we don't need to do this.
    
    if(this._handle) {
    
    debug('setupListenHandle: have a handle already');
    
    }else{
    
    debug('setupListenHandle: create a handle');
    
    varrval=null;
    
    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    
    if(!address&& typeoffd!=='number') {
    
    rval=createServerHandle('::', port,6, fd);
    
    if(typeofrval==='number') {
    
    rval=null;
    
    address='0.0.0.0';
    
    addressType=4;
    
    }else{
    
    address='::';
    
    addressType=6;
    
    }
    
    }
    
    if(rval===null)
    
    rval=createServerHandle(address, port, addressType, fd);
    
    if(typeofrval==='number') {
    
    varerror=exceptionWithHostPort(rval,'listen', address, port);
    
    process.nextTick(emitErrorNT,this, error);
    
    return;
    
    }
    
    this._handle=rval;
    
    }
    
    this[async_id_symbol]=getNewAsyncId(this._handle);
    
    this._handle.onconnection=onconnection;
    
    this._handle.owner=this;
    
    // Use a backlog of 512 entries. We pass 511 to the listen() call because
    
    // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
    
    // which will thus give us a backlog of 512 entries.
    
    varerr=this._handle.listen(backlog||511);
    
    if(err) {
    
    varex=exceptionWithHostPort(err,'listen', address, port);
    
    this._handle.close();
    
    this._handle=null;
    
    nextTick(this[async_id_symbol], emitErrorNT,this, ex);
    
    return;
    
    }
    
    // generate connection key, this should be unique to the connection
    
    this._connectionKey=addressType+':'+address+':'+port;
    
    // unref the handle if the server was unref'ed prior to listening
    
    if(this._unref)
    
    this.unref();
    
    nextTick(this[async_id_symbol], emitListeningNT,this);
    
    }
    

    这里发现代码成功了执行的时createServerHandle函数,听名字是创造socket句柄 ,如果_handle存在就不创建,不存在创建socket,如前面所写子进程已经创建了socket,所以不会再创建socket,所以子进程虽然listen了,但是其实只是表面的而已。具体服务器如何接收客户端请求,涉及到c,不会c,应该是调用一些底层的东西实现的吧。

    相关文章

      网友评论

          本文标题:cluster的一些理解

          本文链接:https://www.haomeiwen.com/subject/qusopxtx.html