看了cluster之后不明白它是怎么实现的:为什么master进程里没有监听端口的语句,就能实现集群?看了下资料和源码,这里做一下简单的总结。
1. cluster.isMaster是怎么识别的?主进程fork的时候会给子进程传入一个环境变量NODE_UNIQUE_ID,所以只要环境变量里有这个参数,当前进程就是子进程。
2.主进程没有创建服务器的语句,到底有没有创建?都知道http基于tcp传输层的,既然主进程没有创建服务器的语句,是否是通过子进程的相关操作实现的。这里翻开nodejs源码对于listen的实现:
/**
 * Start listening for connections.
 *
 * The arguments are normalized by `normalizeArgs` into `[options, cb]`.
 * Supported call shapes, checked in this order:
 *   - (handle[, backlog][, cb])   handle is (or wraps) a TCP handle
 *   - (handle[, backlog][, cb])   handle is an object with a numeric `fd`
 *   - ([port][, host][, backlog][, cb]) or (options[, cb]) with a port
 *   - (path[, backlog][, cb]) or (options[, cb]) with a pipe path
 *
 * Every branch ends up in `listenInCluster` (directly, or through
 * `lookupAndListen` when a host is given).
 *
 * @param {...*} args See the call shapes above.
 * @returns {Server} `this`, to allow chaining.
 * @throws {Error} ERR_SERVER_ALREADY_LISTEN if the server already has a handle.
 * @throws {RangeError} If the port is not accepted by `isLegalPort`.
 * @throws {Error} If no supported call shape matches the arguments.
 */
Server.prototype.listen = function(...args) {
  var normalized = normalizeArgs(args);
  var options = normalized[0];
  var cb = normalized[1];

  if (this._handle) {
    throw new errors.Error('ERR_SERVER_ALREADY_LISTEN');
  }

  var hasCallback = (cb !== null);
  if (hasCallback) {
    this.once('listening', cb);
  }

  var backlogFromArgs =
    // (handle, backlog) or (path, backlog) or (port, backlog)
    toNumber(args.length > 1 && args[1]) ||
    toNumber(args.length > 2 && args[2]); // (port, host, backlog)

  // Unwrap objects that merely carry a handle.
  options = options._handle || options.handle || options;

  // (handle[, backlog][, cb]) where handle is an object with a handle
  if (options instanceof TCP) {
    this._handle = options;
    this[async_id_symbol] = this._handle.getAsyncId();
    listenInCluster(this, null, -1, -1, backlogFromArgs);
    return this;
  }

  // (handle[, backlog][, cb]) where handle is an object with a fd
  if (typeof options.fd === 'number' && options.fd >= 0) {
    listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
    return this;
  }

  // ([port][, host][, backlog][, cb]) where port is omitted,
  // that is, listen(), listen(null), listen(cb), or listen(null, cb)
  // or (options[, cb]) where options.port is explicitly set as undefined or
  // null, bind to an arbitrary unused port
  if (args.length === 0 || typeof args[0] === 'function' ||
      (typeof options.port === 'undefined' && 'port' in options) ||
      options.port === null) {
    options.port = 0;
  }

  // ([port][, host][, backlog][, cb]) where port is specified
  // or (options[, cb]) where options.port is specified
  // or if options.port is normalized as 0 before
  var backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    if (!isLegalPort(options.port)) {
      throw new RangeError('"port" argument must be >= 0 and < 65536');
    }
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  // (path[, backlog][, cb]) or (options[, cb])
  // where path or options.path is a UNIX domain socket or Windows pipe
  if (options.path && isPipeName(options.path)) {
    var pipeName = this._pipeName = options.path;
    backlog = options.backlog || backlogFromArgs;
    listenInCluster(this, pipeName, -1, -1,
                    backlog, undefined, options.exclusive);
    return this;
  }

  throw new Error('Invalid listen argument: ' + util.inspect(options));
};
可以看出参数可以是很多类型,这里只针对端口进行分析。可以发现参数是端口时,执行了listenInCluster()函数。翻开这个listenInCluster函数,发现代码是:
/**
 * Make `server` listen, either on a handle created in this process or on a
 * handle obtained from the cluster master.
 *
 * When running as the master, or when `exclusive` is truthy, the server
 * sets up its own handle via `server._listen2`. Otherwise the master is
 * queried through `cluster._getServer`, and the handle it hands back is
 * installed as `server._handle` before `server._listen2` is called.
 *
 * @param {object} server Server exposing `_listen2` and `emit`.
 * @param {*} address Address or pipe name forwarded to `_listen2`.
 * @param {*} port Port forwarded to `_listen2`.
 * @param {*} addressType Address type forwarded to `_listen2`.
 * @param {*} backlog Backlog forwarded to `_listen2`.
 * @param {*} [fd] File descriptor forwarded to `_listen2`.
 * @param {*} [exclusive] Truthy to skip the master and listen locally.
 * @returns {*} `undefined`, or the result of `server.emit('error', ...)`
 *   from the asynchronous callback when the master reports a bind error.
 */
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive) {
  exclusive = !!exclusive;

  // Load the cluster module lazily, on first use.
  if (cluster === null) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      var ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd);
  }
}
这个代码很有意思:如果是主进程(或者exclusive为true),就直接执行server._listen2(address, port, addressType, backlog, fd);然后return。
如果是子进程,则向主进程发送serverQuery,然后执行回调listenOnMasterHandle,这个回调里先把主进程返回的handle赋给server._handle,最后同样执行server._listen2(address, port, addressType, backlog, fd);
cluster._getServer的源码是:
/**
 * Worker-side request for a server handle from the master.
 *
 * Sends a `queryServer` message built from `options`, then hands the reply
 * to `shared()` when a handle accompanies it, or to `rr()` otherwise. Once
 * `obj` emits 'listening', a follow-up `listening` message is sent.
 *
 * @param {object} obj Server-like object; may implement `_getServerData`
 *   and `_setServerData`.
 * @param {object} options Query with `address`, `port`, `addressType`, `fd`.
 * @param {Function} cb Forwarded to `shared()` / `rr()`.
 */
