今天在公司的项目中遇到个奇怪的问题,项目中起了两个rocketmq consumer的线程来监听两个topic,刚好这两个topic的name server是不同的配置,本来线上已经运行了好几个月没问题,今天突然发现其中有一个topic的name server的ip居然写错了(根本不是一个合法的ip),这也能运行不出错,真是神奇。
其中代码大概是这样的:
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe(topic, tag);
consumer.setConsumeMessageBatchMaxSize(CONSUME_MESSAGE_BATCH_MAX_SIZE);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new Listener());
consumer.start();
LOGGER.info("tradeService start success");
}
这是一个bean的初始化会执行的代码,看起来没什么毛病,从线上日志看ip确实是错的,但是却能正常消费。于是追踪一下rocketmq consumer的源码:
从 consumer.start();开始
@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}
接着看
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(),
this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,
this.rpcHook);
....
这里获取getAndCreateMQClientInstance是获取MQClientInstance,MQClientInstance可以理解为一个java与rocketmq的连接
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId); // 默认一个进程一个 MQClientInstance
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
} else {
// TODO log
}
}
return instance;
}
这里有个注释,默认情况下一个java进程只实例化一个MQClientInstance,看看怎么实现的:
/**
* 默认clientip + pid 做clientid .如果是单元模式,则加上 unitname.
* @return
*/
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if(!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
默认是哪ip+pid作为ConcurrentHashMap的key存一个MQClientInstance
看到这里猜想可能是这个导致的,但是为什么线上两个不同的rocketmq集群没有报错呢?问了下公司的中间件同学,原来这两个集群是由一个集群拆分而来,为了防止哪里的ip没有改掉而让两组name server可以互相访问。
这就解释的通了,通过断点调试也印证了这个猜想,最后为了防止两个集群彻底隔离开而没有通知到导致线上故障,这里加一行代码让每个集群维护各自的实例:
consumer.setInstanceName("TradeServiceListener");
给每个监听的线程设置MQClientInstance名,以区分是不同的实例。
网友评论