rocketmq-namesvr

Author: 一抹斜阳丶 | Published: 2018-03-09 17:07

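    Service listening

    Startup steps

    • NamesrvStartup.main0() initializes the parameters: it assembles the netty server settings and loads configuration from a properties file.
    • It builds a NamesrvController, passing in namesrvConfig and nettyServerConfig.
    • NamesrvController.initialize() then sets up the netty service.

    Relevant code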

    public static NamesrvController main0(String[] args) {
        // ... command-line parsing elided; commandLine and properties are declared elsewhere in NamesrvStartup

        // Initialize the namesrv config and the netty server config (default listen port 9876)
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);

        // Option -c names a properties file whose values override the defaults
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        boolean initResult = controller.initialize();
        // ... handling of initResult and the rest of the method elided
    }
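
    The two MixAll.properties2Object calls above copy values from the loaded properties onto the config objects. As a rough illustration of that idea (not RocketMQ's actual implementation), the sketch below binds properties to setters by reflection. DemoServerConfig and PropertiesBinder are hypothetical names invented for this example, and only int, long, boolean and String setters are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical stand-in for a config class such as NettyServerConfig.
class DemoServerConfig {
    private int listenPort = 9876;
    private String storePath = "";

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public String getStorePath() { return storePath; }
    public void setStorePath(String storePath) { this.storePath = storePath; }
}

// Hypothetical helper: copies each property onto a matching single-argument setter.
final class PropertiesBinder {
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // no property for this setter, keep the default
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                Object value;
                if (type == int.class || type == Integer.class) {
                    value = Integer.parseInt(raw.trim());
                } else if (type == long.class || type == Long.class) {
                    value = Long.parseLong(raw.trim());
                } else if (type == boolean.class || type == Boolean.class) {
                    value = Boolean.parseBoolean(raw.trim());
                } else if (type == String.class) {
                    value = raw;
                } else {
                    continue; // unsupported type in this sketch
                }
                m.invoke(target, value);
            } catch (ReflectiveOperationException | NumberFormatException e) {
                throw new IllegalArgumentException("cannot bind property " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("listenPort", "9877");
        DemoServerConfig config = new DemoServerConfig();
        bind(props, config);
        System.out.println(config.getListenPort());
    }
}
```

    Properties with no matching setter are silently ignored here, which is why the same Properties object can be applied to two different config objects in turn.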
    

    Starting namesvr

        public boolean initialize() {
    
            this.kvConfigManager.load();
    
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    
            this.registerProcessor();
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);
    
            return true;
        }
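
    initialize() registers two periodic jobs with scheduleAtFixedRate: the broker scan first runs after 5 seconds and then every 10 seconds. The following is a minimal, self-contained sketch of that scheduling pattern, using millisecond delays so it finishes quickly. PeriodicScanDemo and runPeriodically are hypothetical names, not part of RocketMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicScanDemo {
    // Runs task after initialDelayMs and then every periodMs.
    // Returns true once the task has completed `runs` times, false on timeout or interruption.
    static boolean runPeriodically(Runnable task, int runs, long initialDelayMs, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                task.run();
                remaining.countDown();
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            // Wait for the expected duration plus a generous safety margin.
            return remaining.await(initialDelayMs + periodMs * runs + 5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow(); // stop the periodic task
        }
    }

    public static void main(String[] args) {
        AtomicInteger scans = new AtomicInteger();
        boolean done = runPeriodically(scans::incrementAndGet, 3, 50, 100);
        System.out.println("completed=" + done + ", scans=" + scans.get());
    }
}
```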
    

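    What data namesvr handles

    • namesvr stores the list of active brokers, both master and slave.
    • namesvr stores the filter server list of every broker.
    • Heartbeat monitoring: if a broker times out, its information (such as its connection) is released, and the broker has to upload its information to namesvr again over netty.
    • A plain HashMap is used because a read-write lock already controls concurrent access.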
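
    The last point can be illustrated with a small sketch: a plain HashMap can be shared between threads as long as every access goes through the same read-write lock. GuardedBrokerTable is a hypothetical, simplified class written for this example and is not taken from RocketMQ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example: a HashMap whose every access is guarded by one read-write lock.
final class GuardedBrokerTable {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String, String> addrByBrokerName = new HashMap<>();

    // Writers take the exclusive write lock.
    void register(String brokerName, String addr) {
        lock.writeLock().lock();
        try {
            addrByBrokerName.put(brokerName, addr);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns true if an entry was actually removed.
    boolean unregister(String brokerName) {
        lock.writeLock().lock();
        try {
            return addrByBrokerName.remove(brokerName) != null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock, so lookups do not block each other.
    String lookup(String brokerName) {
        lock.readLock().lock();
        try {
            return addrByBrokerName.get(brokerName);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Returns a snapshot copy so callers never iterate the live map outside the lock.
    List<String> brokerNames() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(addrByBrokerName.keySet());
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        GuardedBrokerTable table = new GuardedBrokerTable();
        table.register("broker-a", "10.0.0.1:10911");
        System.out.println(table.lookup("broker-a"));
    }
}
```

    Compared with a concurrent map, a single lock also lets one update touch several tables as one unit, at the cost of serializing all writers.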

    RouteInfoManager.java

        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
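
    To make the timeout concrete, here is a hedged sketch of how a scan over a live table might drop brokers whose last heartbeat is older than BROKER_CHANNEL_EXPIRED_TIME (two minutes). LiveTableSketch is a hypothetical class that stores only a timestamp per address, takes the current time as a parameter so it can be tested, and omits locking and channel handling. It is a simplification under those assumptions, not the RouteInfoManager code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded simplification of a broker live table.
final class LiveTableSketch {
    static final long EXPIRED_MS = 1000 * 60 * 2; // same value as BROKER_CHANNEL_EXPIRED_TIME

    private final HashMap<String, Long> lastHeartbeatByAddr = new HashMap<>();

    // Records (or refreshes) the time of the latest heartbeat from a broker.
    void heartbeat(String brokerAddr, long nowMs) {
        lastHeartbeatByAddr.put(brokerAddr, nowMs);
    }

    // Removes every broker whose last heartbeat is older than EXPIRED_MS
    // and returns the removed addresses.
    List<String> scanNotActive(long nowMs) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatByAddr.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MS < nowMs) {
                removed.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return removed;
    }

    int size() {
        return lastHeartbeatByAddr.size();
    }

    public static void main(String[] args) {
        LiveTableSketch table = new LiveTableSketch();
        long start = System.currentTimeMillis();
        table.heartbeat("10.0.0.1:10911", start);
        // Pretend three minutes have passed without another heartbeat.
        System.out.println(table.scanNotActive(start + 3 * 60 * 1000));
    }
}
```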
    

    Topic data is stored in QueueData.java

        private String brokerName;
        private int readQueueNums;
        private int writeQueueNums;
        private int perm;
        private int topicSynFlag;
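
    As an illustration of how these fields might be used, topicQueueTable maps a topic to a list of QueueData, which this sketch assumes holds one entry per broker hosting the topic. It uses a cut-down, hypothetical QueueEntry class (brokerName plus queue counts only) to show how such a table could answer which brokers serve a topic and how many write queues exist in total.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

// Hypothetical sketch of a topic -> per-broker queue data table.
final class TopicRouteSketch {
    // Simplified stand-in for QueueData.
    static final class QueueEntry {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueEntry(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    private final HashMap<String, List<QueueEntry>> topicQueueTable = new HashMap<>();

    // Adds one broker's queue data for a topic, creating the list on first use.
    void addQueueData(String topic, QueueEntry entry) {
        topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>()).add(entry);
    }

    // Names of all brokers that host the topic, in registration order.
    List<String> brokersFor(String topic) {
        List<String> names = new ArrayList<>();
        for (QueueEntry e : topicQueueTable.getOrDefault(topic, Collections.emptyList())) {
            names.add(e.brokerName);
        }
        return names;
    }

    // Total number of write queues for the topic across all brokers.
    int totalWriteQueues(String topic) {
        int total = 0;
        for (QueueEntry e : topicQueueTable.getOrDefault(topic, Collections.emptyList())) {
            total += e.writeQueueNums;
        }
        return total;
    }

    public static void main(String[] args) {
        TopicRouteSketch routes = new TopicRouteSketch();
        routes.addQueueData("TopicA", new QueueEntry("broker-a", 4, 4));
        routes.addQueueData("TopicA", new QueueEntry("broker-b", 8, 6));
        System.out.println(routes.brokersFor("TopicA") + " " + routes.totalWriteQueues("TopicA"));
    }
}
```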
    
    Figure: communication between the RocketMQ modules (rocketmq各模块通信.png)
