美文网首页
RocketMQ总结之NameServer启动过程

RocketMQ总结之NameServer启动过程

作者: FrankerSung | 来源:发表于2019-02-14 23:10 被阅读18次

NamesrvStartup时序图

NameServer.png

NettyRemoteServer.start()

@Override
    public void start() {
        //创建一个DefaultEventExecutorGroup, 用于处理netty handler中的操作
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        //初始化Netty:
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                //Channel获取新连接的方式
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                //设置TCP/IP协议listen函数中的backlog参数(用来初始化服务端可连接队列),
                .option(ChannelOption.SO_BACKLOG, 1024)
                //套接字选项中的SO_REUSEADDR:表示允许重复使用本地地址和端口
                .option(ChannelOption.SO_REUSEADDR, true)
                //该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。
                //设置该选项后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                //接收/发送Buffer大小
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                //编解码
                                new NettyEncoder(),
                                new NettyDecoder(),
                                //心跳检测
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                //连接管Handler,处理connect, disconnect, register, r/w , close等事件
                                new NettyConnectManageHandler(),
                                //处理接收到RemotingCommand消息后的事件, 收到服务器端响应后的相关操作
                                new NettyServerHandler()
                            );
                    }
                });

        //是否使用Netty内存池----默认启用
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        //绑定Netty监听端口
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        //启动 监听Channel各种事件 的线程
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        //每隔1秒扫描 responseTable 中是否有超时未回应的请求
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

参考资料
https://github.com/FrankerSun/RocketMQ-With-Comment

相关文章

网友评论

      本文标题:RocketMQ总结之NameServer启动过程

      本文链接:https://www.haomeiwen.com/subject/apweeqtx.html