顺序消息

作者: 93张先生 | 来源:发表于2021-01-15 12:33 被阅读0次

    顺序消息

    顺序消息是指消息消费的顺序和生产者发送消息的顺序一样的。

    例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

    分区有序

    分区有序是指这个Topic下这个队列下的消息是有顺序的,生产者发送消息的时候,将严格按照消息的顺序,将消息们发送到一个Topic下的一个队列,从而保证了生产者分区消息有序,消费者进行消费时,进行单线程单队列消费,保证了消费有序。
    适用场景:性能要求高,以 sharding key 作为分区字段,在同一个队列中严格的按照 FIFO 原则进行消息发布和消费的场景。sharding key 比如订单Id,一个订单的创建、付款、完成有序的,根据算法将这个订单的所有事件发送到同一个队列中去。

    全局有序

    全局有序是指某个Topic下的所有消息都要保证顺序,可以通过一个Topic只有一个消息队列,保证了全局有序,实际上市分区有序的变种。

    消息顺序性保证

    全局有序是分区有序的一个特列,只需要设置Topic下消息队列的个数为1即可,因此分区有序消息有序,就可以保证顺序消息。

    顺序消息保证三个条件:

    1. 生产者将消息有序的发送到同一个分区队列
    2. 同一个队列的消息是顺序存储的
    3. 消费者以这个发送顺序进行消费
    消费者顺序消费实现

    消息消费是以消费者组为维度的,一个消费者组可以消费这个topic下的所有消息队列,要保证顺序消费,这个topic下的一个消息队列只能由消费者组中的一个消费者消费,然后消费者消费这个消息队列是单线程消费的,这样就保证了顺序消息消费。

    topic下的一个消息队列只能由消费者组中的一个消费者消费,这个由Broker端对消息队列加锁来实现。加锁采用了ConcurrentHashMap。ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable,
    一个消费者组对应ConcurrentHashMap<MessageQueue, LockEntry>,LockEntry包含clientId属性,clientId代表一个消费者实例,key为消费者组,一个topic的消息队列,只能由这个消费者中的clientId的消费者消费消息。

    一个消息队列由一个线程消费,由一个消息队列一个锁、消费时synchronized关键字共同维护单线程消息消费的。

    顺序消费步骤

    并发消息消费的流程包含4个步骤:消息队列负载均衡、消息拉取、消息消费、消息消费进度存储。顺序消费略有不同,每个步骤都有加锁或并发控制。

    消息队列负均衡

    RebalanceService服务每隔20秒执行一次负载均衡方法,在负载均的过程中,针对顺序消息,lock()方法会向Broker端申请锁定MessageQueue,如果锁定失败,说明messageQueue正在消费者消费,不能被拉取消息,等待下次锁定。

    // mqSet,为这次负载均衡之后需要消费的队列
    for (MessageQueue mq : mqSet) {
        // 新的MessageQueue,新建对应的ProcessQueue
        if (!this.processQueueTable.containsKey(mq)) {
            // 顺序消息,锁定broker端的MessageQueue消息队列,锁定失败,说明messageQueue正在消费者消费,不能被拉取消息;等待下次锁定
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }
            // 清空这个消费队列原来的消费进度
            this.removeDirtyOffset(mq);
            // 新建MessageQueue对应的消息处理队列ProcessQueue队列
            ProcessQueue pq = new ProcessQueue();
            // 计算从哪里拉取message
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    // 一个PullRequest对应一个MessageQueue,一个ProcessQueue
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }
    
    Broker端MessageQueue加锁

    RebalanceLockManager是处理Broker的MessageQueue加锁的类,加锁采用了ConcurrentHashMap。
    ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable,
    topic下的一个消息队列只能由消费者组中的一个消费者消费,一个消费者组对应ConcurrentHashMap<MessageQueue, LockEntry>,LockEntry包含clientId属性,clientId代表一个消费者实例,key为消费者组,一个topic的消息队列,只能由这个消费者中的clientId的消费者消费消息。

    LockEntry判定一个MessageQueue是否被锁定,默认锁定60秒,60秒之后消息队列解锁,下次再去锁定。
    负载均衡时会执行MessageQueue锁定方法,默认20秒一次负载均衡定时任务,因此下次再锁定时间间隔为20秒。

    // 顺序消息,判断MessageQueue是否被锁定
    private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
        if (groupValue != null) {
            LockEntry lockEntry = groupValue.get(mq);
            if (lockEntry != null) {
                boolean locked = lockEntry.isLocked(clientId);
                if (locked) {
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                }
    
                return locked;
            }
        }
    
        return false;
    }
    
    public boolean isLocked(final String clientId) {
        boolean eq = this.clientId.equals(clientId);
        return eq && !this.isExpired();
    }
    
    public boolean isExpired() {
        boolean expired =
            (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
    
        return expired;
    }
    
    /**
     * 顺序消息broker锁定消息队列集合
     * @param group
     * @param mqs
     * @param clientId
     * @return
     */
    public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
        final String clientId) {
    
        Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
        Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
    
        for (MessageQueue mq : mqs) {
            // 锁定加入到锁定队列
            if (this.isLocked(group, mq, clientId)) {
                lockedMqs.add(mq);
            } else {
                // 未锁定队列
                notLockedMqs.add(mq);
            }
        }
        // 存在未锁定队列,进行队列锁定
        if (!notLockedMqs.isEmpty()) {
            try {
                // 获取线程锁,进行锁定操作
                this.lock.lockInterruptibly();
                try {
                    // 新建被锁定组的HashMap
                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                    if (null == groupValue) {
                        groupValue = new ConcurrentHashMap<>(32);
                        this.mqLockTable.put(group, groupValue);
                    }
                    // 进行队列锁定
                    for (MessageQueue mq : notLockedMqs) {
                        LockEntry lockEntry = groupValue.get(mq);
                        if (null == lockEntry) {
                            lockEntry = new LockEntry();
                            lockEntry.setClientId(clientId);
                            groupValue.put(mq, lockEntry);
                            log.info(
                                "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
                                group,
                                clientId,
                                mq);
                        }
                        // JVM MQClientInstance 实例锁定,一个JVM实例下,两个消费者,不能属于同一个组,
                        // 要是消费者组相同,只能是两个JVM实例,构成消费者Cluster。
                        if (lockEntry.isLocked(clientId)) {
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            lockedMqs.add(mq);
                            continue;
                        }
    
                        String oldClientId = lockEntry.getClientId();
                        // 锁定实销,重新锁定
                        if (lockEntry.isExpired()) {
                            lockEntry.setClientId(clientId);
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            log.warn(
                                "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
                                group,
                                oldClientId,
                                clientId,
                                mq);
                            lockedMqs.add(mq);
                            continue;
                        }
    
                        log.warn(
                            "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
                            group,
                            oldClientId,
                            clientId,
                            mq);
                    }
                } finally {
                    this.lock.unlock();
                }
            } catch (InterruptedException e) {
                log.error("putMessage exception", e);
            }
        }
    
        return lockedMqs;
    }
    

    消息拉取

    DefaultMQPushConsumerImpl#pullMessage为消息拉取的主要方法,在这里针对顺序消息进行了PullRequest拉取请求锁定:

    1. ProcessQueue被锁定,第一次拉取消息,pullRequest初始化为未被锁定,首先计算拉取偏移量,然后向消息服务端拉取消息。
    2. processQueue未被上锁,推迟3秒进行pullRequest提交,放入pullRequestQueue队列中,等待broker端对messageQueue进行锁定。
    // ProcessQueue被锁定
    if (processQueue.isLocked()) {
        // 第一次拉取消息,pullRequest初始化为未被锁定,首先计算拉取偏移量,然后向消息服务端拉取消息。
        if (!pullRequest.isLockedFirst()) {
            // 获取messageQueue的开始消费位置
            final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                pullRequest, offset, brokerBusy);
            if (brokerBusy) {
                log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                    pullRequest, offset);
            }
            // 设置pullRequest被锁定
            pullRequest.setLockedFirst(true);
            // 修正offset,从上次broker开始位置消费
            pullRequest.setNextOffset(offset);
        }
    } else {
        // processQueue未被上锁,推迟3秒进行pullRequest提交,放入pullRequestQueue队列中,等待broker端对messageQueue进行锁定。
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }
    

    消费消费

    ConsumeMessageOrderlyService是消息顺序消费的类。

    MessageQueueLock messageQueueLock对象消息队列锁容器,严格保证一个消息只有一个线程消费,通过队列锁来实现,一个队列一个锁,获得锁才能进行消息消费。

    start()方法每隔20秒,执行一次锁定分配给自己的消息消费队列,该值建议与一次消息负载频率设置相同。在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService 以每秒20s频率对分配给自己的消息队列进行自动加锁操作,从而消费加锁成功的消息消费队列。

    持续消费消息,这个消费是以时间为维度的,每次在broker端锁定一个队列60秒,因此线程消费消息60秒。

    public void start() {
        // 默认每隔20秒,执行一次锁定分配给自己的消息消费队列,该值建议与一次消息负载频率设置相同。
        // 集群模式下顺序消息消费在创建拉取任务时并未将ProcessQueue的locked状态设置为true,(在负载均衡新建ProcessQueue时,默认locked = false)
        // 在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService 以每秒20s频率对分配给自己的消息队列进行自动加锁操作,
        // 从而消费加锁成功的消息消费队列。
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    // 顺序消息,每20秒,在broker端进行一次消费队列锁定
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }
    

    ConsumeRequest消费消息请求,实现了Runnable接口,可以被提交到消息消费的线程池中,被并发消费。
    这里通过messageQueueLock获取消息队列锁,保证一个消息队列一个线程消费。synchronized保证了消费过程也是单线程的。

    public void run() {
        if (this.processQueue.isDropped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }
        // 获取消息队列锁,一个线程消费一个消息队列
        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
            // 广播模式||processQueue被锁定||processQueue没有失效
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                // 持续消费消息,这个是以时间为消费为维度的,每次锁定线程消费60秒;
                for (boolean continueConsume = true; continueConsume; ) {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }
                    // 集群模式&&processQueue未被锁定,尝试加锁,并延迟提交请求,在进行拉取消息
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }
                    // 集群模式&&processQueue已失效,尝试加锁,并延迟提交请求,在进行拉取消息
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }
                    // 顺序消息消费处理逻辑,每一个ConsumeRequest消费任务不是以消费消息条数来计算的,
                    // 而是根据消费时间,默认当消费时长大于MAX_TIME_CONSUME_CONTINUOUSLY,
                    // 默认60s后,本次消费任务结束,由消费组内其他线程继续消费
                    // 消费时间间隔,每次消费任务最大持续时间,60s;延迟提交请求,在进行拉取消息
                    long interval = System.currentTimeMillis() - beginTime;
                    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                        break;
                    }
                    // ConsumeRequest 中包含的消息条数,默认1条
                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                    // 取出消息进行消费,并放入ProcessQueue的consumingMsgOrderlyTreeMap临时存储
                    List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                    // 还原真实的topic
                    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                    // 有消息需要消费
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
    
                        ConsumeOrderlyStatus status = null;
                        // 消息钩子
                        ConsumeMessageContext consumeMessageContext = null;
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext = new ConsumeMessageContext();
                            consumeMessageContext
                                .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                            consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                            consumeMessageContext.setMq(messageQueue);
                            consumeMessageContext.setMsgList(msgs);
                            consumeMessageContext.setSuccess(false);
                            // init the consume context type
                            consumeMessageContext.setProps(new HashMap<String, String>());
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                        }
    
                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        // 申请消息消费锁,如果消息队列被丢弃,放弃该消息消费队列的消费,
                        // 然后执行消息消息监听器,调用业务方具体消息监听器执行真正的消息消费处理逻辑,
                        // 并通知RocketMQ消息消费结果。
                        // processQueue 上锁
                        try {
                            this.processQueue.getLockConsume().lock();
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }
                            // 消息消费
                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                            hasException = true;
                        } finally {
                            // 释放锁
                            this.processQueue.getLockConsume().unlock();
                        }
                        // 日志
                        if (null == status
                            || ConsumeOrderlyStatus.ROLLBACK == status
                            || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        }
                        // 设置返回状态
                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        if (null == status) {
                            if (hasException) {
                                returnType = ConsumeReturnType.EXCEPTION;
                            } else {
                                returnType = ConsumeReturnType.RETURNNULL;
                            }
                        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                            returnType = ConsumeReturnType.TIME_OUT;
                        } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            returnType = ConsumeReturnType.FAILED;
                        } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                            returnType = ConsumeReturnType.SUCCESS;
                        }
    
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                        }
    
                        if (null == status) {
                            status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }
                        // 执行消息钩子
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext
                                .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                        }
                        // 消费状态统计
                        ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                            .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                        // 返回消费消息结果,是否进行持续消费
                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        // 未取到消息结束本次循环
                        continueConsume = false;
                    }
                }
            } else {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }
                // 尝试加锁,并延迟提交请求,在进行拉取消息
                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }
    

    消息消费进度存储

    进行消费进度的更新,其他和并发消息一样,采用ConcurrentHashMap并发安全容器。

    相关文章

      网友评论

        本文标题:顺序消息

        本文链接:https://www.haomeiwen.com/subject/okdbaktx.html