美文网首页Nodejs
Node.js之Cluster模块分析与学习

Node.js之Cluster模块分析与学习

作者: 西葫芦炒胖子 | 来源:发表于2019-08-05 08:20 被阅读0次

    Cluster

    • 产生的原因

    • 用以解决多核CPU的利用率问题

    • 工作原理
      cluster模块是child_process和net模块的组合应用
      cluster启动时,它会在内部启动TCP服务器,在cluster.fork()子进程时,将这个TCP服务端socket的文件描述符发送给工作进程。如果进程是通过cluster.fork()复制出来的,那么它的环境变量里就存在NODE_UNIQUE_ID,如果工作进程中存在listen()侦听网络端口的调用,它将拿到该文件描述符,通过SO_REUSEADDR端口重用,从而实现多个子进程共享端口。


    源码地址

    常见问题

    
        问:cluster模块如何区分子进程和主进程
        答:判断环境变量是否含有NODE_UNQUE_ID,有则为子进程,没有则为主进程(源码 node/lib/cluster.js中可以查看)如下:
    
        const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
        module.exports = require(`internal/cluster/${childOrMaster}`);
    
        在(internal/cluster/master.js)文件搜索NODE_UNIQUE_ID ==》 上层为createWorkerProcess函数 ==》 上层为cluster.fork函数
    
        变量NODE_UNIQUE_ID 是在主进程fork子进程时传递进去的参数,因此采用cluster.fork 创建的主进程一定包含NDOE_UNIQUE_ID,而直接使用child_process.fork 的子进程没有NODE_UNIQUE_ID
    
        NODE_UNIQUE_ID 将作为主进程存储活跃的工作进程对象的键值
    
        问:主进程中农是否存在TCP服务器,什么时候创建的
        答:主进程fork子进程,子进程中有显式创建服务器的操作,但实际上在cluster模块下,子进程是把创建服务器所需要的数据发送给主进程,主进程来隐式创建TCP服务器。
    
        当子进程创建HTTP服务器时,http模块会调用net模块(http.Server继承net.Server),创建net.Server对象,同时侦听端口。创建net.Server实例,调用构造函数返回。创建的net.Server实例调用listen(),等待accpet连接。
    
        源码库(https://github.com/nodejs/node/blob/master/lib/internal/cluster/child.js)
    
        这是一个hack函数,当cluster fork 出来子进程只要调用listen方法,就屏蔽掉了,child源码如下:
        function listen(backlog){
            // TODO(bnoordhuis) Send a message to the master that tells it to
            // update the backlog size. The actual backlog should probably be
            // the largest requested size by any worker.
            return 0;
        }
    
        backlog 是已经连接但未进行accept处理的socket队列大小。node默认在socket层设置backlog默认值为511,这是因为nginx和redis默认设置的backlog值也为此。
    
        问:多个子进程与端口复用
        答:子进程中确实创建了net.Server对象,但它并没有像主进程在libuv层构建socket句柄,子进程的net.Server对象使用的是一个人为fake出的一个假句柄来'欺骗'使用者端口已侦听,这样做的目的是为了集群的负载均衡。   
    
        根据源码分析(child.js)当主进程发送创建服务器成功的消息后,子进程执行如下回调函数,它会根据主进程是否返回了handle句柄来选择函数执行。由于cluster默认采用RoundRobin调度策略,因此主进程返回的handle为null,执行rr函数,做了上文提到的hack操作,fake了一个假的handle对象,handle.listen 并没有调用libuv层的listen方法。子进程没有创建底层的服务端socket做侦听,所以在子进程创建的HTTP服务器侦听的端口根本不会出现端口复用的情况。
    
        send(message, (reply, handle) => {
            if (typeof obj._setServerData === 'function')
                obj._setServerData(reply.data);
    
            if (handle)
                shared(reply, handle, indexesKey, cb);  // Shared listen socket.
            else
                rr(reply, indexesKey, cb);              // Round-robin.
        });
    
    
        function rr(message, indexesKey, cb) {
            if (message.errno)
                return cb(message.errno, null);
    
            var key = message.key;
    
            function listen(backlog) {
                // TODO(bnoordhuis) Send a message to the master that tells it to
                // update the backlog size. The actual backlog should probably be
                // the largest requested size by any worker.
                return 0;
            }
            ...
    
            const handle = { close, listen, ref: noop, unref: noop };
    
            if (message.sockname) {
                handle.getsockname = getsockname;  // TCP handles only.
            }
    
            assert(handles.has(key) === false);
            handles.set(key, handle);
            cb(0, handle);
        }
    
        问:如何将请求分发个多个worker
        答:主进程的服务器中会创建RoundRobinHandle 决定分发请求给哪一个子进程,筛选出子进程后发送newconn消息给对应的子进程。子进程接收到newconn消息后,会调用内部的oncennectiono函数,先向主进程发送开始处理请求的消息,然后执行业务处理函数server.onconnection(node在C++层执行的js回调函数)
    
        源码(lib/internal/cluster/round_robin_handle.js)
    
        const message = { act: 'newconn', key: this.key };
    
        sendHelper(worker.process, message, handle, (reply) => {
            if (reply.accepted)
            handle.close();
            else
            this.distribute(0, handle);  // Worker is shutting down. Send to another.
    
            this.handoff(worker);
        });
    
        源码(lib/internal/cluster/child.js)
    
        function onmessage(message, handle) {
            if (message.act === 'newconn')
                onconnection(message, handle);
            else if (message.act === 'disconnect')
                _disconnect.call(worker, true);
        }
    
        // Round-robin connection.
        function onconnection(message, handle) {
            const key = message.key;
            const server = handles.get(key);
            const accepted = server !== undefined;
    
            send({ ack: message.seq, accepted });
    
            if (accepted)
                server.onconnection(0, handle);
        }
    
        请求分发策略:
        handloff 函数获取排队中的客户端请求,并通过IPC发送句柄handle和newconn消息,等待子进程返回。当子进程返回正在处理请求的消息时,执行handoff函数,继续分配请求给子进程,不管该子进程上次请求是否处理完成(node的异步特性和事件循环可以让单进程处理多请求).
        按照这样的策略,主进程的服务器每接受一个req请求,执行修改后的onconnection回调,执行distribute方法,在其内部调用handoff函数,进入该子进程的处理循环中。一旦主进程没有缓存的客户端请求时(this.handles为空),便会将当前子进程加入free空闲队列,等待主进程的下一步调度。这就是cluster的RoundRobin调度策略,每个子进程的处理逻辑都是一个闭环,直到主进程缓存的客户端请求处理完毕时,该子进程的处理闭环才被打开。
    
        源码:(https://github.com/nodejs/node/blob/master/lib/internal/cluster/round_robin_handle.js)
        
        //负责筛选出处理请求的子进程
        //this.free 数组存储空闲的子进程
        //this.handles 数组存放待处理的用户请求
        RoundRobinHandle.prototype.distribute = function(err, handle) {
            this.handles.push(handle);
            const worker = this.free.shift();
    
            if (worker)
                this.handoff(worker);
        };
    
        //发送消息和handle给对应的worker进程,处理业务逻辑
        RoundRobinHandle.prototype.handoff = function(worker) {
            if (this.all.has(worker.id) === false) {
                return;  // Worker is closing (or has closed) the server.
            }
    
            const handle = this.handles.shift();
    
            if (handle === undefined) {
                this.free.push(worker);  // Add to ready queue again.
                return;
            }
    
            const message = { act: 'newconn', key: this.key };
    
            sendHelper(worker.process, message, handle, (reply) => {
                if (reply.accepted)
                handle.close();
                else
                this.distribute(0, handle);  // Worker is shutting down. Send to another.
    
                this.handoff(worker);
            });
        };
    
    const cluster = require('cluster');                                      //  | |
    const http = require('http');                                            //  | |   都执行了
    const numCpus = require('os').cpus().length;                             //  | |------------
    
    if(cluster.isMaster){                                                    //    |
        console.log(`主进程 ${process.pid} 正在运行`);                                //    |
        for(var i = 0;i < numCpus; i++){                                           //    |
            cluster.fork();                                                  //    |
        }                                                                    //    |
    
        for(const id in cluster.workers){                                    //    |
            cluster.workers[id].on('message', messageHandler);               //    |  仅父进程执行
        }                                                                    //    |
    
        function messageHandler(msg) {                                       //    |
            if(msg.cmd && msg.cmd == 'notifyRequest'){                       //    |
                console.log("就是这么玩");                                    //    |
            }                                                                //    |
        }                                                                    //    |----------------------
    }
    else{
        //当子进程执行
        http.Server((req, res) => {                                          //  |
            res.writeHead(200);                                              //  |
            res.end("Hello world");                                          //  |   仅子进程执行
            process.send({cmd:"notifyRequest"});                             //  |
        }).listen(8000);    
    
        console.log(`工作进程 ${process.pid} 已启动`);
    }
    
    process.on("uncaughtException", (err) => {
        process.exit(1);
    })
    

    相关文章

      网友评论

        本文标题:Node.js之Cluster模块分析与学习

        本文链接:https://www.haomeiwen.com/subject/mjqpdctx.html