美文网首页
rocketmq客户端消费流程

rocketmq客户端消费流程

作者: Cc_7397 | 来源:发表于2019-11-07 18:07 被阅读0次

    rocketmq客户端消费流程

    只关注于集群模式下并发消费的push模式

    组件概述

    DefaultMQPushConsumerImpl
    • 负载均衡实现 RebalanceImpl
    • 拉取消息. PullAPIWrapper
    • 消费进度存储 OffsetStore
    • 消费服务 ConsumeMessageService
    • MQClientInstance 客户端核心实现
    MQClientInstance
    • netty 客户端 业务线程池和回调线程池隔离
    • 定时任务
    • 负载均衡调度 RebalanceService
    • 拉消息任务调度 pullMessageService
    • 内部生产者 defaultMQProducer

    MQClientInstance 和 消费者为一对多关系。使用InstanceName相同的生产者消费者都使用同一个MQClientInstance。

    启动 DefaultMQPushConsumerImpl.start()

    1. 生成InstanceName,如果用户未设置则为pid。

    2. 创建 MQClientInstance,使用InstanceName相同的生产者消费者都使用同一个MQClientInstance。MQClientInstance是客户端的核心。

      就是说一个MQClientInstance下会与多个消费者。MQClientInstance统一调度他们。

    this.mQClientFactory =
    MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);
    
    //后面会将消费者注册到mQClientFactory,让mQClientFactory有所有同一InstanceName消费者的引用。
    boolean registerOK =     mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
             
    
    1. 为负载均衡实现rebalanceImpl 赋值
    2. 创建PullAPIWrapper 负责拉取消息
    3. 根据消费模式创建 OffsetStore
       switch (this.defaultMQPushConsumer.getMessageModel()) {
                   case BROADCASTING://广播存储在本地
                       this.offsetStore =
                               new LocalFileOffsetStore(this.mQClientFactory,
                                   this.defaultMQPushConsumer.getConsumerGroup());
                       break;
                   case CLUSTERING://集群进度存储在远程
                       this.offsetStore =
                               new RemoteBrokerOffsetStore(this.mQClientFactory,
                                   this.defaultMQPushConsumer.getConsumerGroup());
                       break;
                   default:
                       break;
                   }
    

    ​ OffsetStore 负责读取消费进度和同步消费进度

    1. 根据消费模式创建ConsumeMessageService 并启动

      并发消费不启动线程。

      顺序消费下启动定时任务,会调用消费者的RebalanceImpl的lockAll 方法。向broker发生请求锁住分配给他的队列。

                        if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
        
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                            new ConsumeMessageOrderlyService(this,
                                (MessageListenerOrderly) this.getMessageListenerInner());
                }
                else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
                {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                            new ConsumeMessageConcurrentlyService(this,
                                (MessageListenerConcurrently) this.getMessageListenerInner());
                }
    
    1. 启动MQClientInstance,多消费者引用同一个MQClientInstanceMQClientInstance只会启动一次

      mQClientFactory.start();
      
    2. 初始化

      //向nameser 拉取所关心的topic的路由信息  
      this.updateTopicSubscribeInfoWhenSubscriptionChanged();
      //向所有路由信息里的所有broker发送心跳
      this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
      //唤醒mQClientFactory的负责均衡服务,
      this.mQClientFactory.rebalanceImmediately();
      

    启动 MQClientInstance.start()

    一个MQClientInstance 只会启动一次。

    1.启动netty 客户端

    this.mQClientAPIImpl.start();//内置netty客户端
    

    2.启动定时任务

    this.startScheduledTask();//会启动5个定时任务
    
    • 从远程获取nameServer地址 发生变动时可以更新本地nameServer

    远程地址被写死,暂时没有用。

    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
    
    • 更新topic路由信息,topic路由发送变动时可以感知

      MQClientInstance.this.updateTopicRouteInfoFromNameServer();
      
    • 更新消费进度到broker 最终调用 DefaultMQPushConsumerImpl.offsetStore.persistAll

    这里可以看出更新消费进度是异步的,这也是出现重复消息的原因之一

    MQClientInstance.this.persistAllConsumerOffset();
    
    • 向broker发送心跳
    MQClientInstance.this.cleanOfflineBroker();//清理下线的broker
    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();//发送心跳
    
    • 动态调整线程池 根据DefaultMQPushConsumer 的 adjustThreadPoolNumsThreshold 参数和消息在消费者内部的堆积调整
    MQClientInstance.this.adjustThreadPool(); 
    
    1. 启动调度服务
    //拉消息线程
    this.pullMessageService.start();
    //Start rebalance service
    //重负载线程
    this.rebalanceService.start();
    //Start push service 内部生产者用于消费失败时,发送重试消息
    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
    

    拉消息流程

    拉消息的流程是先从负载均衡开始的。MQClientInstance的rebalanceService启动后会定时调用,所有消费者的doRebalance 方法。间隔10s

            private static long WaitInterval = 1000 * 10;//间隔10s    
            @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStoped()) {
                this.waitForRunning(WaitInterval);//等待
                this.mqClientFactory.doRebalance();
            }
    
            log.info(this.getServiceName() + " service end");
        }
    
        public void doRebalance() {
          //调用所有消费者的doRebalance
            for (String group : this.consumerTable.keySet()) {//consumerTable 消费者引用
                MQConsumerInner impl = this.consumerTable.get(group);
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Exception e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }
            //消费者最终会调用自己的负载均衡实现的doRebalance方法
            @Override
        public void doRebalance() {
            if (this.rebalanceImpl != null) { //消费者调用自己的rebalanceImpl
                this.rebalanceImpl.doRebalance();
            }
        }
    
    
    负载均衡实现

    先拿到topic路由信息,然后循环对topic做负载

    public void doRebalance() {
        //取得关心等待topic
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    //对topic做负载
                    this.rebalanceByTopic(topic);
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
        //当topic变动时,移除多余topic对应的ProcessQueue
        this.truncateMessageQueueNotMyTopic();
    }
    

    负载分集群和广播模式,广播模式不讨论

    在rocketmq中一个topic有多个队列。负载均衡就是将队列合理的分配给一个消费组的所有消费者。

    有多种分配算法,继承AllocateMessageQueueStrategy,默认为AllocateMessageQueueAveragely

    //先获取负载所需要的参数
    //topic对应的所有队列
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    //topic对应的所有客户端
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    

    然后调用,返回的list就是分配给当前消费者的队列

       public List<MessageQueue> allocate(//
                                           final String consumerGroup,//
                                           final String currentCID,//
                                           final List<MessageQueue> mqAll,//
                                           final List<String> cidAll//
        );
    

    而区分不同客户端的cidAll 就是每个客户端的ip@InstanceName ,使用同一ip下不能有相同的InstanceName。

    比如AllocateMessageQueueAveragely有这一行

    //取自己在客户端集合的下标,如果两个客户端InstanceName相同,那么index都一样,分配的队列也相同
    int index = cidAll.indexOf(currentCID);
    

    而这个负载算法是没有同步和校验等操作的,不同客户端不会进行通信。客户端不知道别人分配了哪些队列。全靠“自觉”,同一组内都使用同一策略那么分配是合理的,如果同一组内使用不同策略,队列的分配就会发生混乱。

    拉取任务

    rocketmq为每个分配给它的队列生成一个 拉取任务 ProcessQueue

    将其存储在PullMessageService 的pullRequestQueue中,这是一个LinkedBlockingQueue

    PullMessageService 启动后会从堵塞队列中取出拉取任务,然后进行消息的拉取。

    分配队列完成后

     //返回队列是否发生了变化
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
                    
    
    
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet) {//mqSet 分配给当前消费者的队列
            boolean changed = false;
                    //存储上次分配的队列和对应的ProcessQueue拉取任务 
            //processQueueTable 是ConcurrentHashMap
            Iterator<Entry<MessageQueue, ProcessQueue>> it =
              this.processQueueTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<MessageQueue, ProcessQueue> next = it.next();
                MessageQueue mq = next.getKey();
                ProcessQueue pq = next.getValue();
    
                if (mq.getTopic().equals(topic)) {//topic 是否相等
                    if (!mqSet.contains(mq)) { //上次分配队列,这次没分配给我
                        pq.setDropped(true);//禁用拉取任务 修改dropped属性。是volatile变量
                        //移除OffsetStore中存储的队列进度,移除前先提交进度
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.info("doRebalance, {}, remove unnecessary mq, {}", 
                                     consumerGroup, mq);
                        }
                    }
             
                   //据上次拉取间隔 120000ms 也移除它
                    else if (pq.isPullExpired()) {
                        switch (this.consumeType()) {
                            case CONSUME_ACTIVELY:
                                break;
                            case CONSUME_PASSIVELY:
                                pq.setDropped(true);
                                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                    it.remove();
                                    changed = true;
                                    log.error(
                                            "[BUG]doRebalance, {}, remove unnecessary mq, {},
                                      because pull is pause, so try to fixed it",
                                            consumerGroup, mq);
                                }
                                break;
                            default:
                                break;
                        }
                    }
                }
            }
                    //新队列 处理
            List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
            for (MessageQueue mq : mqSet) {
                if (!this.processQueueTable.containsKey(mq)) {
                    //生成拉取任务
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(new ProcessQueue());
                                    //计算下次拉取的偏移
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0) {
                        pullRequest.setNextOffset(nextOffset);
                        pullRequestList.add(pullRequest);
                        changed = true;
                        //记录下 用于下次对比
                        this.processQueueTable.put(mq, pullRequest.getProcessQueue());
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                    }
                }
            }
                    //将拉取任务压入堵塞队列
            //最终调用 
            //PullMessageService.executePullRequestImmediately 
            //this.pullRequestQueue.put(pullRequest);
            this.dispatchPullRequest(pullRequestList);
            return changed;
        }
    
    拉取消息

    现在知道一个队列对应一个拉取任务ProcessQueue,存放在堵塞队列中,如果禁用了会将dropped属性修改为true。

    谁来执行拉取呢,MQClientInstance.PullMessageService。

    PullMessageService 启动后从堵塞队列取出拉取任务,找到对应的组调用pullMessage

    PullMessageService 为单线程,所有拉取消息时为单线程拉取

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
    
        while (!this.isStoped()) {
              //从堵塞队列中取出1
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
        }
    
        log.info(this.getServiceName() + " service end");
    }
    
      private void pullMessage(final PullRequest pullRequest) {
            //找到对应的组调用pullMessage
            final MQConsumerInner consumer = 
              this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
            if (consumer != null) {
                DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
                //调用消费者的pullMessage,最终调用pullAPIWrapper.pullKernelImpl
                impl.pullMessage(pullRequest);
            }
        }
    
    DefaultMQPushConsumerImpl.pullMessage

    先进行限流等检查,如果不能通过会调用executePullRequestLater() 将任务放回队列,下次消费。

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        //提交到定时任务中
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {//待会在放入队列
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }
    

    也会检查是否禁用。正常的任务拉取完成会放回队列,等待下次拉取。

    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {//检查dropped属性。volatile修饰
        log.info("the pull request[{}] is droped.", pullRequest.toString());
       //被禁用直接抛弃 没被禁用的用完会放回队列
        return;
    }
    

    都完成后创建一个回调函数 PullCallback,然后异步拉取

    因为网络层是netty,所以其实所有请求都是异步。同步的操作只是做了异步转同步而已。

    this.pullAPIWrapper.pullKernelImpl(//
        pullRequest.getMessageQueue(), // 1
        subExpression, // 2
        subscriptionData.getSubVersion(), // 3
        pullRequest.getNextOffset(), // 4
        this.defaultMQPushConsumer.getPullBatchSize(), // 5
        sysFlag, // 6
        commitOffsetValue,// 7
        BrokerSuspendMaxTimeMillis, // 8
        ConsumerTimeoutMillisWhenSuspend, // 9
        CommunicationMode.ASYNC, // 10
        pullCallback// 11
        );
    

    请求成功后触发回调函数。主要看 case FOUND,就可以了。其他代表没有新消息,偏移量不对等

    //这里有一个mq自己实现的性能统计。我们在外部也可以拿到
    consumer.getDefaultMQPushConsumerImpl().getConsumerStatsManager()
    
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult =
                        DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                            pullRequest.getMessageQueue(), pullResult, subscriptionData);
    
                switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    //性能统计
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
                        pullRequest.getConsumerGroup(),         
                      pullRequest.getMessageQueue().getTopic(), pullRT);
                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || 
                        pullResult.getMsgFoundList().isEmpty()) {
                      //空消息,放回队列
                      DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    }
                    else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                      //性能统计
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
                            pullRequest.getConsumerGroup(), 
                            pullRequest.getMessageQueue().getTopic(),
                            pullResult.getMsgFoundList().size());
                        boolean dispathToConsume = 
                          processQueue.putMessage(pullResult.getMsgFoundList());
                   //开始消费       
                   DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                            pullResult.getMsgFoundList(), //
                            processQueue, //
                            pullRequest.getMessageQueue(), //
                            dispathToConsume);
                case NO_NEW_MSG:
                case NO_MATCHED_MSG:
                case OFFSET_ILLEGAL:
                default:
                    break;
                }
            }
        }
    
    开始消费

    这里有一个分批消费的逻辑,根据consumeMessageBatchMaxSize拆分

    取决于这个参数private int consumeMessageBatchMaxSize = 1;

    如果设置大于1那么这批消息消费时只能全部成功或者全部失败

    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
    else {
        for (int total = 0; total < msgs.size();) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                }
                else {
                    break;
                }
            }
                    //创建一个消费job
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue,
                                                               messageQueue);
            //提交到线程池
            this.consumeExecutor.submit(consumeRequest);
        }
    }
    
    //ConsumeRequest 是Runnable的实现
    ConsumeRequest implements Runnable 
    

    ConsumeRequest的run方法

    @Override
    public void run() {
       //又进行了队列禁用的校验
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped {}",
                this.messageQueue);
            return;
        }
            //用户的消费listener 实现
        MessageListenerConcurrently listener =    
        ConsumeMessageConcurrentlyService.this.messageListener;
        //创建Context
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;
        //这个Context 用于hook,在4.5的消息追踪中是借助此hook和Context实现的
        ConsumeMessageContext consumeMessageContext = null;
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext
                .setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer
                    .getConsumerGroup());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            //调用hook:ConsumeMessageHook
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
                .executeHookBefore(consumeMessageContext);
        }
    
        long beginTimestamp = System.currentTimeMillis();
    
        try {
            //将重试消息的topic替换为原来的topic
            ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
            //调用用户方法
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        }
        catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//
                RemotingHelper.exceptionSimpleDesc(e),//
                ConsumeMessageConcurrentlyService.this.consumerGroup,//
                msgs,//
                messageQueue);
        }
    
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
    
        if (null == status) {//返回null 或者异常设置为失败
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",//
                ConsumeMessageConcurrentlyService.this.consumerGroup,//
                msgs,//
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        
        // add by fuhaining@yolo24.com
        if (consumeMessageLog.isInfoEnabled()) {
            StringBuilder keys = new StringBuilder();
            for (MessageExt msg : msgs) {
                keys.append(msg.getMsgId()).append(",");
            }
            consumeMessageLog.info("concurrently - " + status.name() + " : " + 
                                   keys.deleteCharAt(keys.length() - 1).toString());
        }
    
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == 
                                             status);
           //调用hook
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
                .executeHookAfter(consumeMessageContext);
        }
    
        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(
            ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(),
          consumeRT);
        //再次校验
        if (!processQueue.isDropped()) {
            //处理结果
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        }
        else {
            log.warn(
                "processQueue is dropped without process consume result. messageQueue={}, 
              msgTreeMap={}, msgs={}",
                new Object[] { messageQueue, processQueue.getMsgTreeMap(), msgs });
        }
    }
    
    重试消息
    //ConsumeMessageConcurrentlyService.processConsumeResult方法   
    //在前面会进行性能统计
    
            switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING://广播略过
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>
              (consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                //发送重试消息
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
    
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                            //发送失败的进定时任务,重试
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
        }
    
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0) { 
       // 将消费进度提交到OffsetStore
       // OffsetStore 只会将进度记下,由前面说的定时任务同步给broker
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
    

    将要重试的消息发会broker。只是把原来的id发回去。broker在会根据id读取原来消息的消息体

    生成重试消息

    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            //消息原来存哪,发会到哪
            String brokerAddr =(null != brokerName) ? 
              this.mQClientFactory.findBrokerAddressInPublish(brokerName)
              : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
    
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
              this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000);
        }
        catch (Exception e) {
          
         //如果发送失败,使用内部生产者发送
          this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
      
     //  consumerSendMessageBack  方法
      
            ConsumerSendMsgBackRequestHeader requestHeader = new
                ConsumerSendMsgBackRequestHeader();
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
            requestHeader.setGroup(consumerGroupWithProjectGroup);
                //原来的topic
            requestHeader.setOriginTopic(msg.getTopic());
                //原消息的偏移
            requestHeader.setOffset(msg.getCommitLogOffset());
              //重试级别
            requestHeader.setDelayLevel(delayLevel);
                //记录原来的id
            requestHeader.setOriginMsgId(msg.getMsgId());
                //通过netty发送
            RemotingCommand response = this.remotingClient.invokeSync(addr, 
            request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
            }
    
    

    流程总结

    1. MQClientInstancerebalanceService 线程启动。定时调用消费者的负载均衡实现RebalanceImpldoRebalance方法。

    2. RebalanceImpl根据负载策略AllocateMessageQueueStrategy计算属于自己的队列

    3. 根据队列的变化,生成新的拉取任务 ProcessQueue 或者将原来的ProcessQueue禁用

    4. 将新的 ProcessQueue放入MQClientInstancePullMessageServicepullRequestQueue这是一个LinkedBlockingQueue

    5. PullMessageService的线程会从队列中取出,然后调用对应消费者的PullAPIWrapperpullKernelImpl方法发送请求拉取

    6. 拉取为异步,在回调中将消息封装成ConsumeMessageConcurrentlyService.ConsumeRequest任务提交到ConsumeMessageConcurrentlyService的线程池ScheduledExecutorService

    7. 最终调用用户的实现进行消费

    8. 将消费失败消息发回broker生成重试消息

    9. 消费成功将进度写入消费者的OffsetStore 定时回写broker

    相关文章

      网友评论

          本文标题:rocketmq客户端消费流程

          本文链接:https://www.haomeiwen.com/subject/jmaiyctx.html