美文网首页消息中间件选型
RocketMQ源码解析(二)-nameserv

RocketMQ源码解析(二)-nameserv

作者: 空挡 | 来源:发表于2018-12-24 17:10 被阅读0次

    NameServ是rocketMQ的注册中心,保存所有Broker、Topic的元数据。Broker启动后会向nameserv发送心跳,nameserv也会定时检测broker的可用性,并移除不可用的broker。

    Nameserv的启动过程

    启动脚本

    > nohup sh bin/mqnamesrv &
    

    nameserv启动过程会将所有初始化和启动工作交给NamesrvController来完成。

    NamesrvController

    nameserv的主要控制类,负责初始化和后台任务启动,Controller包含的主要组件都在构造函数中做了初始化
    Controller构造函数

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
            //nameserv参数配置
            this.namesrvConfig = namesrvConfig; 
            //netty的参数配置
            this.nettyServerConfig = nettyServerConfig;
            this.kvConfigManager = new KVConfigManager(this);
            //初始化RouteInfoManager
            this.routeInfoManager = new RouteInfoManager();
            //监听客户端连接(Channel)的变化,通知RouteInfoManager检查broker是否有变化
            this.brokerHousekeepingService = new BrokerHousekeepingService(this);
            this.configuration = new Configuration(
                log,
                this.namesrvConfig, this.nettyServerConfig
            );
            //Nameserv的配置参数会保存到磁盘文件中
            this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
        }
    

    构造函数中初始化了RouteInfoManager,这个最重要的类,负责缓存整个集群的broker信息,以及topic和queue的配置信息。
    RouteInfoManager数据结构
    RouteInfoManager的所有数据通过HashMap缓存在内存中,通过读写锁来控制并发更新。这样可最大程度的提高客户端查询数据的速度。数据更新时会将数据保存到文件中,重启后可恢复数据。

        //1、Topic和broker的Map,保存了topic在每个broker上的读写Queue的个数以及读写权限
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        //2、注册到nameserv上的所有Broker,按照brokername分组
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        //3、broker的集群对应关系
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        //4、broker最新的心跳时间和配置版本号
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        //5、broker和FilterServer的对应关系
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    

    从以上的信息可以看出:
    1)Broker使用brokerName来标识主从关系,同一个brokerName下只能由一个master。
    2)使用clusterName来判断多个broker是不是属于同一个集群。对于同一个cluster下的broker,producer在发送消息时只会选择发送给其中一个。
    3)nameserv会记录brokerAddr的最后活跃时间,如果超过一定没有心跳或其他数据交互,会认为broker已下线。
    4)nameserv和broker上都会保存DataVersion字段,当broker配置有变更时,DataVersion会+1。下次心跳时nameserv通过这个字段来判断配置是否有变更。
    【注意】因为nameserv是用brokername来区分broker,所以注册到同一个nameserv上的多个集群,brokerName和topic也是不能重复的。
    Controller initialize
    启动过程中新建了一个Controller的实例后会调用它的initialize()方法:

    public boolean initialize() {
            //1、初始化KVConfigManager
            this.kvConfigManager.load();
            //2、初始化netty server
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            //3、客户端请求处理的线程池
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
           //4、注册DefaultRequestProcessor,所有的客户端请求都会转给这个Processor来处理
            this.registerProcessor();
           //5、启动定时调度,每10秒钟扫描所有Broker,检查存活状态
             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
            //6、日志打印的调度器,定时打印kvConfigManager的内容      
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);
            //7、监听ssl证书文件变化,
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                ...
            }
    
            return true;
        }
    

    以上最重要的就是第2,4步,初始化nameserv的Server,用来接收客户端请求。所有的客户端请求都会转给第4步中注册的DefaultRequestProcessor来处理。
    第5步中,启动了一个定时器来扫描RouteInfoManager中缓存的broker信息,如果broker已经长时间没有心跳信息,则认为broker已经down掉了,将其移除。
    Controller启动:

       public void start() throws Exception {
            this.remotingServer.start();
            //监听ssl文件变化,可以实时更新证书
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
        }
    

    启动的过程比较简单,就是启动netty server开始接收客户端请求。

    DefaultRequestProcessor请求处理

    前面讲过nameserv最重要的两个作用,一个是路由管理,通过RouteInfoManager管理路由信息供客户端查询。一个是Broker管理,接收broker注册并通过心跳机制检查broker的可用性。
    NameServer通过DefaultRequestProcessor的processRequest方法来提供请求处理。

    @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            ...
            switch (request.getCode()) {
                ...
                //broker注册请求
                case RequestCode.REGISTER_BROKER:
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request);
                    } else {
                        return this.registerBroker(ctx, request);
                    }
                 //Broker注销请求
                case RequestCode.UNREGISTER_BROKER:
                    return this.unregisterBroker(ctx, request);
                //根据topic获取broker路由信息
                case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                    return this.getRouteInfoByTopic(ctx, request);
                //获取broker集群信息
                case RequestCode.GET_BROKER_CLUSTER_INFO:
                    return this.getBrokerClusterInfo(ctx, request);
                case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                    return this.wipeWritePermOfBroker(ctx, request);
                //获取所有topic信息
                case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                    return getAllTopicListFromNameserver(ctx, request);
                //删除topic
                case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                    return deleteTopicInNamesrv(ctx, request);
                ...
            }
            return null;
        }
    

    查询路由信息

    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final GetRouteInfoRequestHeader requestHeader =
                (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
            //从RouteInfoManager中获取topic的路由信息
            TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
            //如果支持顺序消息,则填充KVConfig信息
            if (topicRouteData != null) {
                if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                    String orderTopicConf =
                        this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                            requestHeader.getTopic());
                    topicRouteData.setOrderTopicConf(orderTopicConf);
                }
    
                byte[] content = topicRouteData.encode();
                response.setBody(content);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return response;
            }
    
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    

    路由信息就是直接到RouteInfoManager查询,我们看下具体实现:

    public TopicRouteData pickupTopicRouteData(final String topic) {
              ...
                try {
                    //获取读锁
                    this.lock.readLock().lockInterruptibly();
                    //获取所有支持该topic的broker的queue配置
                    List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                    if (queueDataList != null) {
                        topicRouteData.setQueueDatas(queueDataList);
                        foundQueueData = true;
                        //获取brokerName
                        Iterator<QueueData> it = queueDataList.iterator();
                        while (it.hasNext()) {
                            QueueData qd = it.next();
                            brokerNameSet.add(qd.getBrokerName());
                        }
                  
                        for (String brokerName : brokerNameSet) {
                            //根据brokerName获取broker主从地址信息
                            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                            if (null != brokerData) {
                                BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                    .getBrokerAddrs().clone());
                                brokerDataList.add(brokerDataClone);
                                foundBrokerData = true;
                                for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                    List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                                    filterServerMap.put(brokerAddr, filterServerList);
                                }
                            }
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
                ...
        }
    

    获取topic路由的过程就是直接从HashMap中获取缓存的broker配置。
    Broker注册
    Broker在启动的时候会将topic和queue的配置同步给nameserv。

    public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
            ...
            //checksum,检查CRC32是否相等
            if (!checksum(ctx, request, requestHeader)) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("crc32 not match");
                return response;
            }
    
            RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
            //decode request body,如果body已压缩,则先解压。如果body为空,会将topic的版本号默认置为0.
            if (request.getBody() != null) {
                try {
                    registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
                } catch (Exception e) {
                    throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
                }
            } else {
                registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
                registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
            }
            //使用broker上报的信息更新nameserv的RouteInfo
            RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId(),
                requestHeader.getHaServerAddr(),
                registerBrokerBody.getTopicConfigSerializeWrapper(),
                registerBrokerBody.getFilterServerList(),
                ctx.channel());
            //如果broker是slave的话,会将master address和ha server address通过response返回给broker
            responseHeader.setHaServerAddr(result.getHaServerAddr());
            responseHeader.setMasterAddr(result.getMasterAddr());
            //将Order topic的KV配置信息通过response返回
            byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
            response.setBody(jsonValue);
    
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    

    nameserv收到broker注册后,更新routeInfo过程

    public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
            RegisterBrokerResult result = new RegisterBrokerResult();
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    //更新cluster和broker对应关系
                    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                    if (null == brokerNames) {
                        brokerNames = new HashSet<String>();
                        this.clusterAddrTable.put(clusterName, brokerNames);
                    }
                    brokerNames.add(brokerName);
    
                    boolean registerFirst = false;
                    //更新brokername和brokerdata的map
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null == brokerData) {
                        registerFirst = true;
                        brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                        this.brokerAddrTable.put(brokerName, brokerData);
                    }
                    String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                    registerFirst = registerFirst || (null == oldAddr);
                    //如果是master broker,第一次注册或者是topic信息发生变化了,更新topicQueueTable
                    if (null != topicConfigWrapper
                        && MixAll.MASTER_ID == brokerId) {
                        if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                            || registerFirst) {
                            ConcurrentMap<String, TopicConfig> tcTable =
                                topicConfigWrapper.getTopicConfigTable();
                            if (tcTable != null) {
                                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                    this.createAndUpdateQueueData(brokerName, entry.getValue());
                                }
                            }
                        }
                    }
                    //更新broker的心跳时间
                    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                        new BrokerLiveInfo(
                            System.currentTimeMillis(),
                            topicConfigWrapper.getDataVersion(),
                            channel,
                            haServerAddr));
                    if (null == prevBrokerLiveInfo) {
                        log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                    }
                    //更新filter server table
                    if (filterServerList != null) {
                        if (filterServerList.isEmpty()) {
                            this.filterServerTable.remove(brokerAddr);
                        } else {
                            this.filterServerTable.put(brokerAddr, filterServerList);
                        }
                    }
                    //如果是slave broker注册,如果master存在,则返回master broker信息
                    if (MixAll.MASTER_ID != brokerId) {
                        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                        if (masterAddr != null) {
                            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                            if (brokerLiveInfo != null) {
                                result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                                result.setMasterAddr(masterAddr);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("registerBroker Exception", e);
            }
    
            return result;
        }
    

    总结

    1、nameserv通过clusterName来判断broker是不是属于同一个集群
    2、nameserv通过brokerName来判断两个broker是不是主从关系
    3、对于相同的brokerName,只有一个master(id=0),不同的slave必须使用不同的Id (id>0)
    4、NameServ只会保存master的topic配置信息,因为理论上slave的topic信息是从master同步过去的
    5、所有的topic信息以broker上报为准,broker在启动的时候是不会去nameserv获取topic配置的,只会从自己持久化文件中加载。所以,一个新的broker启动后默认只有System topic信息。如果broker是新的,或者broker在挂掉一段时间重启topic不是最新的话,只能通过客户端更新topic来使broker能加入到正常的消息收发中。

    相关文章

      网友评论

        本文标题:RocketMQ源码解析(二)-nameserv

        本文链接:https://www.haomeiwen.com/subject/fuyokqtx.html