美文网首页
RocketMQ自动创建topic

RocketMQ自动创建topic

作者: hei禹 | 来源:发表于2020-01-26 21:29 被阅读0次

    在之前的文章《在IDEA中debug NameSrv、Broker、Producer、Consumer》中,我们debug Producer测试发送时,遇到过一个问题:Broker启动时我们没有配置NameSrv地址,发送程序会报错:No route info of this topic。但当我们配上NameSrv地址后,再次启动,可以正常发送消息。

    example.quickstart.Producer的代码是:

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.start();
    
    Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    );
    
    producer.send(msg);
    

    TopicTest之前并未创建过,Broker未配置NameSrv地址,无法发送,而配置NameSrv后则可以正常发送。这中间有2个问题:
    1、topic是怎么自动创建的?
    2、topic自动创建过程中Broker、NameSrv如何协作配合的?

    下面我们开始分析下这个流程。想看结论的可以直接跳到文章最后面。

    1、DefaultMQProducerImpl#sendDefaultImpl

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        
        // 省略校验相关代码
    
        // 关键代码,如果获取到topic的路由信息,则发送,否则抛异常
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            // 省略发送相关代码
        }
    
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }
    
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
    

    tryToFindTopicPublishInfo是发送的关键,如果获取到topic的信息,则发送,否则就异常。因此之前No route info of this topic的异常,就是Producer获取不到TopicTest的信息,导致发送失败。

    那这跟Broker配没配NameSrv地址有什么关系呢,我们接着往下看:

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // topicPublishInfoTable是Producer本地缓存的topic信息表
        // Producer启动后,会添加默认的topic:TBW102
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 未获取到,从NameSrv获取该topic的信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
    
        // 获取到了,则返回
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // 没获取到,再换种方式从NameSrv获取
            // 如果再获取不到,那后续就无法发送了
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
    

    1、Producer本地topicPublishInfoTable变量中没有TopicTest的信息,只缓存了TBW102(可以认为只有key(topic),实际也无详细信息,还是要从NameSrv获取)

    2、尝试从NameSrv获取TopicTest的信息。获取失败,NameSrv中根本没有TopicTest,因为这个topic是Producer发送时设置的,没有同步到NameSrv。

    3、再换种方式从NameSrv获取。这里就很关键了,如果获取到了,那么可以执行发送流程,如果还是没有获取到,就会抛No route info of this topic的异常了。

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 将入参的topic转换为默认的TBW102,获取TBW102的信息
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        
                        // 省略其他代码
                        
                    } else {
                        // 直接用入参的topic去获取
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    
                    // 省略其他代码
                    
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
    
        return false;
    }
    

    1、第1次获取时,isDefault传的false,defaultMQProducer传的null,因此在updateTopicRouteInfoFromNameServer会走else分支,用TopicTest去获取

    2、第2次获取时,isDefault传的true,defaultMQProducer也传值了,因此会走if分支,将入参的topic转换为默认的TBW102,获取TBW102的信息

    到这里,大概就能建立如下的推断:
    1、不管Broker配没配NameSrv地址,获取TopicTest的信息,必失败
    2、获取TBW102信息:
    2.1、Broker配置了NameSrv地址,成功
    2.2、Broker没有配置NameSrv地址,失败

    那我们进入NameSrv的源码,看看为什么如此。

    2、RouteInfoManager#pickupTopicRouteData

    updateTopicRouteInfoFromNameServer最终会发给NameSrv一个GET_ROUTEINTO_BY_TOPIC请求

    public TopicRouteData pickupTopicRouteData(final String topic) {
        TopicRouteData topicRouteData = new TopicRouteData();
        boolean foundQueueData = false;
        boolean foundBrokerData = false;
        // 省略其他代码
    
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                // 从topicQueueTable获取topic信息
                // 有则有,无则null
                List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                if (queueDataList != null) {
                    topicRouteData.setQueueDatas(queueDataList);
                    foundQueueData = true;
    
                    // 省略其他代码
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("pickupTopicRouteData Exception", e);
        }
    
        log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
    
        if (foundBrokerData && foundQueueData) {
            return topicRouteData;
        }
    
        return null;
    }
    

    NameSrv从本地变量topicQueueTable中获取对应的topic信息,没有则会返回null。由此可以推断出:
    1、NameSrv本地没有TopicTest的信息(显而易见)
    2、NameSrv本地记录了TBW102的topic信息

    那TBW102的topic信息,NameSrv又是从哪里获取并缓存到本地的呢?

    答案:来自REGISTER_BROKER请求

    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    

    3、BrokerController#start

    REGISTER_BROKER请求又是谁发给NameSrv的呢?很简单,Broker发的。Broker在启动时,会向NameSrv注册,同时有一个定时任务会定时上报

    public void start() throws Exception {
        
        // ...
        
        this.registerBrokerAll(true, false, true);
    
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        
        // ...
    }
    

    上报的内容有哪些呢?

    # BrokerOuterAPI#registerBrokerAll
    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    
    # TopicConfigSerializeWrapper
    public class TopicConfigSerializeWrapper extends RemotingSerializable {
        private ConcurrentMap<String, TopicConfig> topicConfigTable;
        private DataVersion dataVersion = new DataVersion();
        
        // ...
    }
    

    上报的内容包括TopicConfigSerializeWrapper,它的结构其实跟${user.home}\store\config\topics.json是一样的:

    {
        "dataVersion":{
            "counter":2,
            "timestamp":1579838252574
        },
        "topicConfigTable":{
            "SELF_TEST_TOPIC":Object{...},
            "DefaultCluster":Object{...},
            "RMQ_SYS_TRANS_HALF_TOPIC":Object{...},
            "DESKTOP-FJIT15L":Object{...},
            "TBW102":{
                "order":false,
                "perm":7,
                "readQueueNums":8,
                "topicFilterType":"SINGLE_TAG",
                "topicName":"TBW102",
                "topicSysFlag":0,
                "writeQueueNums":8
            },
            "BenchmarkTest":Object{...},
            "OFFSET_MOVED_EVENT":Object{...}
        }
    }
    

    TopicConfigManager的构造函数中,默认创建了上面的topic,其中就有TBW102,这些都是在Broker启动的时候就完成了。

    因此,当Producer用TBW102去NameSrv获取topic信息时,是可以获取的。因为TBW102是Broker启动时默认创建的,Broker启动时会向NameSrv注册。这也是为什么Broker没配NameSrv时,获取不到TBW102的topic信息。

    那获取到了TBW102的topic信息,跟TopicTest又有什么关系呢?TopicTest的信息还是没有啊。让我们继续往下看:

    再回到MQClientInstance#updateTopicRouteInfoFromNameServer:

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        // ...
    
        TopicRouteData topicRouteData;
        if (isDefault && defaultMQProducer != null) {
            // ①第2次获取TopicTest的信息会走到这里,先转换topic,实际获取的是TBW102
            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                1000 * 3);
            // 获取成功,修正读写队列数    
            if (topicRouteData != null) {
                for (QueueData data : topicRouteData.getQueueDatas()) {
                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                    data.setReadQueueNums(queueNums);
                    data.setWriteQueueNums(queueNums);
                }
            }
        } else {
            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
        }
        
        if (topicRouteData != null) {
            // old为null
            TopicRouteData old = this.topicRouteTable.get(topic);
            // changed为true
            boolean changed = topicRouteDataIsChange(old, topicRouteData);
            if (!changed) {
                changed = this.isNeedUpdateTopicRouteInfo(topic);
            } else {
                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
            }
        
            // ②走到该逻辑
            if (changed) {
                TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
        
                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                }
        
                // Update Pub info
                {
                    // ③将topic的route信息转换为publish信息。实际是用了TBW102的route信息,给TopicTest用
                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                    publishInfo.setHaveTopicRouterInfo(true);
                    
                    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, MQProducerInner> entry = it.next();
                        MQProducerInner impl = entry.getValue();
                        if (impl != null) {
                            // ④更新到本地
                            impl.updateTopicPublishInfo(topic, publishInfo);
                        }
                    }
                }
        
                // ...
            }
        }
        
        // ...
    }
    

    在第2步获取到的数据结构为:


    TBW102的TopicRouteData数据结构.png

    第3步,转换后的数据结构为:


    TopicTest的TopicPublishInfo数据结构.png

    更新本地变量:topicPublishInfoTable

    public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
        if (info != null && topic != null) {
            TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
            if (prev != null) {
                log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
            }
        }
    }
    

    获取TBW102的topic信息,当成是TopicTest的。也可以说是,TopicTest继承了TBW102的配置信息。因此TopicTest的信息就有了。

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // Producer启动后,会添加默认的topic:TBW102,但具体的信息还是没有,需要从NameSrv获取
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 未获取到TopicTest,从NameSrv获取该topic的信息,还是未获取到
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
    
    
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // 没获取到,向NameSrv获取TBW102的信息,当成是TopicTest的信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            // 这次有TopicTest的信息了
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
    

    分析完上面所有的逻辑后,就到了发送消息的步骤。

    4、发送流程

    发送消息的请求到达Broker后,会有一步msgCheck的过程

    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
    
        // ...
    
        // ①Broker本地没有TopicTest,得到null
        TopicConfig topicConfig =
            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            
            // ...
    
            // ②自动创建topic,实际就是创建一个topicConfig对象,存放到本地map,并同步到NameSrv
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
    
            // ...
    
            // ③如果步骤2失败了,那就会报TOPIC_NOT_EXIST的错误
            if (null == topicConfig) {
                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
                response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
                return response;
            }
        }
    
        // ...
        
        return response;
    }
    

    msgCheck做了如下校验:
    1、从本地获取TopicTest的信息,得到null
    2、自动创建topic,实际就是创建一个topicConfig对象,存放到本地map,并同步到NameSrv。如果topic已经缓存在本地map了,就直接返回,不需要创建
    3、如果步骤2失败了,那就会报TOPIC_NOT_EXIST的错误,就不会进行下面的消息刷盘落地

    下面看下createTopicInSendMessageMethod的流程:

    public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
        final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
        
        // ...
        
        // 一开始topicConfigTable还未缓存TopicTest
        topicConfig = this.topicConfigTable.get(topic);
        if (topicConfig != null)
            return topicConfig;
    
        // 还是要依赖TBW102,来建立TopicTest的TopicConfig对象
        TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
        if (defaultTopicConfig != null) {
            if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                    defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                }
            }
    
            if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                topicConfig = new TopicConfig(topic);
    
                int queueNums =
                    clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
                        .getWriteQueueNums() : clientDefaultTopicQueueNums;
    
                if (queueNums < 0) {
                    queueNums = 0;
                }
    
                topicConfig.setReadQueueNums(queueNums);
                topicConfig.setWriteQueueNums(queueNums);
                int perm = defaultTopicConfig.getPerm();
                perm &= ~PermName.PERM_INHERIT;
                topicConfig.setPerm(perm);
                topicConfig.setTopicSysFlag(topicSysFlag);
                topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
            }
        }
    
        if (topicConfig != null) {
            log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress);
    
            // 缓存到本地
            this.topicConfigTable.put(topic, topicConfig);
    
            this.dataVersion.nextVersion();
    
            createNew = true;
    
            // 持久化到${user.home}\store\config\topics.json
            this.persist();
        }
    
        // 同步到NameSrv
        if (createNew) {
            this.brokerController.registerBrokerAll(false, true, true);
        }
    
        return topicConfig;
    }
    

    主要的逻辑是:
    1、topicConfigTable还未缓存TopicTest
    2、还是要依赖TBW102,来建立TopicTest的TopicConfig对象
    3、缓存到本地
    4、持久化到${user.home}\store\config\topics.json
    5、同步到NameSrv

    到这里,校验也通过了,下一步就是消息刷盘落地了,由于不在本文分析范围内,就不作展开了。

    5、TBW102是为何物?

    TBW102是Broker启动时,当autoCreateTopicEnable的配置为true时,会自动创建该默认topic

    public TopicConfigManager(BrokerController brokerController) {
        this.brokerController = brokerController;
    
        // ...
        
        {
            // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                this.systemTopicList.add(topic);
                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
        }
        
        // ...
    }
    

    autoCreateTopicEnable的默认值是true,可以同步外部配置文件,让Broker启动时加载,来改变该值。我理解的TBW102的作用是当开启自动创建topic功能,发送时用了未配置的topic,可以让该topic继承默认TBW102的配置,实现消息的发送。

    6、总结

    发送未配置topic的消息,流程图如下:


    发送未配置topic的消息流程图.png

    1、Broker配不配NameSrv的区别在于:配了NameSrv后,Broker会把启动默认创建的topic同步的NameSrv,而后续Producer发送时会向NameSrv查询topic信息,当查询未配置的topic信息时,Producer会将topic转换成默认的TBW102进行查询,让topic继承它的配置。

    2、Broker在开启autoCreateTopicEnable的配置后(默认是开启的),才会自动创建topic,同样是继承默认TBW102的配置。

    因此,要正常发送未配置topic的消息,有2个点:正确配置Broker的NameSrv地址,开启autoCreateTopicEnable。

    相关文章

      网友评论

          本文标题:RocketMQ自动创建topic

          本文链接:https://www.haomeiwen.com/subject/ojqsthtx.html