美文网首页
(九)消费者的消费方式---并发消费

(九)消费者的消费方式---并发消费

作者: guessguess | 来源:发表于2021-07-20 16:17 被阅读0次

    对于消费者如何拉取消息的方式,大家应该起码有一个概念了。
    无非就是通过RebalanceService先进行队列的重新分配,随后通过PullMessageService一直轮询地去拉取消息。
    那么拉取到消息如何进行消费呢?

    首先定位一下入口,通过拉取消息的回调,来看看获取消息之后如何进行消费。

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
        public void pullMessage(final PullRequest pullRequest) {
            。。。省略部分代码
            PullCallback pullCallback = new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    if (pullResult != null) {
                        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
    
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                long prevRequestOffset = pullRequest.getNextOffset();
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                long pullRT = System.currentTimeMillis() - beginTimestamp;
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullRT);
    
                                long firstMsgOffset = Long.MAX_VALUE;
                                如果确实是没有消息可以被消费
                                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                    再次发起拉取消息的请求
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                } else {
                                    拉取到的消息的最小偏移量
                                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    
                                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                    这个函数主要的作用就是将待消费的消息,进行一个临时的存储,放入到处理队列(processQueue)的成员变量(msgTreeMap)中
                                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    提交消费请求---同拉取相似,一样用的是线程池的方式。
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
                                    提交消费请求后,决定是否继续拉取消息
                                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                    } else {
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                    }
                                }
                                break;
                                省略没有拉取到消息  以及偏移量修正的代码逻辑。
                        }
                    }
    
                }
            };
            进行拉取消息,更新回调
            try {
                this.pullAPIWrapper.pullKernelImpl(
                    pullRequest.getMessageQueue(),
                    subExpression,
                    subscriptionData.getExpressionType(),
                    subscriptionData.getSubVersion(),
                    pullRequest.getNextOffset(),
                    this.defaultMQPushConsumer.getPullBatchSize(),
                    sysFlag,
                    commitOffsetValue,
                    BROKER_SUSPEND_MAX_TIME_MILLIS,
                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                    CommunicationMode.ASYNC,
                    pullCallback
                );
            } catch (Exception e) {
                log.error("pullKernelImpl exception", e);
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
    }
    

    从上面的代码可以得知,获取到消息之后,也是通过发送消费消息的请求,以进行消息的消费。
    ConsumeMessageService的submitConsumeRequest方法。
    在看如何消费之前,需要先看看消费服务的结构。

    ConsumeMessageService

    结构

    public interface ConsumeMessageService {
        void start();
    
        void shutdown(long awaitTerminateMillis);
    
        void updateCorePoolSize(int corePoolSize);
    
        void incCorePoolSize();
    
        void decCorePoolSize();
    
        int getCorePoolSize();
    
        ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
    
        void submitConsumeRequest(
            final List<MessageExt> msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispathToConsume);
    }
    

    从这个接口的功能来看,必然也是线程池,可以控制核心线程数,以及开始/关闭。
    另外提供了2个消费方法,提交消费申请或者直接消费。
    接下来看看实现类。因为本次看的是并发消费,所以直接看看这个接口对应的实现类---并发消费。

    ConsumeMessageConcurrentlyService---并发消费

    结构上相对来说比较简单,所以就不直接画图了。先看看对应的成员变量

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        private static final InternalLogger log = ClientLogger.getLog();//日志
        private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
        private final DefaultMQPushConsumer defaultMQPushConsumer;
        private final MessageListenerConcurrently messageListener;//消息监听器
        private final BlockingQueue<Runnable> consumeRequestQueue;//消费请求队列
        private final ThreadPoolExecutor consumeExecutor;//消费的线程池
        private final String consumerGroup;//消费组
        private final ScheduledExecutorService scheduledExecutorService;//定时消费线程池,主要用于延时提交消费请求---单线程线程池
        private final ScheduledExecutorService cleanExpireMsgExecutors;//清理过期消息---单线程线程池
    }
    

    从成员变量上来看,功能还是挺多的,可以延迟提交消费请求,清理过期消息
    接下来看看并发消费服务的实例化

        public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
            MessageListenerConcurrently messageListener) {
            this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
            this.messageListener = messageListener;
    
            this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
            this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
            无界队列
            this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
            消费线程池,为定长线程池,默认20线程
            this.consumeExecutor = new ThreadPoolExecutor(
                this.defaultMQPushConsumer.getConsumeThreadMin(),
                this.defaultMQPushConsumer.getConsumeThreadMax(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.consumeRequestQueue,
                new ThreadFactoryImpl("ConsumeMessageThread_"));
            
            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
            this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
        }
    

    如何提交消费请求

    首先还是看看提交消费请求的实现

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        @Override
        public void submitConsumeRequest(
            final List<MessageExt> msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispatchToConsume) {
            默认为1.
            final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
            consumeBatchSize,消息每次最多拉取32条,可以通过配置调整。
            if (msgs.size() <= consumeBatchSize) {
                ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    this.submitConsumeRequestLater(consumeRequest);
                }
            } else {
                for (int total = 0; total < msgs.size(); ) {
                    List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                    for (int i = 0; i < consumeBatchSize; i++, total++) {
                        if (total < msgs.size()) {
                            msgThis.add(msgs.get(total));
                        } else {
                            break;
                        }
                    }
    
                    ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                    try {
                        this.consumeExecutor.submit(consumeRequest);
                    } catch (RejectedExecutionException e) {
                        for (; total < msgs.size(); total++) {
                            msgThis.add(msgs.get(total));
                        }
    
                        this.submitConsumeRequestLater(consumeRequest);
                    }
                }
            }
        }
    }
    

    从上面的代码来看,其实每一个消息都会封装成一个消费的请求。
    那么是如何进行消费的?由于是线程池消费的,所以我们只需要关注ConsumeRequest即可。

    如何进行消费

    由于是线程池消费的,所以我们只需要关注ConsumeRequest即可。
    还是先来看看结构

    ConsumeRequest的结构

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        class ConsumeRequest implements Runnable {
            private final List<MessageExt> msgs;
            private final ProcessQueue processQueue;
            private final MessageQueue messageQueue;
         }
    }
    

    从结构上来看比较简单,由3个成员变量组成。本次消费的消息,对应的消息队列,以及处理队列。
    再来看看消费的实现。这个时候关注run方法即可。

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        class ConsumeRequest implements Runnable {
            private final List<MessageExt> msgs;
            private final ProcessQueue processQueue;
            private final MessageQueue messageQueue;
         }
    
            @Override
            public void run() {
                if (this.processQueue.isDropped()) {
                如果刚刚好进行队列的重新分配,processQueue废弃状态为true,那么说明该队列也不由该消费者消费,直接返回
                    return;
                }
    
                MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
                ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
                ConsumeConcurrentlyStatus status = null;
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
    
                ConsumeMessageContext consumeMessageContext = null;
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                    consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                    consumeMessageContext.setProps(new HashMap<String, String>());
                    consumeMessageContext.setMq(messageQueue);
                    consumeMessageContext.setMsgList(msgs);
                    consumeMessageContext.setSuccess(false);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                }
    
                long beginTimestamp = System.currentTimeMillis();
                boolean hasException = false;
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                try {
                    if (msgs != null && !msgs.isEmpty()) {
                        for (MessageExt msg : msgs) {
                            给消息的属性中设置开始消费的时间。
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }
                    通过监听器进行消费,并且返回消费状态
                    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                } catch (Throwable e) {
                    hasException = true;
                }
                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                if (null == status) {
                    if (hasException) {
                        returnType = ConsumeReturnType.EXCEPTION;
                    } else {
                        returnType = ConsumeReturnType.RETURNNULL;
                    }
                } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                    returnType = ConsumeReturnType.TIME_OUT;
                } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                    returnType = ConsumeReturnType.FAILED;
                } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                    returnType = ConsumeReturnType.SUCCESS;
                }
    
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                }
    
                if (null == status) {
                    消费状态为空,说明消费失败,待会重试
                    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
    
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.setStatus(status.toString());
                    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                }
    
                ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                    .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                如果处理队列没有被废弃
                if (!processQueue.isDropped()) {
                    处理消费结果
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                } else {
                    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                }
            }
    }
    

    消息监听器会返回消费的结果,最后根据结果再做处理。

    对于消费结果的处理

    代码如下

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        public void processConsumeResult(
            final ConsumeConcurrentlyStatus status,
            final ConsumeConcurrentlyContext context,
            final ConsumeRequest consumeRequest
        ) {
            int ackIndex = context.getAckIndex();
    
            if (consumeRequest.getMsgs().isEmpty())
                return;
    
            switch (status) {
    --------------------------------------------------------------------------------------------------------------------------------
                消费成功
                case CONSUME_SUCCESS:
                    if (ackIndex >= consumeRequest.getMsgs().size()) {
                        ackIndex 为消息列表长度-1
                        ackIndex = consumeRequest.getMsgs().size() - 1;
                    }
                    int ok = ackIndex + 1;
                    int failed = consumeRequest.getMsgs().size() - ok;
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                    break;
    ----------------------------------------------------------------------------------------------------------------------------------
                消费失败,稍后重试
                case RECONSUME_LATER:
                    ackIndex设置为-1
                    ackIndex = -1;
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                        consumeRequest.getMsgs().size());
                    break;
    ---------------------------------------------------------------------------------------------------------------------------------
                default:
                    break;
            }
    
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    如果消费成功,ackIndex=size -1,自然不会开始遍历,为-1就会开始遍历
                    从代码来看,只会打印日志
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                    }
                    break;
                case CLUSTERING:
                    用于存储sendback失败的消息
                    List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                    如果消费成功,ackIndex=size -1,自然不会开始遍历,为-1就会开始遍历
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        将消费失败的消息发送回broker
                        boolean result = this.sendMessageBack(msg, context);
                        if (!result) {
                            如果sendback失败,保存起来, 重复消费次数+1
                            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                            msgBackFailed.add(msg);
                        }
                    }
                    对sendback失败的消息进行再次消费。
                    if (!msgBackFailed.isEmpty()) {
                        consumeRequest.getMsgs().removeAll(msgBackFailed);
                        this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                    }
                    break;
                default:
                    break;
            }
            将本次消费的消息移除,避免重复消费,返回处理队列中最小的偏移量。
            long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
            处理队列没有被废弃且偏移量大于0
            if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            将偏移量进行同步
           this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
            }
    }
    
    并发消费的过程

    从代码逻辑来看,并发消费,其实是通过线程池对每一个消费请求进行消费。

    相关文章

      网友评论

          本文标题:(九)消费者的消费方式---并发消费

          本文链接:https://www.haomeiwen.com/subject/uhjlmltx.html