rocketmq-namesvr

Author: 一抹斜阳丶 | Published: 2018-03-09 17:07

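Service listening

Startup steps

  • NamesrvStartup.main0() initializes the parameters: it assembles the netty server parameters and loads configuration from a properties file.
  • It builds a NamesrvController, passing in namesrvConfig and nettyServerConfig.
  • NamesrvController.initialize() sets up the netty service (it creates the NettyRemotingServer).

Relevant code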

public static NamesrvController main0(String[] args) {
    // Excerpt: commandLine and properties are declared outside this snippet,
    // and exception handling is omitted.

    // Initialize the namesvr config and the netty server config
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);

    // -c <file>: override the defaults from a properties file
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    boolean initResult = controller.initialize();
    // handling of initResult is not shown in this excerpt
    return controller;
}
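
In the excerpt above, MixAll.properties2Object(...) copies values from the loaded Properties onto the two config objects. The sketch below illustrates the general idea with reflection: for each setXxx method, look up the property xxx and convert it to the parameter type. It is a simplified stand-in written for this article, not RocketMQ's implementation; PropertiesBinderSketch, bind, and DemoConfig are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical illustration; not the actual MixAll.properties2Object code.
class PropertiesBinderSketch {

    // Hypothetical config bean standing in for NettyServerConfig / NamesrvConfig.
    public static class DemoConfig {
        private int listenPort = 9876;
        private String configStorePath;

        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public String getConfigStorePath() { return configStorePath; }
        public void setConfigStorePath(String path) { this.configStorePath = path; }
    }

    // For each public one-argument setXxx method, apply property "xxx" if it is present.
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // property not set: keep the default
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == long.class) {
                    m.invoke(target, Long.parseLong(value));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value));
                } else if (type == String.class) {
                    m.invoke(target, value);
                }
                // other parameter types are ignored in this sketch
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot apply property " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("listenPort", "9877");
        DemoConfig config = new DemoConfig();
        bind(props, config);
        System.out.println(config.getListenPort()); // prints 9877
    }
}
```

Properties that have no matching setter are simply ignored, so one properties file can feed several config objects, as the two properties2Object calls in the excerpt do.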

Starting namesvr

    public boolean initialize() {

        // Load the stored KV configuration
        this.kvConfigManager.load();

        // Create the netty server; it is configured by nettyServerConfig
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // Thread pool that executes incoming requests
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // After a 5 s delay, scan for inactive brokers every 10 s
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        // After a 1 min delay, print the KV configuration every 10 min
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        return true;
    }
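
initialize() registers its two periodic tasks with the standard scheduleAtFixedRate(task, initialDelay, period, unit) call. A minimal, self-contained sketch of that call follows, with millisecond timings and a counter standing in for the real broker scan; PeriodicScanSketch and runFor are hypothetical names used only for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of scheduleAtFixedRate; the counter replaces scanNotActiveBroker().
class PeriodicScanSketch {

    // Runs a counting task at a fixed rate for about durationMs and returns how often it ran.
    static int runFor(long initialDelayMs, long periodMs, long durationMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        // First run after initialDelayMs, then one run every periodMs.
        scheduler.scheduleAtFixedRate(runs::incrementAndGet, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(durationMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // stop the periodic task
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran " + runFor(10, 20, 300) + " times");
    }
}
```

The exact count depends on thread scheduling, which is why the demo only reports it rather than relying on a fixed number.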

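What data namesvr handles

  • namesvr stores the list of live brokers, both masters and slaves.
  • namesvr stores the filter server list of every broker.
  • Heartbeat monitoring: if a broker times out (BROKER_CHANNEL_EXPIRED_TIME, 2 minutes), namesvr releases the information related to it, such as its connection. The broker then has to upload its information to namesvr again through netty.
  • The tables are plain HashMaps because concurrent access to them is controlled by a read-write lock.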
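
The heartbeat expiry described above can be sketched as follows. This is a simplified illustration, not RocketMQ's scanNotActiveBroker(): it keeps only a timestamp per broker address, takes the current time as a parameter, and omits locking and connection handling. BrokerLivenessSketch and its methods are hypothetical names; the timeout value is the one from the article.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of heartbeat-based expiry; locking is omitted for brevity.
class BrokerLivenessSketch {
    // Same value as BROKER_CHANNEL_EXPIRED_TIME in the article: 2 minutes.
    static final long EXPIRED_MS = 1000 * 60 * 2;

    // brokerAddr -> timestamp (ms) of the last heartbeat
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Called whenever a broker registers or sends a heartbeat.
    void onHeartbeat(String brokerAddr, long nowMs) {
        lastHeartbeat.put(brokerAddr, nowMs);
    }

    // Removes brokers whose last heartbeat is older than EXPIRED_MS; returns the number removed.
    int scanNotActive(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MS < nowMs) {
                it.remove(); // safe removal while iterating
                removed++;
            }
        }
        return removed;
    }

    boolean isLive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        BrokerLivenessSketch table = new BrokerLivenessSketch();
        table.onHeartbeat("10.0.0.1:10911", 0L);
        table.onHeartbeat("10.0.0.2:10911", 100_000L);
        System.out.println(table.scanNotActive(121_000L)); // prints 1
    }
}
```

A removed broker reappears in the table only after its next onHeartbeat call, which mirrors the need for the broker to re-upload its information.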

RouteInfoManager.java

    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
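
To illustrate how a ReadWriteLock lets plain HashMap fields like these be shared between threads, here is a minimal sketch under simplifying assumptions: a single table that maps a topic to broker names (instead of List<QueueData>), with hypothetical names LockedRouteTableSketch, register, and lookup. It shows the locking pattern only, not RouteInfoManager's actual methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: a HashMap guarded by a read-write lock.
class LockedRouteTableSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // topic -> names of the brokers serving it (simplified from List<QueueData>)
    private final HashMap<String, List<String>> topicTable = new HashMap<>();

    // Writers take the exclusive write lock.
    void register(String topic, String brokerName) {
        lock.writeLock().lock();
        try {
            List<String> brokers = topicTable.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!brokers.contains(brokerName)) {
                brokers.add(brokerName);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock and get a copy, so callers never see later mutations.
    List<String> lookup(String topic) {
        lock.readLock().lock();
        try {
            List<String> brokers = topicTable.get(topic);
            return brokers == null ? new ArrayList<>() : new ArrayList<>(brokers);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        LockedRouteTableSketch table = new LockedRouteTableSketch();
        table.register("TopicTest", "broker-a");
        table.register("TopicTest", "broker-b");
        System.out.println(table.lookup("TopicTest")); // prints [broker-a, broker-b]
    }
}
```

Many lookups can run in parallel under the read lock, while a registration briefly blocks them; this suits a table that is read far more often than it is written.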

Data stored per topic: QueueData.java

    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;

Figure: communication between RocketMQ modules (rocketmq各模块通信.png)
