美文网首页
RocketMQ系列1:元数据通信

RocketMQ系列1:元数据通信

作者: 过去今天和未来 | 来源:发表于2020-12-27 11:19 被阅读0次

    最近一段时间由于公司做一些LDC(阿里称为逻辑数据中心,)架构升级,所以有幸参与中间件RocketMQ改造,所以整这个机会会把RocketMQ整个流程梳理一份。

    image

    先简单概述一下RocketMQ中成员

    • Producer :消息生产者,可集群部署。它会先和 NameServer 集群中的随机一台建立⻓连接,得 知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立⻓连接,支持多种负载平衡模式 发送消息。

    • Consumer :消息消费者,可集群部署。它会先和 NameServer 集群中的随机一台建立⻓连接,得 知当前要消费的 Topic 存在哪台 Broker Master、Slave上,然后它们建立⻓连接,支持集群消费和广 播消费消息。

    • Broker :主要负责消息的存储、查询消费,支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。

    • NameServer : Topic 路由注册中心,支持 Broker 的动态注册和发现,保存 Topic 和 Borker之间的关系。通常是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的 路由信息。

    以下则是本系列开篇,首先会梳理一下各个成员之间的路由信息通信


    一、Broker上报元数据流程

    启动过程

    1. BrokerController创建

    主要是加载类成员,包括配置类、管理类、服务类、任务类、策略类等。后续在分析这些。
    

    2. 启动过程

    主要包括消息存储服务、远程通信服务、心跳服务等启动过程,我们重点看一下定时线程池,启动后10s执行。时间间隔则是在10-60s期间,可以在conf文件中配置。

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
         BrokerController.this.registerBrokerAll(true, false,brokerConfig.isForceRegister());
      }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    

    3. BrokerController#registerBrokerAll

    • 判断Broker是否有读写权限进行

    • broker侧 needRegister 发起一次远程请求确认数据版本是否有变化

    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills(),
                this.brokerConfig.isCompressedRegister());
    

    4. 真正进行Broker元信息上报
    BrokerController#doRegisterBrokerAll

    • 首先获取nameserver地址信息,开始是通过配置读取,如果其中一个挂掉则会更新nameserver列表信息。

    • 接着Broker会将请求参数拼接,包括BrokerID、Name、集群名称、主节点地址等元数据,并转换为二进制进行远程通信将BrokerData上报,这里采用CountDownLatch+线程池并发执行上报,采用remotingClient进行Netty通信,最终将结果添加到List中

     List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
     final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
                for (final String namesrvAddr : nameServerAddressList) {
                    brokerOuterExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                                if (result != null) {
                                    registerBrokerResultList.add(result);
                                }
                            } catch (Exception e) {
                                log.warn("registerBroker Exception, {}", namesrvAddr, e);
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    });
                }
                try {
                    countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                }
    

    二、Nameserver接受Broker请求处理

    DefaultRequestProcessor#processRequest

    该方法采用策略分配请求处理,对于Broker上报请求Code则是Register_Broker,最终到DefaultRequestProcessor#registerBroker ,该方法主要更新Nameserver元数据基本都是Map对象结构。

    • 加锁控制,避免更新时客户端读到的数据不一致
    • 更新clusterAddrTable
    • 更新brokerAddrTable
    • 更新 brokerAddrTable 中的 brokerData
    • 如果是新注册的 Master Broker,或者 Broker 中的路由信息变了,需要更新 topicQueueTable
    • 更新 brokerLiveTable
    • 更新 filterServerTable
    • 如果是 Slave Broker,需要在返回的信息中带上 master 的相关信息
    • 返回结果RegisterBrokerResult
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    

    三、客户端拉取Namesrv元数据

    1. 生产端启动

    MQClientInstance#start

    • 启动 scheduledExecutorService进行和NameServer建立连接,每隔2*60s
    • 定时线程池更新topic路由信updateTopicRouteInfoFromNameServer,每个topic对应的TopicRouteData。如果Topic路由信息变化则进行更新,发送端、消费端同时更新topicPublishInfoTable、topicSubscribeInfoTable最终缓存topicRouteTable
     // 元数据
     public class TopicRouteData extends RemotingSerializable {
        private List<QueueData> queueDatas;
        private List<BrokerData> brokerDatas;
    }
    

    2. 发送消息

    • tryToFindTopicPublishInfo
      当producer调用send()会判断是否有路由信息,没有则去更新路由信息,最终还是调用updateTopicRouteInfoFromNameServer.
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
            }
    
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                return topicPublishInfo;
            } else {
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
                return topicPublishInfo;
            }
        }
    
    • selectOneMessageQueue

      选择messageQueue传入topic发送端topicPublishInfo。queueDatas存放主题中所有队列信息,brokerDatas保存主题相关所有Broker信息。客户端选定队列后,在对应的QueueData找到BrokerName同时可以得到BrokerData,最终确定好MessageQueue(BrokerId和Address)。

    public MessageQueue selectOneMessageQueue() {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            return this.messageQueueList.get(pos);
        }
    

    四、NameServer处理客户端请求
    DefaultRequestProcessor#processRequest
    客户端请求获取路由信息,请求Code是Get_RouteInfo_By_Topic。接着处理客户端获取的请求,调用pickupTopicRouteData,核心流程主要根据topic获取QueueData获取BrokerNameSet,遍历Set后得到BrokerData最终返回最新的topicRouteData。

         try {
                   this.lock.readLock().lockInterruptibly();
                    List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                    if (queueDataList != null) {
                        topicRouteData.setQueueDatas(queueDataList);
                        foundQueueData = true;
                        Iterator<QueueData> it = queueDataList.iterator();
                        while (it.hasNext()) {
                            QueueData qd = it.next();
                            brokerNameSet.add(qd.getBrokerName());
                        }
                        for (String brokerName : brokerNameSet) {
                            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                            if (null != brokerData) {
                                BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                    .getBrokerAddrs().clone());
                                brokerDataList.add(brokerDataClone);
                                foundBrokerData = true;
                                for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                    List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                                    filterServerMap.put(brokerAddr, filterServerList);
                                }
                            }
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
    

    相关文章

      网友评论

          本文标题:RocketMQ系列1:元数据通信

          本文链接:https://www.haomeiwen.com/subject/ejhonktx.html