cluster._getServer = function(obj, options, cb) {
  const keyParts = [
    options.address,
    options.port,
    options.addressType,
    options.fd
  ];
  const indexesKey = keyParts.join(':');

  // Count how many times this exact address/port/type/fd was requested.
  if (indexes[indexesKey] !== undefined) {
    indexes[indexesKey]++;
  } else {
    indexes[indexesKey] = 0;
  }

  const baseMessage = {
    act: 'queryServer',
    index: indexes[indexesKey],
    data: null
  };
  const message = util._extend(baseMessage, options);

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData) {
    message.data = obj._getServerData();
  }

  const onReply = (reply, handle) => {
    if (typeof obj._setServerData === 'function') {
      obj._setServerData(reply.data);
    }

    if (!handle) {
      rr(reply, indexesKey, cb); // Round-robin.
      return;
    }
    shared(reply, handle, indexesKey, cb); // Shared listen socket.
  };
  send(message, onReply);

  const onListening = () => {
    cluster.worker.state = 'listening';
    const bound = obj.address();
    message.act = 'listening';
    message.port = bound && bound.port || options.port;
    send(message);
  };
  obj.once('listening', onListening);
};
大概的意思是把这个worker要监听的信息(比如地址、端口等)发给主进程,由主进程按照这些内容起一个服务器,然后在收到主进程回复的回调里执行:
// Excerpt of the reply callback passed to send() in cluster._getServer
// (shown in full above); the trailing `});` closes that send(...) call.
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
有句柄的话就共享这个socket,没有的话执行rr函数:
/**
 * Round-robin path: build a fake listen handle for the worker.
 *
 * If the reply carries an `errno`, it is passed to `cb` with a `null`
 * handle. Otherwise a stand-in handle is registered in `handles` under
 * `message.key` and passed to `cb` with status 0.
 *
 * @param {object} message Reply from the master (`errno`, `key`, `sockname`).
 * @param {string} indexesKey Key of this server's entry in `indexes`.
 * @param {Function} cb Called as `cb(errno, null)` or `cb(0, handle)`.
 * @returns {*} The result of `cb` on the error path, otherwise `undefined`.
 */
