美文网首页MQ消息系列
RocketMQ顺序消息消费

RocketMQ顺序消息消费

作者: 缄默的石头 | 来源:发表于2017-05-10 14:03 被阅读2541次

    RocketMQ顺序消息消费

    1. 应用场景

    消息队列中消息之间有先后的依赖关系,后一条消息的处理依赖于前一条消息的处理结果,那么比较适用于顺序消息消费

    2. 分析

    RocketMQRocketMQ

    此图可以看出Rocketmq中一个topic和它的mq之间的关系是一对多的关系,客户端向broker中发送消息是根据topic发送的,而消费方消费时也是按照topic来消费的,那么我们怎么保证消息之间的顺序性呢?首先,需要保证顺序的消息要发送到同一个messagequeue中;其次,一个messagequeue只能被一个消费者消费,这点是由消息队列的分配机制来保证的;最后,一个消费者内部对一个mq的消费要保证是有序的。
    我们要做到生产者 - messagequeue - 消费者之间是一对一对一的关系。

    3. 具体实现

    3.1 消息生产者

    Message message = new Message(topic, tags, key, body.getBytes());
    boolean result;
    try {
        SendResult sendResult = getConfigBean().getMqProducer().send(message,queueSelector,args);
        LoggerUtil.log(Level.INFO,"rocket message product result : " + sendResult.toString());
        result = true;
    }
    catch (MQClientException e) {
        throw new MQException("客户端调用状态异常", e);
    }
    catch (RemotingException | InterruptedException | MQBrokerException e) {
        throw new MQException("远程调用异常", e);
    }
    

    相比较普通消息的消费,顺序消费在向broker发送消息的时候要指定MessageQueueSelector,此接口RocketMQ提供了三种实现SelectMessageQueueByRandoom,SelectMessageQueueByHash,SelectMessageQueueByMachineRoom,可由调用方自行根据业务实现。指定将消息发送到对应的队列中去;

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
            mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        }
        catch (Throwable e) {
            throw new MQClientException("select message queue throwed exception.", e);
        }
    
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback);
        }
        else {
            throw new MQClientException("select message queue return null.", null);
        }
    }
    

    而普通消息的发送,客户端调用方无需指定队列,MQ会轮询topic下面的MessageQueue发送消息,代码如下

    /**
     * 如果lastBrokerName不为null,则寻找与其不同的MessageQueue
     */
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName != null) {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
    
            return null;
        }
        else {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            return this.messageQueueList.get(pos);
        }
    }
    

    3.2 消息消费者

    消息消费方与普通消息消费只有一个地方不同,在consumer中注册MessageListenerOrderly,而不是MessageListenerConcurrently

    this.consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
        msgs.stream().forEach(
                messageExt -> {
                    //TODO consume message
                });
        return ConsumeOrderlyStatus.SUCCESS;
    });
    

    我们进一步看下RocketMQ是怎么处理的ConsumeMessageOrderlyService在启动的时候,如果是集群模式下会启动一个单线程的定时调度任务,延迟一秒,时间间隔为20秒,执行rebalanceImpl的lockAll()方法。

    public void start() {
        // 启动定时lock队列服务
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
            .messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
        }
    }
    

    这个方法会锁定相关broker下面的相关的messagequeue,对message对应的processQueue设置是否锁定,这个地方下面会用到

    class ConsumeRequest implements Runnable {
            private final ProcessQueue processQueue;
            private final MessageQueue messageQueue;
    
    
            public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
                this.processQueue = processQueue;
                this.messageQueue = messageQueue;
            }
    
    
            @Override
            public void run() {
                if (this.processQueue.isDroped()) {
                    log.warn("run, the message queue not be able to consume, because it's dropped. {}",
                        this.messageQueue);
                    return;
                }
    
                // 保证在当前Consumer内,同一队列串行消费
                final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                synchronized (objLock) {
                    // 保证在Consumer集群,同一队列串行消费
                    if (MessageModel.BROADCASTING
                        .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                        final long beginTime = System.currentTimeMillis();
                        for (boolean continueConsume = true; continueConsume;) {
                            if (this.processQueue.isDroped()) {
                                log.warn("the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }
    
                            if (MessageModel.CLUSTERING
                                .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                    .messageModel())
                                    && !this.processQueue.isLocked()) {
                                log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                                    this.processQueue, 10);
                                break;
                            }
    
                            if (MessageModel.CLUSTERING
                                .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                    .messageModel())
                                    && this.processQueue.isLockExpired()) {
                                log.warn("the message queue lock expired, so consume later, {}",
                                    this.messageQueue);
                                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                                    this.processQueue, 10);
                                break;
                            }
    
                            // 在线程数小于队列数情况下,防止个别队列被饿死
                            long interval = System.currentTimeMillis() - beginTime;
                            if (interval > MaxTimeConsumeContinuously) {
                                // 过10ms后再消费
                                ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue,
                                    messageQueue, 10);
                                break;
                            }
    
                            final int consumeBatchSize =
                                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer
                                        .getConsumeMessageBatchMaxSize();
    
                            List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                            if (!msgs.isEmpty()) {
                                final ConsumeOrderlyContext context =
                                        new ConsumeOrderlyContext(this.messageQueue);
    
                                ConsumeOrderlyStatus status = null;
    
                                // 执行Hook
                                ConsumeMessageContext consumeMessageContext = null;
                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                    consumeMessageContext = new ConsumeMessageContext();
                                    consumeMessageContext
                                        .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer
                                            .getConsumerGroup());
                                    consumeMessageContext.setMq(messageQueue);
                                    consumeMessageContext.setMsgList(msgs);
                                    consumeMessageContext.setSuccess(false);
                                    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                        .executeHookBefore(consumeMessageContext);
                                }
    
                                long beginTimestamp = System.currentTimeMillis();
                                //处理队列加锁
                                try {
                                    this.processQueue.getLockConsume().lock();
                                    if (this.processQueue.isDroped()) {
                                        log.warn(
                                            "consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                            this.messageQueue);
                                        break;
                                    }
    
                                    status =
                                            messageListener.consumeMessage(Collections.unmodifiableList(msgs),
                                                context);
                                }
                                catch (Throwable e) {
                                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//
                                        RemotingHelper.exceptionSimpleDesc(e),//
                                        ConsumeMessageOrderlyService.this.consumerGroup,//
                                        msgs,//
                                        messageQueue);
                                }
                                finally {
                                    this.processQueue.getLockConsume().unlock();
                                }
    
                                // 针对异常返回代码打印日志
                                if (null == status //
                                        || ConsumeOrderlyStatus.ROLLBACK == status//
                                        || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                    log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",//
                                        ConsumeMessageOrderlyService.this.consumerGroup,//
                                        msgs,//
                                        messageQueue);
                                }
    
                                long consumeRT = System.currentTimeMillis() - beginTimestamp;
    
                                // 用户抛出异常或者返回null,都挂起队列
                                if (null == status) {
                                    status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                                }
    
                                // 执行Hook
                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                    consumeMessageContext.setStatus(status.toString());
                                    consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status
                                            || ConsumeOrderlyStatus.COMMIT == status);
                                    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                        .executeHookAfter(consumeMessageContext);
                                }
    
                                // 记录统计信息
                                ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(
                                    ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(),
                                    consumeRT);
    
                                continueConsume =
                                        ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status,
                                            context, this);
                            }
                            else {
                                continueConsume = false;
                            }
                        }
                    }
                    // 没有拿到当前队列的锁,稍后再消费
                    else {
                        if (this.processQueue.isDroped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            return;
                        }
    
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                            this.processQueue, 100);
                    }
                }
            }
    
    
            public ProcessQueue getProcessQueue() {
                return processQueue;
            }
    
    
            public MessageQueue getMessageQueue() {
                return messageQueue;
            }
        }
    

    4. 问题

    4.1 降低了吞吐量

    4.2 前一条消息消费出现问题,后续的处理流程会阻塞

    相关文章

      网友评论

        本文标题:RocketMQ顺序消息消费

        本文链接:https://www.haomeiwen.com/subject/mqqgtxtx.html