Cluster
-
产生的原因
-
用以解决多核CPU的利用率问题
-
工作原理
cluster模块是child_process和net模块的组合应用
cluster启动时,它会在内部启动TCP服务器,在cluster.fork()子进程时,将这个TCP服务端socket的文件描述符发送给工作进程。如果进程是通过cluster.fork()复制出来的,那么它的环境变量里就存在NODE_UNIQUE_ID,如果工作进程中存在listen()侦听网络端口的调用,它将拿到该文件描述符,通过SO_REUSEADDR端口重用,从而实现多个子进程共享端口。
常见问题
问:cluster模块如何区分子进程和主进程
答:判断环境变量是否含有NODE_UNQUE_ID,有则为子进程,没有则为主进程(源码 node/lib/cluster.js中可以查看)如下:
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
在(internal/cluster/master.js)文件搜索NODE_UNIQUE_ID ==》 上层为createWorkerProcess函数 ==》 上层为cluster.fork函数
变量NODE_UNIQUE_ID 是在主进程fork子进程时传递进去的参数,因此采用cluster.fork 创建的主进程一定包含NDOE_UNIQUE_ID,而直接使用child_process.fork 的子进程没有NODE_UNIQUE_ID
NODE_UNIQUE_ID 将作为主进程存储活跃的工作进程对象的键值
问:主进程中农是否存在TCP服务器,什么时候创建的
答:主进程fork子进程,子进程中有显式创建服务器的操作,但实际上在cluster模块下,子进程是把创建服务器所需要的数据发送给主进程,主进程来隐式创建TCP服务器。
当子进程创建HTTP服务器时,http模块会调用net模块(http.Server继承net.Server),创建net.Server对象,同时侦听端口。创建net.Server实例,调用构造函数返回。创建的net.Server实例调用listen(),等待accpet连接。
源码库(https://github.com/nodejs/node/blob/master/lib/internal/cluster/child.js)
这是一个hack函数,当cluster fork 出来子进程只要调用listen方法,就屏蔽掉了,child源码如下:
function listen(backlog){
// TODO(bnoordhuis) Send a message to the master that tells it to
// update the backlog size. The actual backlog should probably be
// the largest requested size by any worker.
return 0;
}
backlog 是已经连接但未进行accept处理的socket队列大小。node默认在socket层设置backlog默认值为511,这是因为nginx和redis默认设置的backlog值也为此。
问:多个子进程与端口复用
答:子进程中确实创建了net.Server对象,但它并没有像主进程在libuv层构建socket句柄,子进程的net.Server对象使用的是一个人为fake出的一个假句柄来'欺骗'使用者端口已侦听,这样做的目的是为了集群的负载均衡。
根据源码分析(child.js)当主进程发送创建服务器成功的消息后,子进程执行如下回调函数,它会根据主进程是否返回了handle句柄来选择函数执行。由于cluster默认采用RoundRobin调度策略,因此主进程返回的handle为null,执行rr函数,做了上文提到的hack操作,fake了一个假的handle对象,handle.listen 并没有调用libuv层的listen方法。子进程没有创建底层的服务端socket做侦听,所以在子进程创建的HTTP服务器侦听的端口根本不会出现端口复用的情况。
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
function rr(message, indexesKey, cb) {
if (message.errno)
return cb(message.errno, null);
var key = message.key;
function listen(backlog) {
// TODO(bnoordhuis) Send a message to the master that tells it to
// update the backlog size. The actual backlog should probably be
// the largest requested size by any worker.
return 0;
}
...
const handle = { close, listen, ref: noop, unref: noop };
if (message.sockname) {
handle.getsockname = getsockname; // TCP handles only.
}
assert(handles.has(key) === false);
handles.set(key, handle);
cb(0, handle);
}
问:如何将请求分发个多个worker
答:主进程的服务器中会创建RoundRobinHandle 决定分发请求给哪一个子进程,筛选出子进程后发送newconn消息给对应的子进程。子进程接收到newconn消息后,会调用内部的oncennectiono函数,先向主进程发送开始处理请求的消息,然后执行业务处理函数server.onconnection(node在C++层执行的js回调函数)
源码(lib/internal/cluster/round_robin_handle.js)
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.
this.handoff(worker);
});
源码(lib/internal/cluster/child.js)
function onmessage(message, handle) {
if (message.act === 'newconn')
onconnection(message, handle);
else if (message.act === 'disconnect')
_disconnect.call(worker, true);
}
// Round-robin connection.
function onconnection(message, handle) {
const key = message.key;
const server = handles.get(key);
const accepted = server !== undefined;
send({ ack: message.seq, accepted });
if (accepted)
server.onconnection(0, handle);
}
请求分发策略:
handloff 函数获取排队中的客户端请求,并通过IPC发送句柄handle和newconn消息,等待子进程返回。当子进程返回正在处理请求的消息时,执行handoff函数,继续分配请求给子进程,不管该子进程上次请求是否处理完成(node的异步特性和事件循环可以让单进程处理多请求).
按照这样的策略,主进程的服务器每接受一个req请求,执行修改后的onconnection回调,执行distribute方法,在其内部调用handoff函数,进入该子进程的处理循环中。一旦主进程没有缓存的客户端请求时(this.handles为空),便会将当前子进程加入free空闲队列,等待主进程的下一步调度。这就是cluster的RoundRobin调度策略,每个子进程的处理逻辑都是一个闭环,直到主进程缓存的客户端请求处理完毕时,该子进程的处理闭环才被打开。
源码:(https://github.com/nodejs/node/blob/master/lib/internal/cluster/round_robin_handle.js)
//负责筛选出处理请求的子进程
//this.free 数组存储空闲的子进程
//this.handles 数组存放待处理的用户请求
RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
const worker = this.free.shift();
if (worker)
this.handoff(worker);
};
//发送消息和handle给对应的worker进程,处理业务逻辑
RoundRobinHandle.prototype.handoff = function(worker) {
if (this.all.has(worker.id) === false) {
return; // Worker is closing (or has closed) the server.
}
const handle = this.handles.shift();
if (handle === undefined) {
this.free.push(worker); // Add to ready queue again.
return;
}
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.
this.handoff(worker);
});
};
const cluster = require('cluster'); // | |
const http = require('http'); // | | 都执行了
const numCpus = require('os').cpus().length; // | |------------
if(cluster.isMaster){ // |
console.log(`主进程 ${process.pid} 正在运行`); // |
for(var i = 0;i < numCpus; i++){ // |
cluster.fork(); // |
} // |
for(const id in cluster.workers){ // |
cluster.workers[id].on('message', messageHandler); // | 仅父进程执行
} // |
function messageHandler(msg) { // |
if(msg.cmd && msg.cmd == 'notifyRequest'){ // |
console.log("就是这么玩"); // |
} // |
} // |----------------------
}
else{
//当子进程执行
http.Server((req, res) => { // |
res.writeHead(200); // |
res.end("Hello world"); // | 仅子进程执行
process.send({cmd:"notifyRequest"}); // |
}).listen(8000);
console.log(`工作进程 ${process.pid} 已启动`);
}
process.on("uncaughtException", (err) => {
process.exit(1);
})
网友评论