美文网首页
RocketMQ源码之Producer获取topicPublis

RocketMQ源码之Producer获取topicPublis

作者: 激情的狼王 | 来源:发表于2018-04-25 15:08 被阅读0次

    RocketMQ架构解析中我们了解到了RocketMQ的架构设计原理,接下来我们根据架构图来解析各个步骤的源码,探索RocketMQ是怎么实现相关功能的,从Producer发送消息开始。
    下面是Producer发送一条消息的流程图

    01.png

    1.从本地获取该条消息应该发送给哪个broker,对应的topic等等信息
    2.如果获取不到,就通过与NameSrv交互进行获取
    3.获取到相关信息后,进行消息发送,返回结果

    本文重点来看获取topicInfo的方法tryToFindTopicPublishInfo

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            //步骤1.1
            TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            //步骤1.2
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //步骤1.3
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
            }
    
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                return topicPublishInfo;
            } else {
            //步骤1.4
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
                return topicPublishInfo;
            }
    }
    

    1.首先从topicPublishInfoTable(本地存储的路由信息表)中获取
    2.如果获取不到,先在topicPublishInfoTable插入一个new出来的TopicPublishInfo,key为topic
    3.然后通过updateTopicRouteInfoFromNameServer方法从NameServer中获取相关信息
    4.如果NameServer中也拿不到,使用 {DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic

    步骤1.1、1.2就是操作topicPublishInfoTable的get和putIfAbsent方法,不再细分,我们来看步骤1.3

        public boolean updateTopicRouteInfoFromNameServer(final String topic) {
            return updateTopicRouteInfoFromNameServer(topic, false, null);
        }
        public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
                DefaultMQProducer defaultMQProducer) {
                try {
                    if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                        try {
                            TopicRouteData topicRouteData;
    //步骤1.3.1:判断条件
                            if (isDefault && defaultMQProducer != null) {
                                topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                    1000 * 3);
                                if (topicRouteData != null) {
                                    for (QueueData data : topicRouteData.getQueueDatas()) {
                                        int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                        data.setReadQueueNums(queueNums);
                                        data.setWriteQueueNums(queueNums);
                                    }
                                }
                            } else {
    //步骤1.3.2:通过MQClient来获取该topic对应的在Namesrv的信息
                                topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                            }
                            if (topicRouteData != null) {
                                TopicRouteData old = this.topicRouteTable.get(topic);
                                boolean changed = topicRouteDataIsChange(old, topicRouteData);
                                if (!changed) {
                                    changed = this.isNeedUpdateTopicRouteInfo(topic);
                                } else {
                                    log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                                }
    //步骤1.3.3:更新TopicRouteData 和相关信息
                                if (changed) {
                                    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    
                                    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                                    }
    
                                    // Update Pub info
                                    {
                                        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                        publishInfo.setHaveTopicRouterInfo(true);
                                        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                        while (it.hasNext()) {
                                            Entry<String, MQProducerInner> entry = it.next();
                                            MQProducerInner impl = entry.getValue();
                                            if (impl != null) {
                                                impl.updateTopicPublishInfo(topic, publishInfo);
                                            }
                                        }
                                    }
    
                                    // Update sub info
                                    {
                                        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                        while (it.hasNext()) {
                                            Entry<String, MQConsumerInner> entry = it.next();
                                            MQConsumerInner impl = entry.getValue();
                                            if (impl != null) {
                                                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                            }
                                        }
                                    }
                                    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                                    this.topicRouteTable.put(topic, cloneTopicRouteData);
                                    return true;
                                }
                            } else {
                                log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                            }
                        } catch (Exception e) {
                            if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                                log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                            }
                        } finally {
                            this.lockNamesrv.unlock();
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
                    }
                } catch (InterruptedException e) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
    
                return false;
            }
    

    这个方法的三个主要步骤已经在源码里注释了,最后通过MQClient来访问Namesrv来获取topicRouteData,由于这是第一次进行发送信息,所以Namesrv是空的,获取不到。

    执行完1.3的源码,由于此时topicPublishInfo 还是不满足条件所以会进入1.4步骤

    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    

    这个方法调用的就是1.3的方法,只是入参不同而已,此时会进入步骤1.3.1和1.3.2之间的代码

         topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                    1000 * 3);
         if (topicRouteData != null) {
                   for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                    }
         }
    

    这时将会请求Namesrv并返回{DefaultMQProducer#createTopicKey} 对应的topicRouteData ,然后再根据入参传进来的defaultMQProducer的配置初始化topicRouteData的QueueDatas配置,之后执行步骤1.3.3后面的代码

    //Update Pub info
    {
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        publishInfo.setHaveTopicRouterInfo(true);
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }
    

    注意:我们最终是想得到TopicPublishInfo的,在这里循环this.producerTable,然后把TopicPublishInfo赋值给了producerTable里的每一个impl,此时我们需要搞清楚这几个东西之间的关系和含义了

    一个生产者拥有一个topicPublishInfoTable

    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
            new ConcurrentHashMap<String, TopicPublishInfo>();
    

    topicPublishInfoTable里是key为topic,value为TopicPublishInfo的元素
    一个生产者组拥有一个producerTable

    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    

    key为生产者组名称,value为MQProducerInner,对应着一个生产者

    我们再来看上述循环的核心代码
    impl.updateTopicPublishInfo(topic, publishInfo);

    public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
            if (info != null && topic != null) {
                TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
                if (prev != null) {
                    log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
                }
            }
        }
    

    在这里,为生产者的topicPublishInfoTableput进去了我们所需要的TopicPublishInfo,这时我们在执行步骤1.4后续代码时就可以拿到我们要的TopicPublishInfo了。

    获取TopicPublishInfo总结

    1.先拿本地内存中该topic的TopicPublishInfo
    2.本地内存没有,查询Namesrv
    3.如果Namesrv中也没用,就使用默认的DefaultMQProducer对应的TopicPublishInfo
    4.发送成功后,当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic,这时候,该生产者的TopicPublishInfo就会存在于broker和Namesrv中了。

    上述就是sendMessage获取TopicPublishInfo对象的分析了。

    相关文章

      网友评论

          本文标题:RocketMQ源码之Producer获取topicPublis

          本文链接:https://www.haomeiwen.com/subject/mgvelftx.html