美文网首页
(一)namesrv是如何启动以及初始化的

(一)namesrv是如何启动以及初始化的

作者: guessguess | 来源:发表于2021-06-08 16:37 被阅读0次

在之前使用rocketmq时,很多细节并没有了解到。后面直接从网上下载了4.8版本的源码,直接进行阅读。
入口还是很好找的,不过感觉看这块源码,需要一些nio的基础,最好有使用或者了解过Netty的源码,阅读起来会更方便。

感觉直接贴图比较方便一点。下面是namesrv的启动类的位置。


namesrv的启动类

先来说说namesrv是做什么的。这个模块其实相当于一个注册中心的功能,实时监控broker的存活状态,而客户端(生产者 或者消费者)只需要从注册中心,去获取broker的真实地址,便可以直接与Broker进行通信, 进行消息的发送,消费,存储等。

在讲如何启动之前,要先讲一下俩个基本的控制类NamesrvConfig与NettyServerConfig
这俩个类其实就是一个配置类,用于存放从配置文件中读取到的配置项

NamesrvConfig

结构如下

public class NamesrvConfig {
    //rocketmqhome的位置,默认通过环境变量获取
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    //kvConfig.json的位置,目前暂时不知道这个文件是做什么的
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    //namesrv.properties配置文件存放的位置
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    //默认不支持消息的有序
    private boolean orderMessageEnable = false;
}

NettyServerConfig

其实这一块是底层通信模块,netty的配置。

public class NettyServerConfig implements Cloneable {
    //监听的端口,默认是8888
    private int listenPort = 8888;
    //netty处理业务的线程池
    private int serverWorkerThreads = 8;
    //处理消息消费,心跳发送,消息发送等。这里是网上找到的概述,暂时不那么清楚作用
    private int serverCallbackExecutorThreads = 0;
    //维护selector轮询的线程
    private int serverSelectorThreads = 3;
    //同步发送支持的一次发送的最大消息数
    private int serverOnewaySemaphoreValue = 256;
    //异步发送支持的一次发送的最大消息数
    private int serverAsyncSemaphoreValue = 64;
    //通道的最大空闲时间
    private int serverChannelMaxIdleTimeSeconds = 120;
    //网络发送区域的缓存区大小
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    //网络接收区域的缓存区大小
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    //是否开启缓存
    private boolean serverPooledByteBufAllocatorEnable = true;
    //linux下要开启epoll Io模型
    private boolean useEpollNativeSelector = false;
}

接下来说说NamesrvController
其实里面有一堆成员变量,但是暂时只是猜到做什么的。还没有仔细去看

NamesrvController

public class NamesrvController {
    注册中心的配置
    private final NamesrvConfig namesrvConfig;
    底层通信配置(底层是使用netty)
    private final NettyServerConfig nettyServerConfig;
    其实还有一些成员变量,但是由于没有不是本次要讲的重点,就先省略......
}

接下来直接从源码入手

启动类是如何启动的?

public class NamesrvStartup {
    通过main方法启动
    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {
        try {
            先构建controller
            NamesrvController controller = createNamesrvController(args);
            再启动controller
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }
}

从上面的代码看到,还是比较简单的,先构建,再启动,所以下面看一下如何构建

如何构建NamesrvController

代码如下
public class NamesrvStartup {
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        。。。省略
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        监听端口设置为9876(默认)
        nettyServerConfig.setListenPort(9876);
        如果启动的命令行中,有c参数,指定配置文件,那么就会加载配置文件,并且封装到对应的配置类中
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                读取配置文件,进行加载
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                通过反射,设置到配置类中的成员变量
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                设置配置文件的路径
                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }
        命令行可以输入一些参数,对配置进行覆盖,我们可以通过命令行启动进行覆盖
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
        //没有设置环境变量会报错,直接退出程序
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        // 。。。省略部分代码
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
        return controller;
    }
}
NamesrvController的构建流程如下:

1.配置类的构建NamesrvConfig,NettyServerConfig。这里主要是涉及到底层通信的配置,以及注册中心文件的一些存放路径
2.命令行如果有指定配置文件的存放路径,则将配置文件解析,并且将配置设置到配置类(NamesrvConfig,NettyServerConfig)的成员变量
3.读取命令行里面的参数,如果有的话,对namesrvConfig的配置,可以进行覆盖
所以可以通过配置文件的方式去设置相关配置,也可以通过命令行的方式去设置相关配置。

NamesrvController是如何启动的?

接下来还是先回到一开始的入口

public class NamesrvStartup {
    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            //NamesrvController的构建
            NamesrvController controller = createNamesrvController(args);
            启动。。。。。。。。。。接下来看看这个方法是如何运行的
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
}

启动方法的代码

public class NamesrvStartup {
    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        先进行初始化
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        添加一个回调,用于暂停运行的时候,停止controller
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        启动
        controller.start();
        return controller;
    }
}

先来看看如何初始化,代码如下

NamesrvController的初始化
public class NamesrvController {
    用于远程通信的服务类,以netty为底层
    private RemotingServer remotingServer;
    public boolean initialize() {
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        线程池,用于处理远程通信
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        注册处理器,其实看过netty底层的可以理解成一个处理请求的channelHandler,或者说是springmvc里面的dispatcher,用于处理请求的处理器。
        this.registerProcessor();
        心跳检查,通过定时任务,10秒一次,将失效的broker移除。这里如何移除的话,每个broker都有一个最新更新时间,与当前时间对比,如果超过120秒了,那说明失效了。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        。。。省略部分代码
        return true;
    }
}


心跳检查的代码
public class RouteInfoManager {
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                超过10秒,则将与broker的通信切断,同时将broker移除
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
}

初始化的过程中,会初始化一系列的组件。
比如远程通信的服务,还有用于远程通信的线程池,以及注册处理远程通信的handler。
另外也会添加心跳检测,用于移除失效的broker。初始化完之后,就是如何启动了

NamesrvController如何启动

代码如下

public class NamesrvController {
    public void start() throws Exception {
        在初始化的过程中,已经将该服务给初始化了,由于底层是netty,所以那会也用了nettyServerConfig作为参数传入。
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }
}

启动的代码如下。
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        prepareSharableHandlers();
        看到这一步,nettyServerConfig的配置都刚刚好被使用上了。
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
}

至此,启动的过程就完成了。
namesrv在启动的过程中经历几步。
1.controller的实例化。需要通过配置文件,或者参数,读取相关的配置,如底层netty需要的配置,以及注册中心的配置,并且进行设置
2.controller的初始化。主要是对底层的远程通信服务进行初始化
3.启动,将第二步中的通信服务进去启动即可。

相关文章

网友评论

      本文标题:(一)namesrv是如何启动以及初始化的

      本文链接:https://www.haomeiwen.com/subject/dqtjeltx.html