美文网首页
(一)namesrv是如何启动以及初始化的

(一)namesrv是如何启动以及初始化的

作者: guessguess | 来源:发表于2021-06-08 16:37 被阅读0次

    在之前使用rocketmq时,很多细节并没有了解到。后面直接从网上下载了4.8版本的源码,直接进行阅读。
    入口还是很好找的,不过感觉看这块源码,需要一些nio的基础,最好有使用或者了解过Netty的源码,阅读起来会更方便。

    感觉直接贴图比较方便一点。下面是namesrv的启动类的位置。


    namesrv的启动类

    先来说说namesrv是做什么的。这个模块其实相当于一个注册中心的功能,实时监控broker的存活状态,而客户端(生产者 或者消费者)只需要从注册中心,去获取broker的真实地址,便可以直接与Broker进行通信, 进行消息的发送,消费,存储等。

    在讲如何启动之前,要先讲一下俩个基本的控制类NamesrvConfig与NettyServerConfig
    这俩个类其实就是一个配置类,用于存放从配置文件中读取到的配置项

    NamesrvConfig

    结构如下

    public class NamesrvConfig {
        //rocketmqhome的位置,默认通过环境变量获取
        private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
        //kvConfig.json的位置,目前暂时不知道这个文件是做什么的
        private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
        //namesrv.properties配置文件存放的位置
        private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
        private String productEnvName = "center";
        private boolean clusterTest = false;
        //默认不支持消息的有序
        private boolean orderMessageEnable = false;
    }
    

    NettyServerConfig

    其实这一块是底层通信模块,netty的配置。

    public class NettyServerConfig implements Cloneable {
        //监听的端口,默认是8888
        private int listenPort = 8888;
        //netty处理业务的线程池
        private int serverWorkerThreads = 8;
        //处理消息消费,心跳发送,消息发送等。这里是网上找到的概述,暂时不那么清楚作用
        private int serverCallbackExecutorThreads = 0;
        //维护selector轮询的线程
        private int serverSelectorThreads = 3;
        //同步发送支持的一次发送的最大消息数
        private int serverOnewaySemaphoreValue = 256;
        //异步发送支持的一次发送的最大消息数
        private int serverAsyncSemaphoreValue = 64;
        //通道的最大空闲时间
        private int serverChannelMaxIdleTimeSeconds = 120;
        //网络发送区域的缓存区大小
        private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
        //网络接收区域的缓存区大小
        private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
        //是否开启缓存
        private boolean serverPooledByteBufAllocatorEnable = true;
        //linux下要开启epoll Io模型
        private boolean useEpollNativeSelector = false;
    }
    

    接下来说说NamesrvController
    其实里面有一堆成员变量,但是暂时只是猜到做什么的。还没有仔细去看

    NamesrvController

    public class NamesrvController {
        注册中心的配置
        private final NamesrvConfig namesrvConfig;
        底层通信配置(底层是使用netty)
        private final NettyServerConfig nettyServerConfig;
        其实还有一些成员变量,但是由于没有不是本次要讲的重点,就先省略......
    }
    

    接下来直接从源码入手

    启动类是如何启动的?

    public class NamesrvStartup {
        通过main方法启动
        public static void main(String[] args) {
            main0(args);
        }
    
        public static NamesrvController main0(String[] args) {
            try {
                先构建controller
                NamesrvController controller = createNamesrvController(args);
                再启动controller
                start(controller);
                String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
                log.info(tip);
                System.out.printf("%s%n", tip);
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
            return null;
        }
    }
    

    从上面的代码看到,还是比较简单的,先构建,再启动,所以下面看一下如何构建

    如何构建NamesrvController

    代码如下
    public class NamesrvStartup {
        public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
            。。。省略
            final NamesrvConfig namesrvConfig = new NamesrvConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            监听端口设置为9876(默认)
            nettyServerConfig.setListenPort(9876);
            如果启动的命令行中,有c参数,指定配置文件,那么就会加载配置文件,并且封装到对应的配置类中
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    读取配置文件,进行加载
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);
                    通过反射,设置到配置类中的成员变量
                    MixAll.properties2Object(properties, namesrvConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    设置配置文件的路径
                    namesrvConfig.setConfigStorePath(file);
    
                    System.out.printf("load config properties file OK, %s%n", file);
                    in.close();
                }
            }
            命令行可以输入一些参数,对配置进行覆盖,我们可以通过命令行启动进行覆盖
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
            //没有设置环境变量会报错,直接退出程序
            if (null == namesrvConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }
            // 。。。省略部分代码
            final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
            return controller;
        }
    }
    
    NamesrvController的构建流程如下:

    1.配置类的构建NamesrvConfig,NettyServerConfig。这里主要是涉及到底层通信的配置,以及注册中心文件的一些存放路径
    2.命令行如果有指定配置文件的存放路径,则将配置文件解析,并且将配置设置到配置类(NamesrvConfig,NettyServerConfig)的成员变量
    3.读取命令行里面的参数,如果有的话,对namesrvConfig的配置,可以进行覆盖
    所以可以通过配置文件的方式去设置相关配置,也可以通过命令行的方式去设置相关配置。

    NamesrvController是如何启动的?

    接下来还是先回到一开始的入口

    public class NamesrvStartup {
        public static void main(String[] args) {
            main0(args);
        }
    
        public static NamesrvController main0(String[] args) {
    
            try {
                //NamesrvController的构建
                NamesrvController controller = createNamesrvController(args);
                启动。。。。。。。。。。接下来看看这个方法是如何运行的
                start(controller);
                String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
                log.info(tip);
                System.out.printf("%s%n", tip);
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    }
    

    启动方法的代码

    public class NamesrvStartup {
        public static NamesrvController start(final NamesrvController controller) throws Exception {
    
            if (null == controller) {
                throw new IllegalArgumentException("NamesrvController is null");
            }
            先进行初始化
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
            添加一个回调,用于暂停运行的时候,停止controller
            Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    controller.shutdown();
                    return null;
                }
            }));
            启动
            controller.start();
            return controller;
        }
    }
    

    先来看看如何初始化,代码如下

    NamesrvController的初始化
    public class NamesrvController {
        用于远程通信的服务类,以netty为底层
        private RemotingServer remotingServer;
        public boolean initialize() {
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            线程池,用于处理远程通信
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
            注册处理器,其实看过netty底层的可以理解成一个处理请求的channelHandler,或者说是springmvc里面的dispatcher,用于处理请求的处理器。
            this.registerProcessor();
            心跳检查,通过定时任务,10秒一次,将失效的broker移除。这里如何移除的话,每个broker都有一个最新更新时间,与当前时间对比,如果超过120秒了,那说明失效了。
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
            。。。省略部分代码
            return true;
        }
    }
    
    
    心跳检查的代码
    public class RouteInfoManager {
        public void scanNotActiveBroker() {
            Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, BrokerLiveInfo> next = it.next();
                long last = next.getValue().getLastUpdateTimestamp();
                if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                    超过10秒,则将与broker的通信切断,同时将broker移除
                    RemotingUtil.closeChannel(next.getValue().getChannel());
                    it.remove();
                    this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
                }
            }
        }
    }
    

    初始化的过程中,会初始化一系列的组件。
    比如远程通信的服务,还有用于远程通信的线程池,以及注册处理远程通信的handler。
    另外也会添加心跳检测,用于移除失效的broker。初始化完之后,就是如何启动了

    NamesrvController如何启动

    代码如下

    public class NamesrvController {
        public void start() throws Exception {
            在初始化的过程中,已经将该服务给初始化了,由于底层是netty,所以那会也用了nettyServerConfig作为参数传入。
            this.remotingServer.start();
    
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
        }
    }
    
    启动的代码如下。
    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
        @Override
        public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            prepareSharableHandlers();
            看到这一步,nettyServerConfig的配置都刚刚好被使用上了。
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                .addLast(defaultEventExecutorGroup,
                                    encoder,
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    connectionManageHandler,
                                    serverHandler
                                );
                        }
                    });
    
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
    
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        NettyRemotingServer.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }
    }
    

    至此,启动的过程就完成了。
    namesrv在启动的过程中经历几步。
    1.controller的实例化。需要通过配置文件,或者参数,读取相关的配置,如底层netty需要的配置,以及注册中心的配置,并且进行设置
    2.controller的初始化。主要是对底层的远程通信服务进行初始化
    3.启动,将第二步中的通信服务进去启动即可。

    相关文章

      网友评论

          本文标题:(一)namesrv是如何启动以及初始化的

          本文链接:https://www.haomeiwen.com/subject/dqtjeltx.html