美文网首页RocketMQ
RocketMQ源码分析(三)生产者的启动

RocketMQ源码分析(三)生产者的启动

作者: 甘_ | 来源:发表于2021-04-03 03:22 被阅读0次

    RocketMQ 支持 3 种消息发送方式 :同 步(sync )、异步(async)、单向(oneway )。这些大家应该都比较了解了,我们从生产者的启动开始聊起。
    DefaultMQProducer 是默认的消息生产者实现,他实现了MQAdmin接口。

    生产者的启动,常规是用 DefaultMQProducerlmpl的start()方法进行追踪,以下为主要步骤:

    1. 检查生产者组是否符合要求,并改变生产者的实例名为进程ID。
    2. 创建 MQClientlnstance 实例 。 整个JVM实例中只存在一个MQClientManager 实
      ,同时维护了一个MQClientlnstance缓存表,最终通过clientId(ip+pid)生成一个实例。
    3. 向MQClientInstance注册,并加入其管理。
    4. 启动实例。
    public void start(final boolean startFactory) throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 1.检查是否符合要求
                    this.checkConfig();
                    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                        //改变生产者名字为进程id
                        this.defaultMQProducer.changeInstanceNameToPID();
                    }
                    // 通过MQClientManager获取mqclient工厂(同时创建生产者实例)
                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                    // 向mqclient工厂注册生产者
                    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
    省略很多代码……
    

    相关文章

      网友评论

        本文标题:RocketMQ源码分析(三)生产者的启动

        本文链接:https://www.haomeiwen.com/subject/riqwhltx.html