美文网首页
RocketMq如何保证消费顺序

RocketMq如何保证消费顺序

作者: 飞哈飞 | 来源:发表于2020-11-10 12:22 被阅读0次

    两个锁:

    集群模式下锁队列保证消息被同一个consumer消费,往broker定时发送锁命令
    本地消费时不论集群模式和广播模式都会有本地队列锁进行锁定
    保证同一个队列只会同时被一个消费者线程锁定

    public void start() {
            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    }
                }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }
     public synchronized void lockMQPeriodically() {
            if (!this.stopped) {
                this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
            }
        }
    

    本地消费时也按队列进行锁定操作

    class ConsumeRequest implements Runnable {
            private final ProcessQueue processQueue;
            private final MessageQueue messageQueue;
    
            public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
                this.processQueue = processQueue;
                this.messageQueue = messageQueue;
            }
    
            public ProcessQueue getProcessQueue() {
                return processQueue;
            }
    
            public MessageQueue getMessageQueue() {
                return messageQueue;
            }
    
            @Override
            public void run() {
                if (this.processQueue.isDropped()) {
                    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }
                 //一个队列一个锁进行锁定
                final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                synchronized (objLock) {
                 //广播模式就不需要要求锁broker的队列
                    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                      //省略消费详细逻辑
                    } else {
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            return;
                        }
                        //队列锁失败将会再次尝试锁定
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                    }
                }
            }
    
        }
    

    顺序消费模式下的消费逻辑

    rocketmq消费.png

    MQConsumerInner获取到消息就把消息放到ProcessQueue,并且会启动一个消费者任务
    ProcessQueue底层用treeMap存储消息保证消息获取时按消息偏移量大小顺序获取,并在进行读写操作时用读写锁保证其线程安全性。

    case FOUND:
                                long prevRequestOffset = pullRequest.getNextOffset();
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                long pullRT = System.currentTimeMillis() - beginTimestamp;
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullRT);
    
                                long firstMsgOffset = Long.MAX_VALUE;
                                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                } else {
                                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    
                                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    
    //顺序消费当前processQueue没有进入消费状态才会返回dispatchToConsume 为true进而触发下面ConsumeRequest任务执行
    //并发消费模式下则都会进行任务执行,不会判断dispatchToConsume是否为true,因为并发模式直接消费传入的MsgList,而不是从ProcessQueue里取
                                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
    
                                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                    } else {
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                    }
                                }
    
                                if (pullResult.getNextBeginOffset() < prevRequestOffset
                                    || firstMsgOffset < prevRequestOffset) {
                                    log.warn(
                                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                        pullResult.getNextBeginOffset(),
                                        firstMsgOffset,
                                        prevRequestOffset);
                                }
    

    消费失败重试逻辑

    本地消费失败就会将消息重新放入processQueue里,达到最大次数的消费就会把消息发送回broker

    switch (status) {
                    case SUCCESS:
                        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                        break;
                    case COMMIT:
                        commitOffset = consumeRequest.getProcessQueue().commit();
                        break;
                    case ROLLBACK:
                        consumeRequest.getProcessQueue().rollback();
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                        break;
                    case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                        if (checkReconsumeTimes(msgs)) {
                            consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                            this.submitConsumeRequestLater(
                                consumeRequest.getProcessQueue(),
                                consumeRequest.getMessageQueue(),
                                context.getSuspendCurrentQueueTimeMillis());
                            continueConsume = false;
                        }
                        break;
                    default:
                        break;
                }
    

    相关文章

      网友评论

          本文标题:RocketMq如何保证消费顺序

          本文链接:https://www.haomeiwen.com/subject/nbqavktx.html