美文网首页
RocketMQ Clint 源码分析

RocketMQ Clint 源码分析

作者: 0d1b415a365b | 来源:发表于2019-04-17 18:22 被阅读0次
    启动
    • org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
        public synchronized void start() throws MQClientException {
            // CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED;
            switch(null.$SwitchMap$org$apache$rocketmq$common$ServiceState[this.serviceState.ordinal()]) {
            case 1:
                this.log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", new Object[]{this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), Boolean.valueOf(this.defaultMQPushConsumer.isUnitMode())});
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                // 如果是集群消费模式,需要通过所有的 consumer pid 做负载均衡,每个 consumer 负责一部分queue
                if(this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
    
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), this.isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);
                // 根据消费模式确定 offset 怎么存储, 消费模式:BROADCASTING, CLUSTERING
                // 如果是广播消费,offset 使用本地文件存储
                // 如果是集群消费,offset 存储在 broker 端
                if(this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch(null.$SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel[this.defaultMQPushConsumer.getMessageModel().ordinal()]) {
                    case 1:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case 2:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                    }
    
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
    
                // LocalFileOffsetStore 会从本地文件读到上次消费的offset
                // RemoteBrokerOffsetStore.load 是一个空实现
                this.offsetStore.load();
                if(this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
                } else if(this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
                }
    
                // 并发消费或者顺序消费
                this.consumeMessageService.start();
                // 向 mQClientFactory 注册消费者信息,key: group value: consumer, 保证一个实例中的一个 group 只可以有一个 consumer
                // 注册成功后启动 mqClient
                boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if(!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);
                } else {
                    this.mQClientFactory.start();
                    this.log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                    this.serviceState = ServiceState.RUNNING;
                }
            default:
                this.updateTopicSubscribeInfoWhenSubscriptionChanged();
                this.mQClientFactory.checkClientInBroker();
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                this.mQClientFactory.rebalanceImmediately();
                return;
            case 2:
            case 3:
            case 4:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);
            }
        }
    
    广播模式消费位点初始化
    • org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore
        public void load() throws MQClientException {
            // 以 json(map) 的形式存储在文件中,读出并恢复至 consumer.offsetStore
            // offsetTable key: queue value: offset
            OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
            if(offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
                this.offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
                Iterator var2 = offsetSerializeWrapper.getOffsetTable().keySet().iterator();
    
                while(var2.hasNext()) {
                    MessageQueue mq = (MessageQueue)var2.next();
                    AtomicLong offset = (AtomicLong)offsetSerializeWrapper.getOffsetTable().get(mq);
                    log.info("load consumer's offset, {} {} {}", new Object[]{this.groupName, mq, Long.valueOf(offset.get())});
                }
            }
    
        }
    
    ConsumeService 启动

    其实这里跟主流程并没有多大关系,浏览一眼,先跳过。

    并发消费
    • org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
        public void start() {
            // 启动 scheduled 线程,定时清理过期消息
            // 默认 15 分钟 1 次
            this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    ConsumeMessageConcurrentlyService.this.cleanExpireMsg();
                }
            }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
        }
    
        private void cleanExpireMsg() {
            Iterator it = this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
            
            // 遍历所有处理中的 queue,清理过期消息
            while(it.hasNext()) {
                Entry<MessageQueue, ProcessQueue> next = (Entry)it.next();
                ProcessQueue pq = (ProcessQueue)next.getValue();
                pq.cleanExpiredMsg(this.defaultMQPushConsumer);
            }
    
        }
    
    • org.apache.rocketmq.client.impl.consumer.ProcessQueue
        public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
            if(!pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
                int loop = this.msgTreeMap.size() < 16?this.msgTreeMap.size():16;
    
                for(int i = 0; i < loop; ++i) {
                    MessageExt msg = null;
    
                    try {
                        this.lockTreeMap.readLock().lockInterruptibly();
    
                        try {
                            if(this.msgTreeMap.isEmpty() || System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp((Message)this.msgTreeMap.firstEntry().getValue())) <= pushConsumer.getConsumeTimeout() * 60L * 1000L) {
                                break;
                            }
    
                            // 从 msgid 最小的开始取出
                            msg = (MessageExt)this.msgTreeMap.firstEntry().getValue();
                        } finally {
                            this.lockTreeMap.readLock().unlock();
                        }
                    } catch (InterruptedException var24) {
                        this.log.error("getExpiredMsg exception", var24);
                    }
    
                    try {
                        // 发回到重试队列
                        pushConsumer.sendMessageBack(msg, 3);
                        this.log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", new Object[]{msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), Integer.valueOf(msg.getQueueId()), Long.valueOf(msg.getQueueOffset())});
    
                        try {
                            this.lockTreeMap.writeLock().lockInterruptibly();
    
                            try {
                                // 从本地过期消息中删除,如果删除失败,下次清理还是会处理这个消息,所以这里也可能产生重复投递
                                if(!this.msgTreeMap.isEmpty() && msg.getQueueOffset() == ((Long)this.msgTreeMap.firstKey()).longValue()) {
                                    try {
                                        this.removeMessage(Collections.singletonList(msg));
                                    } catch (Exception var19) {
                                        this.log.error("send expired msg exception", var19);
                                    }
                                }
                            } finally {
                                this.lockTreeMap.writeLock().unlock();
                            }
                        } catch (InterruptedException var21) {
                            this.log.error("getExpiredMsg exception", var21);
                        }
                    } catch (Exception var22) {
                        this.log.error("send expired msg exception", var22);
                    }
                }
    
            }
        }
    
    顺序消费
        public void start() {
            if(MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    public void run() {
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    }
                }, 1000L, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
            }
    
        }
    
    MQClient 启动
    • org.apache.rocketmq.client.impl.factory.MQClientInstance
        public void start() throws MQClientException {
    
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
    
                        // If not specified,looking address from name server
                        // 获取 namesrv 地址,如果没有配置,则通过暴露的 http 接口获取
                        // http 地址通过 rocketmq.namesrv.domain 和 rocketmq.namesrv.domain.subgroup 配置
                        // this.clientConfig 就是 DefaultMQPushConsumer, DefaultMQPushConsumer extends ClientConfig
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
    
                        // Start request-response channel
                        // 启动 mqClient, 就是启动一个 netty client
                        this.mQClientAPIImpl.start();
    
                        // Start various schedule tasks
                        // 从 nameserver 拉取最新的 topic 路由信息, 包括 broker,producer,consumer,默认 30s 一次
                        // 清理离线 brokers 并向所有 brokers 发送心跳, 默认 30s 一次
                        // 存储消费位点,默认 5s 一次
                        // 根据队列里acc消息数, 调整消费线程池的 coreSize, 1m 一次,不可修改
                        this.startScheduledTask();
    
                        // Start pull service
                        // 启动拉取消息线程,从 pullRequest 无界阻塞队列获取拉取请求
                        this.pullMessageService.start();
    
                        // Start rebalance service
                        // 启动负载均衡,根据 consumer 总数和 queue 总数平分, queue 会进行排序,如果消费者数量跟 broker 数量一致则正好一个消费者负责一个 broker
                        // 通过 rocketmq.client.rebalance.waitInterval 设置重新均衡间隔,默认 20s
                        this.rebalanceService.start();
    
                        // Start push service
                        // 启动一个内部 producer
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
    
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case RUNNING:
                        break;
                    case SHUTDOWN_ALREADY:
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }    
    

    到这里已经启动完毕。可是留下了两大疑问:PullRequestQueue 是谁放进去的,又是从何时开始进行消费的???
    往回找,可以判断消费是在 consumeService 里完成的,看一下他的构造

    • org.apache.rocketmq.client.impl.consumer
        public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
            MessageListenerConcurrently messageListener) {
            this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
            this.messageListener = messageListener;
    
            this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
            this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
            this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    
            // 初始化消费线程池,设置的最小消费线程数为 corePoolSize, 最大消费线程数为 maxPoolSize, 任务队列为 无界 ComsumeRequest 队列
            this.consumeExecutor = new ThreadPoolExecutor(
                this.defaultMQPushConsumer.getConsumeThreadMin(),
                this.defaultMQPushConsumer.getConsumeThreadMax(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.consumeRequestQueue,
                new ThreadFactoryImpl("ConsumeMessageThread_"));
    
            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
            this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
        }
    
        // 这个方法里会往消费线程池里提交任务
        public void submitConsumeRequest(
            final List<MessageExt> msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispatchToConsume) {
            final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
            // 小于批量消费数量直接提交消费请求,否则只提交批量消费数量的任务,提交任务失败则过 5s 再重试提交
            if (msgs.size() <= consumeBatchSize) {
                ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    this.submitConsumeRequestLater(consumeRequest);
                }
            } else {
                for (int total = 0; total < msgs.size(); ) {
                    List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                    for (int i = 0; i < consumeBatchSize; i++, total++) {
                        if (total < msgs.size()) {
                            msgThis.add(msgs.get(total));
                        } else {
                            break;
                        }
                    }
    
                    ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                    try {
                        this.consumeExecutor.submit(consumeRequest);
                    } catch (RejectedExecutionException e) {
                        for (; total < msgs.size(); total++) {
                            msgThis.add(msgs.get(total));
                        }
    
                        this.submitConsumeRequestLater(consumeRequest);
                    }
                }
            }
        }
    

    找 submitConsumeRequest 的方法调用,发现在 DefaultMQPushConsumerImpl.pullMessage() 里。这里是 PullService 从 PullRequest 队列拿到拉取消息请求后调用的。现在 pull 和 consume 串起来了。
    接下来去找 PullRequest 是在哪里产生的

    • org.apache.rocketmq.client.impl.consumer.PullMessageService
        public void executePullRequestImmediately(final PullRequest pullRequest) {
            try {
                this.pullRequestQueue.put(pullRequest);
            } catch (InterruptedException e) {
                log.error("executePullRequestImmediately pullRequestQueue.put", e);
            }
        }
    

    找调用


    image.png

    做完 queue 的负载均衡后会立即进行一次 dispatchPullRequest

    • org.apache.rocketmq.client.impl.consumer.RebalanceImpl
        private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
            final boolean isOrder) {
            boolean changed = false;
    
            ...
    
            List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
            for (MessageQueue mq : mqSet) {
                if (!this.processQueueTable.containsKey(mq)) {
                    if (isOrder && !this.lock(mq)) {
                        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                        continue;
                    }
    
                    this.removeDirtyOffset(mq);
                    ProcessQueue pq = new ProcessQueue();
                    // 寻找消费位点,发起 PullRequest
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0) {
                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                        if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                        } else {
                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                        }
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                    }
                }
            }
    
            this.dispatchPullRequest(pullRequestList);
    
            return changed;
        }
    
    确认消费位点
    • org.apache.rocketmq.client.impl.consumer.RebalancePushImpl
        public long computePullFromWhere(MessageQueue mq) {
            long result = -1;
            final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
            final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
            switch (consumeFromWhere) {
                case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
                case CONSUME_FROM_MIN_OFFSET:
                case CONSUME_FROM_MAX_OFFSET:
                case CONSUME_FROM_LAST_OFFSET: {
                    // 如果这个 consume group 不是刚启动,就从 offerStore 取上次消费位点
                    // 如果是刚启动, 就从 broker 拉取最后消费位点, 但如果是重试队列,直接从开头消费
                    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (lastOffset >= 0) {
                        result = lastOffset;
                    }
                    // First start,no offset
                    else if (-1 == lastOffset) {
                        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            result = 0L;
                        } else {
                            try {
                                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                            } catch (MQClientException e) {
                                result = -1;
                            }
                        }
                    } else {
                        result = -1;
                    }
                    break;
                }
                case CONSUME_FROM_FIRST_OFFSET: {
                    // 从头消费比较简单,刚启动就从头消费,否则从上次消费位置拉取
                    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (lastOffset >= 0) {
                        result = lastOffset;
                    } else if (-1 == lastOffset) {
                        result = 0L;
                    } else {
                        result = -1;
                    }
                    break;
                }
                case CONSUME_FROM_TIMESTAMP: {
                    // 跟 LAST_OFFSET 差不多,刚启动时用时间戳去 broker 取位点,区别是重试队列不从头,而是从 broker 记录的最后消费位点
                    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (lastOffset >= 0) {
                        result = lastOffset;
                    } else if (-1 == lastOffset) {
                        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            try {
                                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                            } catch (MQClientException e) {
                                result = -1;
                            }
                        } else {
                            try {
                                long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                                    UtilAll.YYYYMMDDHHMMSS).getTime();
                                result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                            } catch (MQClientException e) {
                                result = -1;
                            }
                        }
                    } else {
                        result = -1;
                    }
                    break;
                }
    
                default:
                    break;
            }
    
            return result;
        }
    
    拉取消息的流程
    拉取
    • org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
        public void pullMessage(final PullRequest pullRequest) {
            ...
            前面会做一些状态检查,代码太多,这部分不贴了
            ...
    
            long cachedMessageCount = processQueue.getMsgCount().get();
            long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    
            // ProcessQueue 是一个本地消息存储,先把消息从 broker 拉取到本地
            // 如果数量超过阈值(默认1000)或者占用内存大小超过阈值(默认100MB)就延迟处理这个 PullRequest
            if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                        this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
                }
                return;
            }
    
            if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                        this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
                }
                return;
            }
    
            // 如果缓存的消息被堵住了,导致消费不动,一样会触发流控,延迟处理 PullRequest
            if (!this.consumeOrderly) {
                if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                            pullRequest, queueMaxSpanFlowControlTimes);
                    }
                    return;
                }
            } else {
                // 如果是因为刚启动是把 queue 给锁住了,就重新计算消费位点,否则说明有别的线程在操作这个 queue,那就等会再 pull
                if (processQueue.isLocked()) {
                    if (!pullRequest.isLockedFirst()) {
                        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                        boolean brokerBusy = offset < pullRequest.getNextOffset();
                        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                            pullRequest, offset, brokerBusy);
                        if (brokerBusy) {
                            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                                pullRequest, offset);
                        }
    
                        pullRequest.setLockedFirst(true);
                        pullRequest.setNextOffset(offset);
                    }
                } else {
                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                    log.info("pull message later because not locked in broker, {}", pullRequest);
                    return;
                }
            }
    
            // 获取订阅信息,知道该从哪个 topic 去拉消息
            final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (null == subscriptionData) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.warn("find the consumer's subscription failed, {}", pullRequest);
                return;
            }
    
            final long beginTimestamp = System.currentTimeMillis();
    
            PullCallback pullCallback = new PullCallback() {
                ...
                注册回调,单独拿出来看
                ...
            };
    
            boolean commitOffsetEnable = false;
            long commitOffsetValue = 0L;
            if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
                commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
                if (commitOffsetValue > 0) {
                    commitOffsetEnable = true;
                }
            }
    
            String subExpression = null;
            boolean classFilter = false;
            SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (sd != null) {
                if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                    subExpression = sd.getSubString();
                }
    
                classFilter = sd.isClassFilterMode();
            }
    
            int sysFlag = PullSysFlag.buildSysFlag(
                commitOffsetEnable, // commitOffset
                true, // suspend
                subExpression != null, // subscription
                classFilter // class filter
            );
            // 拉消息
            try {
                this.pullAPIWrapper.pullKernelImpl(
                    pullRequest.getMessageQueue(),
                    subExpression,
                    subscriptionData.getExpressionType(),
                    subscriptionData.getSubVersion(),
                    pullRequest.getNextOffset(),
                    this.defaultMQPushConsumer.getPullBatchSize(),
                    sysFlag,
                    commitOffsetValue,
                    BROKER_SUSPEND_MAX_TIME_MILLIS,
                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                    CommunicationMode.ASYNC,
                    pullCallback
                );
            } catch (Exception e) {
                log.error("pullKernelImpl exception", e);
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        }
    
    拉取回调
            PullCallback pullCallback = new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    if (pullResult != null) {
                        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
    
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
    
                                ...
    
                                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                } else {
                                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    
                                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    
                                    // 扔给 consumer 消费
                                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
    
                                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                    } else {
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                    }
                                }
    
                                ...
    
                                break;
                            case NO_NEW_MSG:
                                // 没有新的消息,从 pullResult 修正一下位点,重新发送这个 PullRequest
                                break;
                            case NO_MATCHED_MSG:
                                // 没有匹配的消息,从 pullResult 修正一下位点,重新发送这个 PullRequest
                                break;
                            case OFFSET_ILLEGAL:
                                // 位点非法,标记 queue 为 dropper,以后这个 queue 的 PullRequest 会直接丢弃
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                pullRequest.getProcessQueue().setDropped(true);
                                // 10s 后修正位点并移除 queue
                                DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
    
                                    @Override
                                    public void run() {
                                        try {
                                            DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                pullRequest.getNextOffset(), false);
    
                                            DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
    
                                            DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
    
                                            log.warn("fix the pull request offset, {}", pullRequest);
                                        } catch (Throwable e) {
                                            log.error("executeTaskLater Exception", e);
                                        }
                                    }
                                }, 10000);
                                break;
                            default:
                                break;
                        }
                    }
                }
    
                @Override
                public void onException(Throwable e) {
                    if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("execute the pull request exception", e);
                    }
    
                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                }
            };
    
    消费
    • org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
        class ConsumeRequest implements Runnable {
            private final List<MessageExt> msgs;
            private final ProcessQueue processQueue;
            private final MessageQueue messageQueue;
    
            public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
                this.msgs = msgs;
                this.processQueue = processQueue;
                this.messageQueue = messageQueue;
            }
    
            public List<MessageExt> getMsgs() {
                return msgs;
            }
    
            public ProcessQueue getProcessQueue() {
                return processQueue;
            }
    
            @Override
            public void run() {
                ...
    
                // 这个 listener 就是我们自己写的代码了,通过 consumer.registerMessageListener 设置的消费逻辑代码
                MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
                
                ...
    
                long beginTimestamp = System.currentTimeMillis();
                boolean hasException = false;
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                try {
                    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                    if (msgs != null && !msgs.isEmpty()) {
                        for (MessageExt msg : msgs) {
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }
                    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                } catch (Throwable e) {
                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                        RemotingHelper.exceptionSimpleDesc(e),
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                    hasException = true;
                }
                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                if (null == status) {
                    if (hasException) {
                        returnType = ConsumeReturnType.EXCEPTION;
                    } else {
                        returnType = ConsumeReturnType.RETURNNULL;
                    }
                } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                    returnType = ConsumeReturnType.TIME_OUT;
                } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                    returnType = ConsumeReturnType.FAILED;
                } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                    returnType = ConsumeReturnType.SUCCESS;
                }
    
                ...
    
                // 处理消费结果
                if (!processQueue.isDropped()) {
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                } else {
                    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                }
            }
    
            public MessageQueue getMessageQueue() {
                return messageQueue;
            }
    
        }
    
    处理消费结果
        public void processConsumeResult(
            final ConsumeConcurrentlyStatus status,
            final ConsumeConcurrentlyContext context,
            final ConsumeRequest consumeRequest
        ) {
            int ackIndex = context.getAckIndex();
    
            if (consumeRequest.getMsgs().isEmpty())
                return;
    
            // 消费成功,ackIndex = Integer.MAX_VALUE
            // 消费失败,ackIndex = -1
            // 使用 ackIndex 控制消息的回发
            switch (status) {
                case CONSUME_SUCCESS:
                    if (ackIndex >= consumeRequest.getMsgs().size()) {
                        ackIndex = consumeRequest.getMsgs().size() - 1;
                    }
                    ...
                    break;
                case RECONSUME_LATER:
                    ackIndex = -1;
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                        consumeRequest.getMsgs().size());
                    break;
                default:
                    break;
            }
    
            // 广播模式消费失败的消息直接扔了
            // 集群模式会发回到 broker
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                    }
                    break;
                case CLUSTERING:
                    List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        boolean result = this.sendMessageBack(msg, context);
                        if (!result) {
                            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                            msgBackFailed.add(msg);
                        }
                    }
    
                    // 往 broker 发送失败的消息,5s 后直接重试消费
                    if (!msgBackFailed.isEmpty()) {
                        consumeRequest.getMsgs().removeAll(msgBackFailed);
    
                        this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                    }
                    break;
                default:
                    break;
            }
    
            // 更新消费位点
            long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
            if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
            }
        }
    

    相关文章

      网友评论

          本文标题:RocketMQ Clint 源码分析

          本文链接:https://www.haomeiwen.com/subject/acpgwqtx.html