美文网首页
rocketMq NameServer命名服务

rocketMq NameServer命名服务

作者: 圣村的希望 | 来源:发表于2018-12-09 15:28 被阅读0次

        RocketMq中的NameServer是一个无状态的命名服务,类似于dubbo中的zookeeper的,可以多机部署,nameserver之间不需要进行连接通信。主要用于维护和保存整个集群中的元数据信息,像broker和client的注册等信息。集群中的所有broker、producer和consumer机器都要和nameServer进行长连接,用于请求或发送数据。
      NamesrvStartup是模块启动的开始:

    public static NamesrvController main0(String[] args) {
            System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
            try {
                //PackageConflictDetect.detectFastjson();
    
                Options options = ServerUtil.buildCommandlineOptions(new Options());
                commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
                if (null == commandLine) {
                    System.exit(-1);
                    return null;
                }
    
                final NamesrvConfig namesrvConfig = new NamesrvConfig();
                namesrvConfig.setRocketmqHome("/Users/liuqiang/learn/rocketmq/distribution");
                final NettyServerConfig nettyServerConfig = new NettyServerConfig();
                nettyServerConfig.setListenPort(9876);
                if (commandLine.hasOption('c')) {
                    String file = commandLine.getOptionValue('c');
                    if (file != null) {
                        InputStream in = new BufferedInputStream(new FileInputStream(file));
                        properties = new Properties();
                        properties.load(in);
                        MixAll.properties2Object(properties, namesrvConfig);
                        MixAll.properties2Object(properties, nettyServerConfig);
    
                        namesrvConfig.setConfigStorePath(file);
    
                        System.out.printf("load config properties file OK, " + file + "%n");
                        in.close();
                    }
                }
    
                if (commandLine.hasOption('p')) {
                    MixAll.printObjectProperties(null, namesrvConfig);
                    MixAll.printObjectProperties(null, nettyServerConfig);
                    System.exit(0);
                }
    
                MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    
                if (null == namesrvConfig.getRocketmqHome()) {
                    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
                    System.exit(-2);
                }
    
                LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
                JoranConfigurator configurator = new JoranConfigurator();
                configurator.setContext(lc);
                lc.reset();
                configurator.doConfigure("/Users/liuqiang/learn/rocketmq/distribution" + "/conf/logback_namesrv.xml");
                final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
                MixAll.printObjectProperties(log, namesrvConfig);
                MixAll.printObjectProperties(log, nettyServerConfig);
    
                final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    
                // remember all configs to prevent discard
                controller.getConfiguration().registerConfig(properties);
    
                boolean initResult = controller.initialize();
                if (!initResult) {
                    controller.shutdown();
                    System.exit(-3);
                }
    
                Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        controller.shutdown();
                        return null;
                    }
                }));
    
                controller.start();
    
                String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
                log.info(tip);
                System.out.printf(tip + "%n");
    
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    

      这里做了几件事情,从命令行读取参数配置,-c指定配置文件的信息,然后初始化nameServer的controller;然后为当前namesrv进程添加钩子,当namesrv进程关闭时关闭后台一系列线程;最后启动controller对应的后台线程。

    public boolean initialize() {
    
            this.kvConfigManager.load();
    
            //负责监听端口,用来处理broker和client的各种请求
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            
            //线程池,用来处理各种请求,其工作线程的大小可配置
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    
            //注册核心业务逻辑处理器
            this.registerProcessor();
    
            //扫描失效的broker
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
    
            //打印配置信息
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);
    
            return true;
        }
    

    NameServerController的初始化做了几件事情:

    1. 实例化了remotingServer监听一些端口,在里面实例化了一个channelEventListener,用于来处理broker和client的请求,对于发送来的请求交给DefaultRequestProcessor来进行处理,最后也是丢到下面创建的线程池里面进行执行
    this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    
    public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
            this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
        }
    
    1. 创建了一个线程池,默认配置是创建8个线程大小的线程池,用于处理broker和client提交的请求。
    2. 注册核心业务处理器DefaultRequestProcessor,这里是把这个处理器注册到了remotingServer中去了,通过channelEventListener监听到broker和client的请求,然后交给DefaultRequestProcessor进行处理,最后还是丢到线程池里面处理,根据不同的request code交给不同的processor进行处理,像registerBroker等。
    3. 每隔10秒扫描失效的broker和打印配置信息,这里的周期性任务是使用的单线程进行来处理的。

      像NameServer还要进行集群元数据的存储和维护,最终它是交给RouteInfoManager来进行处理的。

    public class RouteInfoManager {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    }
    

    像rocketMq集群中的元数据是读多写少的特性,所以在RouteInfoManager中采用的可重入的读写锁来控制读写这些数据的并发处理。很疑惑RocketMq中使用NameServer作为命名服务,没有使用Zookeeper,自己所知道的有两点:

    1. RocketMq本身就是中间件,不想再去依赖另一个中间件
    2. 非技术因素,自己重写一个命名服务出成绩

    相关文章

      网友评论

          本文标题:rocketMq NameServer命名服务

          本文链接:https://www.haomeiwen.com/subject/mtohhqtx.html