美文网首页
RocketMq如何保证消费顺序

RocketMq如何保证消费顺序

作者: 飞哈飞 | 来源:发表于2020-11-10 12:22 被阅读0次

两个锁:

集群模式下锁队列保证消息被同一个consumer消费,往broker定时发送锁命令
本地消费时不论集群模式和广播模式都会有本地队列锁进行锁定
保证同一个队列只会同时被一个消费者线程锁定

public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }
 public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }

本地消费时也按队列进行锁定操作

class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        public ProcessQueue getProcessQueue() {
            return processQueue;
        }

        public MessageQueue getMessageQueue() {
            return messageQueue;
        }

        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
             //一个队列一个锁进行锁定
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
             //广播模式就不需要要求锁broker的队列
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                  //省略消费详细逻辑
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }
                    //队列锁失败将会再次尝试锁定
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

    }

顺序消费模式下的消费逻辑

rocketmq消费.png

MQConsumerInner获取到消息就把消息放到ProcessQueue,并且会启动一个消费者任务
ProcessQueue底层用treeMap存储消息保证消息获取时按消息偏移量大小顺序获取,并在进行读写操作时用读写锁保证其线程安全性。

case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

//顺序消费当前processQueue没有进入消费状态才会返回dispatchToConsume 为true进而触发下面ConsumeRequest任务执行
//并发消费模式下则都会进行任务执行,不会判断dispatchToConsume是否为true,因为并发模式直接消费传入的MsgList,而不是从ProcessQueue里取
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }

消费失败重试逻辑

本地消费失败就会将消息重新放入processQueue里,达到最大次数的消费就会把消息发送回broker

switch (status) {
                case SUCCESS:
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case COMMIT:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    break;
                case ROLLBACK:
                    consumeRequest.getProcessQueue().rollback();
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    }
                    break;
                default:
                    break;
            }

相关文章

  • RocketMq如何保证消费顺序

    两个锁: 集群模式下锁队列保证消息被同一个consumer消费,往broker定时发送锁命令本地消费时不论集群模式...

  • RocketMQ(2) 顺序消息、事务消息

    RocketMQ 顺序消息:消息有序是指可以按照消息发送顺序来消费。RocketMQ 可以严格的保证消息有序,但是...

  • RocketMQ保证顺序消费demo

    上一篇 << >>RocketMQ如何动态扩容和缩容[https://www.jianshu.com/p/a33e...

  • rocketmq_顺序消息

    Q:在rocketmq语境下,如何定义【顺序】这个词? Q:为了保证这种效果,生产端应该如何做? Q:为了保证这种...

  • canal和rocketmq

    canal使用rocketmq做数据同步的问题 因为要保证数据的顺序,所以要使用rocketmq的顺序topic,...

  • RocketMQ 顺序消费

    1、前言 对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么? 首先我们要...

  • Windows 安装 RocketMQ

    一、RocketMQ 介绍 1、消息顺序2、消息重复消费3、事务消息 二、RocketMQ 安装 Windows:...

  • RocketMQ实战(三):分布式事务

    接 《RocketMQ实战(一)》,《RocketMQ实战(二)》,本篇博客主要讨论的话题是:顺序消费、RMQ在分...

  • (十四)顺序消费如何保证消息消费

    集群模式下顺序消费的时候,是通过加锁的方式对队列进行占有。其实在单个消费者单个队列的情况下,加锁感觉是多余的。但是...

  • MQ随记(2)

    如何保证消息不会被重复消费(保证消息消费时的幂等性) kafka 按照数据进入kafka的顺序,kafka会给每条...

网友评论

      本文标题:RocketMq如何保证消费顺序

      本文链接:https://www.haomeiwen.com/subject/nbqavktx.html