美文网首页
RocketMQ源码分析----Consumer消费进度相关

RocketMQ源码分析----Consumer消费进度相关

作者: _六道木 | 来源:发表于2018-12-28 16:34 被阅读26次

    在Consumer消费的时候总有几个疑问:

    • 消费完成后,这个消费进度存在哪里
    • 消费完成后,还没保存消费进度就挂了,会不会导致重复消费

    Consumer

    消费进度保存

    消费完成后,会返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS告诉MQ消费成功,以MessageListener的consumeMessage为入口分析。
    消费的时候,是以ConsumeRequest类为Runnable对象,在线程池中进行处理的,即ConsumeRequest的run方法会处理这个状态

            @Override
            public void run() {
    
                //....
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                // 如果这个ProcessQueue废弃了,则不处理
                if (!processQueue.isDropped()) {
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                }
            }
    

    在消费完成后,将status交给processConsumeResult处理,代码如下

        public void processConsumeResult(//
                                         final ConsumeConcurrentlyStatus status, //
                                         final ConsumeConcurrentlyContext context, //
                                         final ConsumeRequest consumeRequest//
        ) {
             //....消费成功或者失败的处理
            
            // 将这批消息从ProcessQueue中移除,代表消费完毕,并返回当前ProcessQueue中的消息最小的offset
            long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
            if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                // 更新消费进度
                this.defaultMQPushConsumerImpl.getOffsetStore()
                    .updateOffset(consumeRequest.getMessageQueue(), offset, true);
            }
        }
    

    在分析ProcessQueue的时候,说过removeMessage返回有两种情况:

    1. 如果移除这批消息之后已经没有消息了,那么返回ProcessQueue中最大的offset+1
    2. 如果还有消息,那么返回treeMap中最小的key,即未消费的消息中最小的offset

    getOffsetStore返回RemoteBrokerOffsetStore,看下其实现

        @Override
        public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
            if (mq != null) {
                // 通过MessageQueue获取本地的对应的消费进度
                AtomicLong offsetOld = this.offsetTable.get(mq);
                if (null == offsetOld) {
                    offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
                }
    
                if (null != offsetOld) {
                    //increaseOnly 为false则直接覆盖
                    //increaseOnly为true则会判断更新的值比老的值大才会进行更新
                    if (increaseOnly) {
                        MixAll.compareAndIncreaseOnly(offsetOld, offset);
                    } else {
                        offsetOld.set(offset);
                    }
                }
            }
        }
    

    这里的increaseOnly参数根据不同的情况传入不同的值,有些情况下会出现并发修改的情况,那么需要传入true,内部会进行CAS的操作,能保证正确的赋值,而一些场景下,只需要进行直接覆盖或者说没有并发修改的问题那么传入false就行了。

    消费进度持久化

    offsetTable是一个Map,其保存了消费进度,这只一个内存的结构,在Consumer启动的时候,会启动一个定时任务将本地的数据同步到broker,每persistConsumerOffsetInterval(默认为5)秒进行一次操作

        // mqs为需要持久化的队列集合
        public void persistAll(Set<MessageQueue> mqs) {
            if (null == mqs || mqs.isEmpty())
                return;
    
            final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
            if (mqs != null && !mqs.isEmpty()) {
                // 遍历本地的消费进度
                for(Map.Entry<MessageQueue, AtomicLong> entry:this.offsetTable.entrySet()){
                    MessageQueue mq = entry.getKey();
                    AtomicLong offset = entry.getValue();
                    if (offset != null) {
                        // 如果该队列在需要持久化的队列中
                        if (mqs.contains(mq)) {
                            try {
                                // 将消费进度发送到broker
                                this.updateConsumeOffsetToBroker(mq, offset.get());
                            } catch (Exception e) {
                                log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                            }
                        } else {//废弃的消费进度
                            unusedMQ.add(mq);
                        }
                    }
                }
            }
            // 如果有废弃的MQ,则将其消费进度废弃
            if (!unusedMQ.isEmpty()) {
                for (MessageQueue mq : unusedMQ) {
                    this.offsetTable.remove(mq);
                }
            }
        }
    

    传入的是当前Consumer分配的MessageQueue列表,rebalance之后,可能分配的MessageQueue已经变化,所以offsetTable里有些消费进度的队列时不需要的,所以将它的消费进度废弃
    updateConsumeOffsetToBroker方法就是简单的网络请求,将offset发送给Broker

    消费进度提交

    除了定时提交消费进度之外,在拉取消息的时候,会顺便将本地的消费进度一起传到broker,例如查看拉取消息的方法DefaultMQPushConsumerImpl#pullMessage中的一段代码

    boolean commitOffsetEnable = false;
            long commitOffsetValue = 0L;
            // 集群消费模式
            if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
                // 通过offsetStore获取当前消费进度
                // ReadOffsetType.READ_FROM_MEMORY表示从本地获取(即offsetTable)
                commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
                if (commitOffsetValue > 0) {//
                    // 传给Broker,让其判断是否需要保存消费进度
                    commitOffsetEnable = true;
                }
            }
            // 构造一些标志位,这里主要看commitOffsetEnable值
            // 将commitOffsetEnable放到一个int类型的值中,让broker判断是否需要保存消费进度
                    int sysFlag = PullSysFlag.buildSysFlag(//
                    commitOffsetEnable, // commitOffset
                    true, // suspend
                    subExpression != null, // subscription
                    classFilter // class filter
            );
            //....
                // 通过拉取消息请求,将commitOffsetValue和sysFlag传给broker
                this.pullAPIWrapper.pullKernelImpl(//
                        pullRequest.getMessageQueue(), // 1
                        subExpression, // 2
                        subscriptionData.getSubVersion(), // 3
                        pullRequest.getNextOffset(), // 4
                        this.defaultMQPushConsumer.getPullBatchSize(), // 5
                        sysFlag, // 6
                        commitOffsetValue, // 7
                        BrokerSuspendMaxTimeMillis, // 8
                        ConsumerTimeoutMillisWhenSuspend, // 9
                        CommunicationMode.ASYNC, // 10
                        pullCallback// 11
                );
    

    具体broker对消费进度的处理看后面分析

    Broker

    消费进度保存

    RocketMQ的网络请求都有一个RequestCode,更新消费进度的Code为UPDATE_CONSUMER_OFFSET,通过查到其使用的地方,找到对应的Processor为ClientManageProcessor,其processRequest处理对应的请求

        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
                throws RemotingCommandException {
            switch (request.getCode()) {
                case RequestCode.HEART_BEAT:
                    return this.heartBeat(ctx, request);
                case RequestCode.UNREGISTER_CLIENT:
                    return this.unregisterClient(ctx, request);
                case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                    return this.getConsumerListByGroup(ctx, request);
                case RequestCode.UPDATE_CONSUMER_OFFSET:
                    return this.updateConsumerOffset(ctx, request);
                case RequestCode.QUERY_CONSUMER_OFFSET:
                    return this.queryConsumerOffset(ctx, request);
                default:
                    break;
            }
            return null;
        }
    

    更新消费进度的方法为updateConsumerOffset,里面解析了请求体之后又调用了ConsumerOffsetManager.commitOffset方法

        public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
            // topic@group 
            String key = topic + TOPIC_GROUP_SEPARATOR + group;
            this.commitOffset(clientHost, key, queueId, offset);
        }
    
        private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
            ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
            if (null == map) {
                map = new ConcurrentHashMap<Integer, Long>(32);
                map.put(queueId, offset);
                this.offsetTable.put(key, map);
            } else {
                Long storeOffset = map.put(queueId, offset);
                if (storeOffset != null && offset < storeOffset) {
                    log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
                   clientHost, key, queueId, offset, storeOffset);
                }
            }
        }
    

    逻辑也很简单就不多说了,有意思的是,Broker的保存消费进度的结构和Consumer类似,Broker多了一个维度,因为Broker接收的是所有消费者的进度,而Consumer保存的是自己的
    在Consumer的消费进度上报到Broker之后,Broker只是保存到内存,这并不可靠,大概也能猜出,和Consumer一样,也有一个定时任务将消费进度持久化。这时,先看下ConsumerOffsetManager这个类的继承关系,他的父类是ConfigManager,这个东西很重要,是几个重要配置信息持久化类,看下其继承关系:


    image.png

    分别是订阅关系管理,消费进度管理,Topic信息管理,和延迟队列信息管理,这4个配置信息都需要通过ConfigManager去持久化和加载,看下ConfigManager的几个方法

    public abstract class ConfigManager {
        // 将对象转换成json串
        public abstract String encode();
    
        //将文件里内容(json格式)的转换成对象
        public boolean load() {
            String fileName = null;
                // 获取文件地址
                fileName = this.configFilePath();
                // 将文件里的内容读取出来
                String jsonString = MixAll.file2String(fileName);
                // json转换成指定对象的数据
                this.decode(jsonString);
        }
        // 配置文件地址
        public abstract String configFilePath();
        
        // 与load类似
        private boolean loadBak() {
            String fileName = null;
                fileName = this.configFilePath();
                String jsonString = MixAll.file2String(fileName + ".bak");
                this.decode(jsonString);
            return true;
        }
        // json转换成指定对象的数据
        public abstract void decode(final String jsonString);
        // 将对象里的数据转换成json并持久化到configFilePath()文件中
        public synchronized void persist() {
            String jsonString = this.encode(true);
                String fileName = this.configFilePath();
                    MixAll.string2File(jsonString, fileName);
            
        }
    
        public abstract String encode(final boolean prettyFormat);
    

    那么ConsumerOffsetManager会实现encode和decode方法并在某个地方定时调用persist方法,查看其使用的地方,找到BrokerController的initialize方法,有段定时任务如下:

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.consumerOffsetManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
            }
        }
    }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    

    可以看到,每flushConsumerOffsetInterval(默认5000)毫秒会进行一次持久化

    拉取消息的时候保存消费进度

    拉取消息的Code为RequestCode.PULL_MESSAGE,对应的Processor为PullMessageProcessor,找到其中消费进度处理的地方

    // 上面说的consumer传过来的commitOffsetEnable
    // 当Consumer本地消费进度大于0的时候这个参数为true
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.
    
    // brokerAllowSuspend在处理消息请求的时候为true,hold请求自己处理是false
    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    // Master才需要保存进度,slave只是同步broker的消息
    storeOffsetEnable = storeOffsetEnable
            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(
            RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), 
            requestHeader.getTopic(), 
            requestHeader.getQueueId(), 
            requestHeader.getCommitOffset());//consumer传上来的offset
    }
    

    总的来说:
    当broker为master的时候,且Consumer消费进度大于0则在拉取消息的时候顺便将消费进度保存到broker

    问题分析

    重复消费问题

    在ProcessQueue的removeMessage的第二种情况有个问题,假设有如下情况:
    批量拉取了4条消息ABCD,分别对应的offset为400|401|402|403,此时consumeBatchSize(批量消费数量,默认为1,即一条一条消费),那么会分4个线程去消费这几个消息,出现下面消费次序
    消费D -> removeMessage -> 返回400(情况2)
    消费C -> removeMessage -> 返回400(情况2)
    消费B -> removeMessage -> 返回400(情况2)
    消费A -> removeMessage -> 返回404(情况1)

    在消费A之前,本地消费进度持久化到Broker之后,应用宕机了,那么此时Broker保存的是offset=400(准确来说,在消费完A且保存消费进度到broker之前,offset都是400)。那么会有什么问题呢?
    先假设消费完DCB且消费进度上传完成宕机,然后重启应用,这时候会先从broker获取应该从哪里消费(),因为DCB消费完成后都是保存400这个消费进度,那么返回的是400,这时候consumer会请求offset为400的消费,到这里,已经重复消费了DCB。

    消费进度保存在哪里

    1. consumer保存在内存,定时上传broker
    2. broker保存在内存,定时刷新到磁盘文件

    :以上没有特别声明的都是并发消费模式

    整体流程图

    image.png

    相关文章

      网友评论

          本文标题:RocketMQ源码分析----Consumer消费进度相关

          本文链接:https://www.haomeiwen.com/subject/lvvmhqtx.html