Analysis of the RocketMQ namesrv Startup Flow

Author: 晴天哥_王志 | Published 2020-05-01 14:45

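    The namesrv trilogy

    Introduction

    • This series is based on rocketmq-all-4.6.1 and analyzes RocketMQ's namesrv in three parts: the startup flow, metadata storage, and the interaction flows.

    • This article covers the namesrv startup flow.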

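    The role of namesrv

    • namesrv acts as the registry: it stores the routing information of the broker nodes plus some simple k/v configuration.
    • namesrv can be deployed as a cluster, but the namesrv nodes are independent and never communicate with each other. Fault tolerance comes from the producer/consumer side, which polls the namesrv nodes in turn (if the current node fails, it moves on to the next one).
    • As the registry, namesrv receives the periodic registration requests from brokers and keeps them in memory. namesrv has no persistence at all, so every piece of data lives in memory only. When a broker registers, it loops over all namesrv nodes and registers with each of them.
    • namesrv exposes an interface through which producers and consumers fetch broker routing information; the transport is implemented with netty.
    • namesrv checks broker liveness in two ways. Heartbeats: namesrv is the server side for the broker's periodic heartbeats and removes a broker whose heartbeat has timed out. Connection failure detection: a closed or broken connection is detected through the underlying epoll event mechanism.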
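
The client-side failover described above (try one namesrv node, move to the next on failure) can be sketched roughly as follows. This is only an illustration under stated assumptions: `NamesrvFailover` and `queryAny` are hypothetical names, not RocketMQ classes, and the real client logic is more involved.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper (not part of RocketMQ): polls independent namesrv nodes in turn.
final class NamesrvFailover {
    private final List<String> addresses;
    private final AtomicInteger cursor = new AtomicInteger(0);

    NamesrvFailover(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one namesrv address is required");
        }
        this.addresses = List.copyOf(addresses);
    }

    // Applies `query` to one address after another, starting at a rotating cursor,
    // and returns the first successful result. Throws if every node failed.
    <T> T queryAny(Function<String, T> query) {
        int start = Math.floorMod(cursor.getAndIncrement(), addresses.size());
        RuntimeException last = null;
        for (int i = 0; i < addresses.size(); i++) {
            String addr = addresses.get((start + i) % addresses.size());
            try {
                return query.apply(addr);
            } catch (RuntimeException e) {
                last = e; // this node is unavailable, try the next one
            }
        }
        throw new IllegalStateException("all namesrv nodes failed", last);
    }
}
```

Because the nodes do not replicate state to each other, any node that answers is good enough, which is why a simple rotation like this is sufficient on the client side.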

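    namesrv startup flow

    [Figure: startup flow]
    • Parse the configuration file to build the namesrv configuration, which consists of namesrvConfig and nettyServerConfig.
    • Create the NamesrvController object from that configuration and enter the startup sequence.
    • Initializing NamesrvController creates the NettyRemotingServer, initializes the thread pools, and registers the namesrv processor.
    • Starting NamesrvController mainly means starting the NettyRemotingServer: the core work is starting the Netty server listener and the various scanning threads.
    • Once namesrv has started, the process stays alive because the JVM exits only after all non-daemon threads have finished; see "Netty使用案例 - 服务启动退出" (Netty use cases: service startup and exit).
    [Figure: NamesrvController core configuration]
    • The core fields of NamesrvController are the configuration, the routing information, and the server.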
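
The point about non-daemon threads can be seen in a small stand-alone demo. It is a sketch, not namesrv code: `KeepAliveDemo` and its `named` factory are hypothetical names, and the explicit `shutdown()` exists only so that the demo terminates.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {
    // Hypothetical named thread factory; the daemon flag decides whether
    // threads it creates keep the JVM alive.
    static ThreadFactory named(String name, boolean daemon) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(daemon);
            return t;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(named("DemoScheduledThread", false));
        scheduler.scheduleAtFixedRate(() -> System.out.println("tick"), 0, 1, TimeUnit.SECONDS);

        // If main() returned here, the JVM would keep running, because the
        // scheduler thread is a non-daemon thread that never finishes.
        TimeUnit.SECONDS.sleep(3);

        // Stopping the executor ends its thread, after which the JVM can exit.
        scheduler.shutdown();
    }
}
```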

    Startup flow analysis

    Overall flow

    public class NamesrvStartup {
    
        private static InternalLogger log;
        private static Properties properties = null;
        private static CommandLine commandLine = null;
    
        public static void main(String[] args) {
            main0(args);
        }
    
        public static NamesrvController main0(String[] args) {
    
            try {
                // Create the NamesrvController object
                NamesrvController controller = createNamesrvController(args);
    
                // Start the NamesrvController object
                start(controller);
    
                return controller;
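            } catch (Throwable e) {
                // Report the startup failure instead of silently swallowing it
                e.printStackTrace();
                System.exit(-1);
            }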
    
            return null;
        }
    }
    
    • The overall namesrv startup consists of two steps: createNamesrvController and start(controller).
    • createNamesrvController creates the NamesrvController object.
    • start(controller) starts the NamesrvController object.

    Creating the NamesrvController

    public class NamesrvStartup {
    
        public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
            System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
            //PackageConflictDetect.detectFastjson();
    
            // Build the command-line parser for the namesrv startup arguments
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
                return null;
            }
    
            // Parse the startup arguments into namesrvConfig and nettyServerConfig
            final NamesrvConfig namesrvConfig = new NamesrvConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            nettyServerConfig.setListenPort(9876);
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);
                    MixAll.properties2Object(properties, namesrvConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
    
                    namesrvConfig.setConfigStorePath(file);
    
                    System.out.printf("load config properties file OK, %s%n", file);
                    in.close();
                }
            }
    
            if (commandLine.hasOption('p')) {
                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
                MixAll.printObjectProperties(console, namesrvConfig);
                MixAll.printObjectProperties(console, nettyServerConfig);
                System.exit(0);
            }
    
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    
            if (null == namesrvConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }
    
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
    
            log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
            MixAll.printObjectProperties(log, namesrvConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
    
            // Create the NamesrvController object
            final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);
    
            return controller;
        }
    }
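
createNamesrvController relies on MixAll.properties2Object to copy the loaded properties onto the config objects through their setXxx methods. The idea behind such setter-based binding can be sketched as below. This is not the RocketMQ implementation: `PropertiesBinder` and `DemoServerConfig` are hypothetical, and only a few parameter types are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical sketch of setter-based binding; not MixAll.properties2Object itself.
final class PropertiesBinder {
    // For every public setXxx(value) method on `target`, looks up the property
    // "xxx" and, if present, converts it and invokes the setter.
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue;
            }
            Class<?> type = m.getParameterTypes()[0];
            Object arg;
            if (type == String.class) {
                arg = value;
            } else if (type == int.class) {
                arg = Integer.parseInt(value);
            } else if (type == long.class) {
                arg = Long.parseLong(value);
            } else if (type == boolean.class) {
                arg = Boolean.parseBoolean(value);
            } else {
                continue; // unsupported type in this sketch
            }
            try {
                m.invoke(target, arg);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set property " + key, e);
            }
        }
    }
}

// Hypothetical config class used only for the demo.
final class DemoServerConfig {
    private int listenPort = 8888;
    private String rocketmqHome;

    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getListenPort() { return listenPort; }
    public void setRocketmqHome(String rocketmqHome) { this.rocketmqHome = rocketmqHome; }
    public String getRocketmqHome() { return rocketmqHome; }
}
```

With this approach a new option only needs a field and a setter on the config class; no parsing code has to change, which is presumably why the same properties object can be applied to both namesrvConfig and nettyServerConfig.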
    
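    • Creating the NamesrvController boils down to parsing the arguments and then constructing the NamesrvController object.
    • Argument parsing has two parts: building the command-line parser for namesrv, then parsing the startup arguments into namesrvConfig and nettyServerConfig.
    • The parsed values are assigned to namesrvConfig and nettyServerConfig by invoking their setXxx methods through reflection.
    • The object itself is created with NamesrvController(namesrvConfig, nettyServerConfig).
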
    public class NamesrvController {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        private final NamesrvConfig namesrvConfig;
        private final NettyServerConfig nettyServerConfig;
    
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
            "NSScheduledThread"));
        // Holds the namesrv k/v configuration
        private final KVConfigManager kvConfigManager;
        // Holds the routing information (RouteInfo)
        private final RouteInfoManager routeInfoManager;
        private RemotingServer remotingServer;
        private BrokerHousekeepingService brokerHousekeepingService;
        private ExecutorService remotingExecutor;
        private Configuration configuration;
        private FileWatchService fileWatchService;
    
        public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
            this.namesrvConfig = namesrvConfig;
            this.nettyServerConfig = nettyServerConfig;
            this.kvConfigManager = new KVConfigManager(this);
            this.routeInfoManager = new RouteInfoManager();
            this.brokerHousekeepingService = new BrokerHousekeepingService(this);
            this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
            this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
        }
    }
    
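    • Among the fields of NamesrvController, the two to focus on are KVConfigManager and RouteInfoManager.
    • KVConfigManager stores the namesrv k/v configuration.
    • RouteInfoManager stores the routing information for topics and brokers.

    Starting the NamesrvController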

    public class NamesrvStartup {
    
        public static NamesrvController start(final NamesrvController controller) throws Exception {
    
            if (null == controller) {
                throw new IllegalArgumentException("NamesrvController is null");
            }
            // Initialize the NamesrvController object
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
    
            Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    controller.shutdown();
                    return null;
                }
            }));
            // Start the NamesrvController object
            controller.start();
    
            return controller;
        }
    }
    
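    • Starting the NamesrvController consists of initializing the object, registering a JVM shutdown hook that calls controller.shutdown(), and then starting the object.
    • controller.initialize() initializes the NamesrvController object.
    • controller.start() starts the NamesrvController object.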
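
The start method above also registers a JVM shutdown hook so that controller.shutdown() runs when the process exits. A minimal sketch of a run-once hook is shown below; `OnceShutdownHook` is a hypothetical class written for illustration and is not RocketMQ's ShutdownHookThread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical run-once shutdown hook; not the RocketMQ ShutdownHookThread.
final class OnceShutdownHook extends Thread {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Callable<Void> callback;

    OnceShutdownHook(Callable<Void> callback) {
        super("ShutdownHook");
        this.callback = callback;
    }

    @Override
    public void run() {
        // Only the first invocation runs the callback; later ones are no-ops.
        if (!done.compareAndSet(false, true)) {
            return;
        }
        try {
            callback.call();
        } catch (Exception e) {
            System.err.println("shutdown callback failed: " + e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for controller.shutdown() in the listing above.
        Runtime.getRuntime().addShutdownHook(new OnceShutdownHook(() -> {
            System.out.println("releasing resources");
            return null;
        }));
        System.out.println("running");
    }
}
```

Guarding the callback with a flag keeps the cleanup idempotent in case the hook body is triggered more than once.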

    The NamesrvController initialize flow

    public class NamesrvController {
    
        public boolean initialize() {
    
            this.kvConfigManager.load();
            // Create the NettyRemotingServer object
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            // Create the remotingExecutor thread pool
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
            // Register the namesrv request processor
            this.registerProcessor();
            // Schedule the broker liveness scan (first run after 5 s, then every 10 s)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
            // Schedule the periodic dump of the k/v configuration (first run after 1 min, then every 10 min)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);
    
            // Remaining code omitted
    
            return true;
        }
    
    
        private void registerProcessor() {
            if (namesrvConfig.isClusterTest()) {
    
                this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                    this.remotingExecutor);
            } else {
    
                this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
            }
        }
    }
    
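    • Initializing the NamesrvController mainly creates the NettyRemotingServer, registers the request processor, and schedules the periodic tasks.
    • Create the remotingServer object, a NettyRemotingServer.
    • Create the remotingExecutor thread pool.
    • registerProcessor() registers the namesrv request processor: DefaultRequestProcessor by default, or ClusterTestRequestProcessor when clusterTest is enabled.
    • Schedule the broker liveness scan, scanNotActiveBroker, every 10 seconds.
    • Schedule the periodic configuration dump, printAllPeriodically, every 10 minutes.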
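
The broker liveness scan scheduled above can be approximated by the following stand-alone sketch. It is not RouteInfoManager: `BrokerLivenessTable` and its methods are hypothetical, and the 2-minute expiry passed in `startScanner` is an assumed value chosen for the example.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of heartbeat-based liveness tracking; not RouteInfoManager.
final class BrokerLivenessTable {
    private final Map<String, Long> lastHeartbeatMillis = new ConcurrentHashMap<>();
    private final long expireMillis;

    BrokerLivenessTable(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    // Called whenever a broker registers or sends a heartbeat.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeatMillis.put(brokerAddr, nowMillis);
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeatMillis.containsKey(brokerAddr);
    }

    // Removes every broker whose last heartbeat is older than expireMillis
    // and returns how many entries were removed.
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > expireMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Schedules the scan with the same delays as the listing: 5 s initial, 10 s period.
    static ScheduledExecutorService startScanner(BrokerLivenessTable table) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
            () -> table.scanNotActive(System.currentTimeMillis()), 5, 10, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        // Assumed expiry of 2 minutes, for illustration only.
        BrokerLivenessTable table = new BrokerLivenessTable(TimeUnit.MINUTES.toMillis(2));
        table.onHeartbeat("192.168.0.1:10911", System.currentTimeMillis());
        ScheduledExecutorService scheduler = startScanner(table);
        scheduler.shutdown(); // stop immediately so the demo exits
    }
}
```

Passing the current time into scanNotActive rather than reading the clock inside it keeps the expiry logic easy to test.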

    The NamesrvController start flow

    public class NamesrvController {
        // remotingServer is a NettyRemotingServer instance
        private RemotingServer remotingServer;
    
        public void start() throws Exception {
            this.remotingServer.start();
    
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
        }
    }
    
    • NamesrvController.start() delegates to NettyRemotingServer.start() and, if a fileWatchService has been configured, starts it as well.

    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    
        @Override
        public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            // Initialize the server-side handlers of NettyRemotingServer
            prepareSharableHandlers();
    
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                .addLast(defaultEventExecutorGroup,
                                    encoder,
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    connectionManageHandler,
                                    serverHandler
                                );
                        }
                    });
    
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
    
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        NettyRemotingServer.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }
    
    
        private void prepareSharableHandlers() {
            handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
            encoder = new NettyEncoder();
            connectionManageHandler = new NettyConnectManageHandler();
            serverHandler = new NettyServerHandler();
        }
    }
    
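    • NettyRemotingServer.start() is mainly responsible for bringing up the server-side Netty listener: it binds the listen port, starts the netty event executor when a channel event listener is present, and schedules scanResponseTable once per second.
    • NettyRemotingServer#prepareSharableHandlers initializes the server-side handlers: the handshake handler, the encoder, the connection management handler, and the server handler.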

    Article link: https://www.haomeiwen.com/subject/qzqxghtx.html