以consumer为例来记录下这个问题。
1、在consumer的启动过程中,有两个挡板,第一个是DefaultMQPushConsumerImpl,第二个是MQClientInstance,就是说,假如两个consumer使用的是同一个DefaultMQPushConsumerImpl对象,那么是不能启动成功的;第二个亦然;挡板有先后顺序,第一个挡住了,就到不了第二个了。下面记下第一个挡板的实现:
//只有当DefaultMQPushConsumerImpl对象的serviceState属性的值为CREATE_JUST,才会接着做启动的工作:开启一些线程,初始化一些数据;
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
.......
this.serviceState = ServiceState.RUNNING;
.......
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
DefaultMQPushConsumerImpl对象是在DefaultMQPushConsumer对象初始化的时候new出来的,所以想要不一样,使用不同的DefaultMQPushConsumer对象即可;
2、第二个挡板的实现方法和第一个是一样的,下面重点记下两个consumer如何才能拿到不同的MQClientInstance对象;MQClientInstance对象是在过了第一个挡板后初始化的,记下初始化代码:
his.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
可以看到是先拿到一个单例的MQClientManager对象,然后再获取MQClientInstance对象,接着看下代码:
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
//构建clientId
String clientId = clientConfig.buildMQClientId();
//第一次进来拿到的是null 假如我们在同一个进程中启动多个生产者,假如clientId一样,那么使用的MQClientInstance就是一样的
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
//因为所有线程拿到的都是同一个MQClientManager对象,所以MQClientManager对象中的数据是所有线程共享的
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
可以看到,能不能拿到不同的MQClientInstance对象,关键在于clientId的构建,看下代码:
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
可以看到,假如我们在构建DefaultMQPushConsumer对象的时候,指定不同的unitName属性,就可以构建出不同的clientId,从而拿到不同的MQClientInstance,最终完成在同一个进程中启动两个consumer。
网友评论