美文网首页
rocketmq之同一个进程中能不能启动多个producer或者

rocketmq之同一个进程中能不能启动多个producer或者

作者: kele2018 | 来源:发表于2020-12-02 16:51 被阅读0次

以consumer为例来记录下这个问题。
1、在consumer的启动过程中,有两个挡板,第一个是DefaultMQPushConsumerImpl,第二个是MQClientInstance,就是说,假如两个consumer使用的是同一个DefaultMQPushConsumerImpl对象,那么是不能启动成功的;第二个亦然;挡板有先后顺序,第一个挡住了,就到不了第二个了。下面记下第一个挡板的实现:

//只有当DefaultMQPushConsumerImpl对象的serviceState属性的值为CREATE_JUST,才会接着做启动的工作:开启一些线程,初始化一些数据;
public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                .......
                this.serviceState = ServiceState.RUNNING;
                .......
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }

DefaultMQPushConsumerImpl对象是在DefaultMQPushConsumer对象初始化的时候new出来的,所以想要不一样,使用不同的DefaultMQPushConsumer对象即可;
2、第二个挡板的实现方法和第一个是一样的,下面重点记下两个consumer如何才能拿到不同的MQClientInstance对象;MQClientInstance对象是在过了第一个挡板后初始化的,记下初始化代码:

his.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

可以看到是先拿到一个单例的MQClientManager对象,然后再获取MQClientInstance对象,接着看下代码:

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        //构建clientId
        String clientId = clientConfig.buildMQClientId();
        //第一次进来拿到的是null   假如我们在同一个进程中启动多个生产者,假如clientId一样,那么使用的MQClientInstance就是一样的
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
        
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            //因为所有线程拿到的都是同一个MQClientManager对象,所以MQClientManager对象中的数据是所有线程共享的
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

可以看到,能不能拿到不同的MQClientInstance对象,关键在于clientId的构建,看下代码:

public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());

        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

可以看到,假如我们在构建DefaultMQPushConsumer对象的时候,指定不同的unitName属性,就可以构建出不同的clientId,从而拿到不同的MQClientInstance,最终完成在同一个进程中启动两个consumer。

相关文章

网友评论

      本文标题:rocketmq之同一个进程中能不能启动多个producer或者

      本文链接:https://www.haomeiwen.com/subject/tvhuwktx.html