美文网首页
RocketMQ消息消费源码分析

RocketMQ消息消费源码分析

作者: hcq0514 | 来源:发表于2020-12-31 15:39 被阅读0次

    消息消费流程图

    当消息存储到broker后,启动消费者消费,每个消费者都是一个DefaultMQPushConsumer,都要实现consumeMessage方法,每次去拉取新消息,

    消费者启动初始化

    • DefaultMQPushConsumerImpl#start方法
        public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                            this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                    this.serviceState = ServiceState.START_FAILED;
    //检测配置是否合法
                    this.checkConfig();
    //复制订阅信息
                    this.copySubscription();
    //设置消费者客户端实例名称为进程ID
                    if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }
    //创建MQClient实例用于请求broker端
                    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    //创建负载均衡器
                    this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                    this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    
                    this.pullAPIWrapper = new PullAPIWrapper(
                            mQClientFactory,
                            this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            //消息消费广播模式,将消费进度保存在本地
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            //消息消费集群模式,将消费进度保存在远端Broker,默认为集群模式
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                        this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                    }
                    this.offsetStore.load();
                    //创建顺序消息消费服务
                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                        this.consumeOrderly = true;
                        this.consumeMessageService =
                                new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    }
                    //创建并发消息消费服务
                    else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                        this.consumeOrderly = false;
                        //把我们实例里面的consume方法注册到该方法里的messageListener初始化
                        this.consumeMessageService =
                                new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    }
                    //消息消费服务启动
                    this.consumeMessageService.start();
                    //注册消费者实例
                    boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                        throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                                + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                null);
                    }
                    //启动消费者客户端
                    mQClientFactory.start();
                    log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                            + this.serviceState
                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                            null);
                default:
                    break;
            }
    
            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
            this.mQClientFactory.checkClientInBroker();
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            this.mQClientFactory.rebalanceImmediately();
        }
    
    • mQClientFactory.start()这个方法(MQClientInstance#start)
       public void start() throws MQClientException {
    
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // If not specified,looking address from name server
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        this.mQClientAPIImpl.start();
                        // Start various schedule tasks
                        this.startScheduledTask();
                        // 拉数据
                        this.pullMessageService.start();
                        // Start rebalance service
                        this.rebalanceService.start();
                        // Start push service
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    
    • 向broker发送消息请求最新

    相关文章

      网友评论

          本文标题:RocketMQ消息消费源码分析

          本文链接:https://www.haomeiwen.com/subject/mhzcoktx.html