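Preface
Node.js runs application code on a single thread, which makes it fragile: one uncaught exception crashes the entire application, and that is unacceptable in production. The usual solution is the Node.js cluster module, which starts multiple application instances in a master-worker pattern. While enjoying the benefits cluster brings, many of us are left with a few questions:
- Our application code calls app.listen(port), so why is there no port-in-use error when the cluster module forks that same code several times?
- How does the master pass the requests it receives to a worker so that they can be handled and answered?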
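To make the first question concrete, here is a minimal sketch of the kind of setup being discussed. It assumes a plain `http` server; `startCluster` is a hypothetical helper name and the port in the usage comment is arbitrary. Every worker runs the same `listen(port)` call, yet no address-in-use error is raised.

```javascript
const cluster = require('cluster');
const http = require('http');
const os = require('os');

// Hypothetical helper: in the master, fork `workerCount` workers;
// in a worker, start an HTTP server on the shared port.
function startCluster(port, workerCount = os.cpus().length) {
  // cluster.isMaster is the name used in this article; newer Node.js
  // versions also expose the same flag as cluster.isPrimary.
  if (cluster.isMaster) {
    for (let i = 0; i < workerCount; i++) {
      cluster.fork();
    }
    // Fork a replacement when a worker dies, so that one uncaught
    // exception does not take the whole service down.
    cluster.on('exit', (worker) => {
      console.log(`worker ${worker.process.pid} exited, forking a new one`);
      cluster.fork();
    });
    return 'master';
  }

  // Every worker executes this same listen call.
  http.createServer((req, res) => {
    res.end(`handled by worker ${cluster.worker.id}\n`);
  }).listen(port);
  return 'worker';
}

// Usage (not invoked here): startCluster(3000);
```

The function is deliberately not called at load time; invoking it in the entry script is what turns a single process into a master plus workers.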
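Question 1
When the master process forks a worker process, it attaches the environment variable NODE_UNIQUE_ID to it. The value is an incrementing number (the counter starts at zero and is incremented before each fork, so the first worker gets id 1):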
// cluster.js
function createWorkerProcess(id, env) {
  // ...
  workerEnv.NODE_UNIQUE_ID = '' + id;
  // ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}
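Later, while Node.js initializes, this environment variable is used to decide whether the process is a worker forked by the cluster module. If it is, workerInit() runs to set up the environment; otherwise masterInit() runs. The listen path in lib/net.js branches on the same master/worker distinction.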
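The role check can be sketched in a few lines. This is a simplified illustration rather than the Node.js source: `buildWorkerEnv` and `detectRole` are hypothetical names, and only the presence of `NODE_UNIQUE_ID` is modelled.

```javascript
// Hypothetical sketch: copy the parent environment and attach the worker id
// as a string, mirroring `workerEnv.NODE_UNIQUE_ID = '' + id`.
function buildWorkerEnv(id, baseEnv = process.env) {
  return Object.assign({}, baseEnv, { NODE_UNIQUE_ID: '' + id });
}

// A process that finds NODE_UNIQUE_ID in its environment treats itself
// as a worker; any other process is treated as the master.
function detectRole(env = process.env) {
  return 'NODE_UNIQUE_ID' in env ? 'worker' : 'master';
}

console.log(detectRole({}));                     // master
console.log(detectRole(buildWorkerEnv(1, {})));  // worker
```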
// lib/net.js
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive) {
  exclusive = !!exclusive;
  if (cluster === null) cluster = require('cluster');
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd);
    return;
  }
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  };
  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);
    if (err) {
      var ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }
    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd);
  }
}
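The branch at the top of `listenInCluster` decides who really binds the port. A minimal model of that decision might look like the following; `chooseListenPath` is a hypothetical name, and the function only mirrors the `cluster.isMaster || exclusive` test without opening any socket.

```javascript
// Hypothetical model of the decision made in listenInCluster.
// Returns 'bind' when the process creates its own listening handle,
// and 'ask-master' when it requests the master's handle instead.
function chooseListenPath({ isMaster, exclusive = false }) {
  if (isMaster || Boolean(exclusive)) {
    return 'bind';
  }
  return 'ask-master';
}

console.log(chooseListenPath({ isMaster: true }));                    // bind
console.log(chooseListenPath({ isMaster: false }));                   // ask-master
console.log(chooseListenPath({ isMaster: false, exclusive: true }));  // bind
```

In application code the flag corresponds to the `exclusive` option of `server.listen(options)`. A worker that passes `exclusive: true` does not share the handle, so a second worker listening on the same port that way would run into the usual address-in-use error.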
In a worker, listenInCluster calls the cluster._getServer method. This function is defined in /lib/internal/cluster/child.js:
cluster._getServer = function(obj, options, cb) {
  // ....
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
  obj.once('listening', () => {
    // ...
    send(message);
  });
};
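From the code above (note obj.once('listening')), we can see that the worker process replaces the part of the net.Server instance's listen method that actually listens on a port or descriptor. In round-robin mode the handle handed to the worker has a listen function that does nothing and simply reports success, while the master uses RoundRobinHandle for load balancing.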
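In summary
- The port is listened on only once, by the internal TCP server in the master process
- No duplicate-listen error occurs because, in the worker process, the method that would finally perform the port listen has been deliberately replaced by the cluster module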
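The two points above can be illustrated with a toy model. It is not the Node.js implementation: `ToyMaster` and `ToyWorker` are hypothetical names, and "binding" is reduced to incrementing a counter so the effect is easy to observe.

```javascript
// Toy model only. The master keeps one internal handle per address:port
// and counts how often a real bind would happen.
class ToyMaster {
  constructor() {
    this.handles = new Map(); // key -> internal listening handle
    this.bindCount = 0;       // number of real binds performed
  }

  // Called when a worker asks for a server on address:port.
  queryServer(workerId, address, port) {
    const key = `${address}:${port}`;
    let handle = this.handles.get(key);
    if (handle === undefined) {
      this.bindCount += 1; // the only real bind for this key
      handle = { key, workers: new Set() };
      this.handles.set(key, handle);
    }
    handle.workers.add(workerId);
    return handle;
  }
}

class ToyWorker {
  constructor(id, master) {
    this.id = id;
    this.master = master;
  }

  listen(port, address = '0.0.0.0') {
    this.master.queryServer(this.id, address, port);
    // The worker's own listen step is a no-op that reports success,
    // so it never competes with the master for the port.
    return 0;
  }
}

const master = new ToyMaster();
const results = [1, 2, 3, 4].map((id) => new ToyWorker(id, master).listen(3000));
console.log(results.join(','));  // 0,0,0,0
console.log(master.bindCount);   // 1
```

Four workers call `listen(3000)`, all of them succeed, and the port is bound exactly once.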
Question 2
// /cluster/round_robin_handle
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  // ...
  this.server.once('listening', () => {
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
  });
}

RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle);
  const worker = this.free.shift();
  if (worker)
    this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function(worker) {
  if (this.all.has(worker.id) === false) {
    return;  // Worker is closing (or has closed) the server.
  }
  // Take the next pending client handle; if none is waiting,
  // put the worker back on the free list.
  const handle = this.handles.shift();
  if (handle === undefined) {
    this.free.push(worker);
    return;
  }
  const message = { act: 'newconn', key: this.key };
  sendHelper(worker.process, message, handle, (reply) => {
    // ...
    this.handoff(worker);
  });
};
// /cluster/child
process.on('internalMessage', internal(worker, onmessage));

function onmessage(message, handle) {
  if (message.act === 'newconn')
    onconnection(message, handle);
  else if (message.act === 'disconnect')
    _disconnect.call(worker, true);
}

// Round-robin connection.
function onconnection(message, handle) {
  // ...
  send({ ack: message.seq, accepted });
  if (accepted)
    server.onconnection(0, handle);
}
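- All requests first pass through the internal TCP server in the master
- In that server's connection-handling logic, a worker process is picked by load balancing and sent an internal newconn message, with the client handle attached to the message
- The worker process receives this internal message, creates a net.Socket instance from the client handle, runs the business logic, and returns the response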
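The flow in these three points can be sketched as a small simulation. It is a toy under stated assumptions, not the cluster source: `ToyRoundRobin` is a hypothetical name, connections are plain strings, every worker accepts every connection, and the internal `newconn` message is replaced by a direct synchronous call.

```javascript
// Toy model of the round-robin hand-off between master and workers.
class ToyRoundRobin {
  constructor() {
    this.free = [];    // workers waiting for a connection
    this.pending = []; // connections waiting for a worker
  }

  // A newly registered worker goes through handoff so that it either
  // picks up a waiting connection or joins the free queue.
  addWorker(worker) {
    this.handoff(worker);
  }

  // Called for every new client connection.
  distribute(connection) {
    this.pending.push(connection);
    const worker = this.free.shift();
    if (worker) this.handoff(worker);
  }

  handoff(worker) {
    const connection = this.pending.shift();
    if (connection === undefined) {
      this.free.push(worker); // nothing pending: back of the free queue
      return;
    }
    // Stands in for the 'newconn' message sent with the client handle.
    worker.onconnection(connection);
    // After the worker has answered, offer it the next pending connection.
    this.handoff(worker);
  }
}

const rr = new ToyRoundRobin();
const handled = [];
for (const id of [1, 2, 3]) {
  rr.addWorker({ id, onconnection: (conn) => handled.push(`${conn}->worker${id}`) });
}
['c1', 'c2', 'c3', 'c4'].forEach((conn) => rr.distribute(conn));
console.log(handled.join(', '));
// c1->worker1, c2->worker2, c3->worker3, c4->worker1
```

Because a worker rejoins the end of the free queue after each hand-off, the fourth connection wraps around to the first worker.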