最近整个消息队列在计划重新搭建,主要是基于前文提到之前消息队列上使用的不合理所造成,故而本次改动面积较大,为了减少工作量注意到brokerconfig
上有个关键字autoCreateTopicEnable
这个字段主要字面语义是提供自动生成topic
的能力,比如程序只需要写好消息producer
或consumer
即可,剩下的创建topic
的动作交由服务端自动完成。
但是并不敢直接用,因而需要了解一下这个字段具体能做什么,不能做什么,才能决定是否投入使用。
- 创建默认Topic是由producer决定,还是consumer决定?即启动的时候应该先启动哪个?
- 创建之后机器之间是否存在topic同步,主主之间同步,主从之间同步?
TopicConfigManager
构造器
找到BrokerConfig#isAutoCreateTopicEnable()
入口,看到TopicConfigManager
的构造器中有如下代码
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; // TBW102
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());// 默认值是8
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());// 默认值是8
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;// 可允许继承、可读、可写等权限’
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
如果broker配置了开启autoCreateTopicEnable
,则默认生成一个Topic
为TBW102
的配置项,并放入topicConfigTable
。初始化部分到此完毕。
createTopicInSendMessageMethod()
注意看该方法的名称,语义上可以做如此猜测:在发送消息的方法中创建Topic
。
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(topic); // 根据topic从内存中查找topicConfig
if (topicConfig != null)
return topicConfig; // 找得到就返回
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);// 找不到根据defaultTopic继续往下操作
if (defaultTopicConfig != null) {
if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {// 如果默认的topic为TBW102
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);// 设置权限
}
}
if (PermName.isInherited(defaultTopicConfig.getPerm())) {// 如果topic允许继承,此处假设默认topic是MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC,那么是允许可继承,即此处为true
topicConfig = new TopicConfig(topic);
int queueNums =
clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
.getWriteQueueNums() : clientDefaultTopicQueueNums;/// 取写队列大小,注意去传入的队列大小进行对比,二者取小,默认队列时8
if (queueNums < 0) {
queueNums = 0;
}
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
int perm = defaultTopicConfig.getPerm();// 默认三种全年
perm &= ~PermName.PERM_INHERIT;// 去反 后在取与
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);//
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());//
} else {
log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
}
} else {
log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
defaultTopic, remoteAddress);
}
if (topicConfig != null) {
log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",defaultTopic, topicConfig, remoteAddress);
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
createNew = true;
this.persist(); // 持久化至配置文件,config目录下有个topics.json文件
}
} finally {
this.lockTopicConfigTable.unlock();
}
}
该方法的核心问题在于传入的defaultTopic
值,是不是期待中的MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
,如果是,那么autoCreateTopicEnable
得核心流程就是如此,创建成功后写入到本地json文件。
沿着方法引用继续往上找可以看到,defulltTopic
是从SendMessageRequestHeader
,意味着该值由客户端上传。
DefaultMQProducerImpl#sendKernelImpl()
按照过往经验,可以指定将发送消息的方法定位到DefaultMQProducerImpl#sendKernelImpl()
.
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());// defaultTopic ->MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); // 4
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
由此可见,发送消息的默认Topic确实是MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC,那么前文的假设即成立。
从isAutoCreateTopicEnable()
方法作为入口查看,可以得出自动创建topic
跟consumer
的启动没有任何关系,即cunsumer
注册时并不会触发自动创建topic
。
那么同步的问题?
主从同步
RocketMQ
自身提供主从复制,但其实一直未多数人所诟病的是无法实现自动选举,即Master宕机后Slave无法晋升为Master,而官方也明确,Slave提供的核心能力是为Master分摊consumer
的压力,那么对于是否创建topic
有无影响?
答案是无。
通过之前的文章可以看到RocketMQ
的底层存储模型设计,HAClient
启动线程不断从Master读取数据,并写入
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
/////
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
HAClient
从Socket
中将数据读取至private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
,READ_MAX_BUFFER_SIZE
大小为4kb
。读取之后接着写到到本地commitLog
中,而本地又有线程维护consumerQueue
,故而足以达到消息消费的需求。
而这个过程也不见同步其他的诸如topics.config
。如果RocketMQ
要解决主备切换晋升的问题,那么必须尽可能保证主从之间的状态一致性问题。
值得一提的事,RocketMQ
的主从复制直接使用Java NIO
实现,而不是基于Java NIO
封装的Netty
,这种做不需要进行额外操作,比如编解码。
主主同步
可以看到,创建成功之后,只做了本地的持久化(写入到本地json文件),那么启用此队列有可能造成两个Master
之间状态不一致,如果长期不一致即会导致所有消息都跑到某一个Master
,无法达到负载均衡的效果。
纵观RocketMQ
的架构,Master与Master之间并不会存在状态同步的操作,并且Master
与Master
所存储的数据也理应是不合理的。双Master
注册到namesrv
上,消息投递时根据负载均衡策略则其一而投递。
那么,当消息投递时,如果开启了autoCreateTopicEnable
。消息会投递到其中某一台Master Broker
,与此同时,该Master Broker
会定时向namesrv
上报topic
路由信息。如果在这30s期间,消息没有继续投递,或者投递时由于负载均衡策略的原因没有及时投递到另外的Master Broker
,那么毫无疑问,另外一台Master Broker
将永远不会自动创建对应的Topic,而produce
从namesrv
读取到新创建的topic
的路由信息只有一台Master Broker
,之后的消息投递可能而知将永远投递到该Master Broker
.
故此得出结论,autoCreateTopicEnable
仅仅适用于非生产环境。
网友评论