function rr(message, indexesKey, cb) {
  if (message.errno) {
    return cb(message.errno, null);
  }

  // Becomes undefined once close() has run.
  let workerKey = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (workerKey !== undefined) {
      send({ act: 'close', key: workerKey });
      delete handles[workerKey];
      delete indexes[indexesKey];
      workerKey = undefined;
    }
  }

  function getsockname(out) {
    // Only report the socket name while the handle is still open.
    if (workerKey) {
      util._extend(out, message.sockname);
    }
    return 0;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = {
    close,
    listen,
    ref: noop,
    unref: noop
  };

  // TCP handles only.
  if (message.sockname) {
    handle.getsockname = getsockname;
  }

  assert(handles[workerKey] === undefined);
  handles[workerKey] = handle;
  cb(0, handle);
}
发现这个listen函数直接return 0,并没有做任何实际操作;rr把这个伪造的handle存到handles里,然后通过callback传出去,由上面的server._handle=handle接收。所以其实工作进程的监听被hack了,并没有真正去监听端口。
这里再看下server._listen2,源码中Server.prototype._listen2=setupListenHandle;那么看下setupListenHandle的实现吧:
/**
 * Create (if necessary) and start the listen handle for a server.
 * Installed on the prototype as `Server.prototype._listen2`.
 *
 * If `this._handle` is already set, it is reused. Otherwise a handle is
 * created with `createServerHandle`; when no address and no numeric fd
 * are given, '::' (type 6) is tried first with a fallback to '0.0.0.0'
 * (type 4). Errors are reported asynchronously through `emitErrorNT`;
 * success is reported asynchronously through `emitListeningNT`.
 *
 * @this {object} The server being set up.
 * @param {*} address Address or pipe name; falsy means "unspecified".
 * @param {*} port Port to bind.
 * @param {*} addressType Address type passed to `createServerHandle`.
 * @param {*} backlog Listen backlog; falsy values fall back to 511.
 * @param {*} fd Optional file descriptor.
 * @returns {undefined}
 */
function setupListenHandle(address, port, addressType, backlog, fd) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    var rval = null;

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle('::', port, 6, fd);

      // A numeric result is an error code: fall back to IPv4.
      if (typeof rval === 'number') {
        rval = null;
        address = '0.0.0.0';
        addressType = 4;
      } else {
        address = '::';
        addressType = 6;
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd);

    if (typeof rval === 'number') {
      var error = exceptionWithHostPort(rval, 'listen', address, port);
      process.nextTick(emitErrorNT, this, error);
      return;
    }
    this._handle = rval;
  }

  this[async_id_symbol] = getNewAsyncId(this._handle);
  this._handle.onconnection = onconnection;
  this._handle.owner = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  var err = this._handle.listen(backlog || 511);

  if (err) {
    var ex = exceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    nextTick(this[async_id_symbol], emitErrorNT, this, ex);
    return;
  }

  // generate connection key, this should be unique to the connection
  this._connectionKey = addressType + ':' + address + ':' + port;

  // unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  nextTick(this[async_id_symbol], emitListeningNT, this);
}
这里可以看到,真正创建句柄的是createServerHandle函数,听名字就是创建socket句柄:如果_handle已经存在就不创建,不存在才创建socket。如前面所写,子进程的_handle已经在listenOnMasterHandle里被赋值了(主进程共享过来的句柄,或者rr里伪造的句柄),所以不会再创建socket。因此子进程虽然调用了listen,但其实只是表面的而已。具体服务器如何接收客户端请求,涉及到C/C++层,我不会C,应该是调用一些底层的东西实现的吧。
网友评论