美文网首页
RocketMQ总结之NameServer启动过程

RocketMQ总结之NameServer启动过程

作者: FrankerSung | 来源:发表于2019-02-14 23:10 被阅读18次

    NamesrvStartup时序图

    NameServer.png

    NettyRemoteServer.start()

    @Override
        public void start() {
            //创建一个DefaultEventExecutorGroup, 用于处理netty handler中的操作
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            //初始化Netty:
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    //Channel获取新连接的方式
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    //设置TCP/IP协议listen函数中的backlog参数(用来初始化服务端可连接队列),
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //套接字选项中的SO_REUSEADDR:表示允许重复使用本地地址和端口
                    .option(ChannelOption.SO_REUSEADDR, true)
                    //该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。
                    //设置该选项后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //接收/发送Buffer大小
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                    new HandshakeHandler(TlsSystemConfig.tlsMode))
                                .addLast(defaultEventExecutorGroup,
                                    //编解码
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    //心跳检测
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    //连接管Handler,处理connect, disconnect, register, r/w , close等事件
                                    new NettyConnectManageHandler(),
                                    //处理接收到RemotingCommand消息后的事件, 收到服务器端响应后的相关操作
                                    new NettyServerHandler()
                                );
                        }
                    });
    
            //是否使用Netty内存池----默认启用
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
    
            //绑定Netty监听端口
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            //启动 监听Channel各种事件 的线程
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
    
            //每隔1秒扫描 responseTable 中是否有超时未回应的请求
            this.timer.scheduleAtFixedRate(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        NettyRemotingServer.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }
    

    参考资料
    https://github.com/FrankerSun/RocketMQ-With-Comment

    相关文章

      网友评论

          本文标题:RocketMQ总结之NameServer启动过程

          本文链接:https://www.haomeiwen.com/subject/apweeqtx.html