美文网首页
RocketMQ消息引擎关于autoCreateTopicEna

RocketMQ消息引擎关于autoCreateTopicEna

作者: 丑人林宗己 | 来源:发表于2019-06-16 09:09 被阅读0次

最近整个消息队列在计划重新搭建,主要是基于前文提到之前消息队列上使用的不合理所造成,故而本次改动面积较大,为了减少工作量注意到brokerconfig上有个关键字autoCreateTopicEnable

这个字段主要字面语义是提供自动生成topic的能力,比如程序只需要写好消息producerconsumer即可,剩下的创建topic的动作交由服务端自动完成。

但是并不敢直接用,因而需要了解一下这个字段具体能做什么,不能做什么,才能决定是否投入使用。

  • 创建默认Topic是由producer决定,还是consumer决定?即启动的时候应该先启动哪个?
  • 创建之后机器之间是否存在topic同步,主主之间同步,主从之间同步?

TopicConfigManager

构造器

找到BrokerConfig#isAutoCreateTopicEnable()入口,看到TopicConfigManager的构造器中有如下代码

// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
    String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; // TBW102
    TopicConfig topicConfig = new TopicConfig(topic);
    this.systemTopicList.add(topic);
    topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());// 默认值是8
    topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());// 默认值是8
    int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;// 可允许继承、可读、可写等权限’
    topicConfig.setPerm(perm);
    this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

如果broker配置了开启autoCreateTopicEnable,则默认生成一个TopicTBW102的配置项,并放入topicConfigTable。初始化部分到此完毕。

createTopicInSendMessageMethod()

注意看该方法的名称,语义上可以做如此猜测:在发送消息的方法中创建Topic

if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
    try {
        topicConfig = this.topicConfigTable.get(topic); // 根据topic从内存中查找topicConfig
        if (topicConfig != null)
            return topicConfig; // 找得到就返回

        TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);// 找不到根据defaultTopic继续往下操作
        if (defaultTopicConfig != null) {
            if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {// 如果默认的topic为TBW102
                if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                    defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);// 设置权限
                }
            }

            if (PermName.isInherited(defaultTopicConfig.getPerm())) {// 如果topic允许继承,此处假设默认topic是MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC,那么是允许可继承,即此处为true
                topicConfig = new TopicConfig(topic);

                int queueNums =
                    clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
                        .getWriteQueueNums() : clientDefaultTopicQueueNums;/// 取写队列大小,注意去传入的队列大小进行对比,二者取小,默认队列时8

                if (queueNums < 0) {
                    queueNums = 0;
                }

                topicConfig.setReadQueueNums(queueNums);
                topicConfig.setWriteQueueNums(queueNums);
                int perm = defaultTopicConfig.getPerm();// 默认三种全年
                perm &= ~PermName.PERM_INHERIT;// 去反 后在取与
                topicConfig.setPerm(perm);
                topicConfig.setTopicSysFlag(topicSysFlag);// 
                topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());//
            } else {
                log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
                    defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
            }
        } else {
            log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
                defaultTopic, remoteAddress);
        }

        if (topicConfig != null) {
            log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",defaultTopic, topicConfig, remoteAddress);
            this.topicConfigTable.put(topic, topicConfig);
            this.dataVersion.nextVersion();
            createNew = true;
            this.persist(); // 持久化至配置文件,config目录下有个topics.json文件
        }
    } finally {
        this.lockTopicConfigTable.unlock();
    }
}

该方法的核心问题在于传入的defaultTopic值,是不是期待中的MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC,如果是,那么autoCreateTopicEnable得核心流程就是如此,创建成功后写入到本地json文件。

沿着方法引用继续往上找可以看到,defulltTopic是从SendMessageRequestHeader,意味着该值由客户端上传。

DefaultMQProducerImpl#sendKernelImpl()

按照过往经验,可以指定将发送消息的方法定位到DefaultMQProducerImpl#sendKernelImpl().

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());// defaultTopic ->MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); // 4
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);

由此可见,发送消息的默认Topic确实是MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC,那么前文的假设即成立。

isAutoCreateTopicEnable()方法作为入口查看,可以得出自动创建topicconsumer的启动没有任何关系,即cunsumer注册时并不会触发自动创建topic

那么同步的问题?

主从同步

RocketMQ自身提供主从复制,但其实一直未多数人所诟病的是无法实现自动选举,即Master宕机后Slave无法晋升为Master,而官方也明确,Slave提供的核心能力是为Master分摊consumer的压力,那么对于是否创建topic有无影响?

答案是无。

通过之前的文章可以看到RocketMQ的底层存储模型设计,HAClient启动线程不断从Master读取数据,并写入


int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    readSizeZeroTimes = 0;
    boolean result = this.dispatchReadRequest();
    if (!result) {
        log.error("HAClient, dispatchReadRequest error");
        return false;
    }
} else if (readSize == 0) {
    if (++readSizeZeroTimes >= 3) {
        break;
    }
} else {
    log.info("HAClient, processReadEvent read socket < 0");
    return false;
}

///// 
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

HAClientSocket中将数据读取至private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);READ_MAX_BUFFER_SIZE大小为4kb。读取之后接着写到到本地commitLog中,而本地又有线程维护consumerQueue,故而足以达到消息消费的需求。

而这个过程也不见同步其他的诸如topics.config。如果RocketMQ要解决主备切换晋升的问题,那么必须尽可能保证主从之间的状态一致性问题。

值得一提的事,RocketMQ的主从复制直接使用Java NIO实现,而不是基于Java NIO封装的Netty,这种做不需要进行额外操作,比如编解码。

主主同步

可以看到,创建成功之后,只做了本地的持久化(写入到本地json文件),那么启用此队列有可能造成两个Master之间状态不一致,如果长期不一致即会导致所有消息都跑到某一个Master,无法达到负载均衡的效果。

纵观RocketMQ的架构,Master与Master之间并不会存在状态同步的操作,并且MasterMaster所存储的数据也理应是不合理的。双Master注册到namesrv上,消息投递时根据负载均衡策略则其一而投递。

那么,当消息投递时,如果开启了autoCreateTopicEnable。消息会投递到其中某一台Master Broker,与此同时,该Master Broker 会定时向namesrv上报topic路由信息。如果在这30s期间,消息没有继续投递,或者投递时由于负载均衡策略的原因没有及时投递到另外的Master Broker,那么毫无疑问,另外一台Master Broker将永远不会自动创建对应的Topic,而producenamesrv读取到新创建的topic的路由信息只有一台Master Broker,之后的消息投递可能而知将永远投递到该Master Broker.

故此得出结论,autoCreateTopicEnable仅仅适用于非生产环境。

相关文章

网友评论

      本文标题:RocketMQ消息引擎关于autoCreateTopicEna

      本文链接:https://www.haomeiwen.com/subject/zxrifctx.html