美文网首页
rocketmq之同一个进程中能不能启动多个producer或者

rocketmq之同一个进程中能不能启动多个producer或者

作者: kele2018 | 来源:发表于2020-12-02 16:51 被阅读0次

    以consumer为例来记录下这个问题。
    1、在consumer的启动过程中,有两个挡板,第一个是DefaultMQPushConsumerImpl,第二个是MQClientInstance,就是说,假如两个consumer使用的是同一个DefaultMQPushConsumerImpl对象,那么是不能启动成功的;第二个亦然;挡板有先后顺序,第一个挡住了,就到不了第二个了。下面记下第一个挡板的实现:

    //只有当DefaultMQPushConsumerImpl对象的serviceState属性的值为CREATE_JUST,才会接着做启动的工作:开启一些线程,初始化一些数据;
    public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    .......
                    this.serviceState = ServiceState.RUNNING;
                    .......
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
        }
    

    DefaultMQPushConsumerImpl对象是在DefaultMQPushConsumer对象初始化的时候new出来的,所以想要不一样,使用不同的DefaultMQPushConsumer对象即可;
    2、第二个挡板的实现方法和第一个是一样的,下面重点记下两个consumer如何才能拿到不同的MQClientInstance对象;MQClientInstance对象是在过了第一个挡板后初始化的,记下初始化代码:

    his.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    

    可以看到是先拿到一个单例的MQClientManager对象,然后再获取MQClientInstance对象,接着看下代码:

    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
            //构建clientId
            String clientId = clientConfig.buildMQClientId();
            //第一次进来拿到的是null   假如我们在同一个进程中启动多个生产者,假如clientId一样,那么使用的MQClientInstance就是一样的
            MQClientInstance instance = this.factoryTable.get(clientId);
            if (null == instance) {
            
                instance =
                    new MQClientInstance(clientConfig.cloneClientConfig(),
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
                //因为所有线程拿到的都是同一个MQClientManager对象,所以MQClientManager对象中的数据是所有线程共享的
                MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
                if (prev != null) {
                    instance = prev;
                    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
                } else {
                    log.info("Created new MQClientInstance for clientId:[{}]", clientId);
                }
            }
    
            return instance;
        }
    

    可以看到,能不能拿到不同的MQClientInstance对象,关键在于clientId的构建,看下代码:

    public String buildMQClientId() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.getClientIP());
    
            sb.append("@");
            sb.append(this.getInstanceName());
            if (!UtilAll.isBlank(this.unitName)) {
                sb.append("@");
                sb.append(this.unitName);
            }
    
            return sb.toString();
        }
    

    可以看到,假如我们在构建DefaultMQPushConsumer对象的时候,指定不同的unitName属性,就可以构建出不同的clientId,从而拿到不同的MQClientInstance,最终完成在同一个进程中启动两个consumer。

    相关文章

      网友评论

          本文标题:rocketmq之同一个进程中能不能启动多个producer或者

          本文链接:https://www.haomeiwen.com/subject/tvhuwktx.html