美文网首页
RocketMQ源码分析----ProcessQueue

RocketMQ源码分析----ProcessQueue

作者: _六道木 | 来源:发表于2018-12-18 01:56 被阅读69次

    RockerMQ里有个非常重要的数据结构叫ProcessQueue,很多功能,例如消费进度,消费等等功能的底层核心数据保存都是有ProcessQueue提供,下面介绍一下ProcessQueue提供的功能,而整个涉及的流程不会在这展开,在另外的功能分析文章如果涉及才会深入分析

    看下代码上的注释:

    Queue consumption snapshot

    即消息快照的意思,为什么要这样形容呢?主要是因为在消息拉取到的时候,会把消息存放在其中。另外在拉取消息的时候,使用是的PullRequest去请求,其内部结构如下:

    public class PullRequest {
        private String consumerGroup;
        private MessageQueue messageQueue;
        private ProcessQueue processQueue;
        private long nextOffset;
        private boolean lockedFirst = false;
    }
    

    可以看到,ProcessQueue和一个MessageQueue是对应的,即一个队列会有一个ProcessQueue的数据结构,看下其主要的字段

    public class ProcessQueue {
        public final static long RebalanceLockMaxLiveTime =
                Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
        public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
        private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
        // 用来保存拉取到的消息
        private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
        // 当前保存的消息数,放进来的时候会加,移除的时候会减
        private final AtomicLong msgCount = new AtomicLong();
        // 消费锁,主要在顺序消费和移除ProcessQueue的时候使用
        private final Lock lockConsume = new ReentrantLock();
        // 顺序消费的时候使用
        private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
        // 记录了废弃ProcessQueue的时候lockConsume的次数
        private final AtomicLong tryUnlockTimes = new AtomicLong(0);
        // ProcessQueue中保存的消息里的最大offset,为ConsumeQueue的offset
        private volatile long queueOffsetMax = 0L;
        // 该数据结构里的消息是否废弃
        private volatile boolean dropped = false;
        // 上次执行拉取消息的时间
        private volatile long lastPullTimestamp = System.currentTimeMillis();
        // 上次消费完消息后记录的时间
        private volatile long lastConsumeTimestamp = System.currentTimeMillis();
        
        private volatile boolean locked = false;
        // 上次锁定的时间
        private volatile long lastLockTimestamp = System.currentTimeMillis();
        // 是否正在消息
        private volatile boolean consuming = false;
        // 该参数为调整线程池的时候提供了数据参考
        private volatile long msgAccCnt = 0;
    }
    

    isLockExpired

        public boolean isLockExpired() {
            boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
            return result;
        }
    

    顺序消费的时候使用,消费之前会判断一下ProcessQueue锁定时间是否超过阈值(默认30000ms),如果没有超时,代表还是持有锁,具体细节在顺序消费的时候会详细说明.
    负载

    isPullExpired

        public boolean isPullExpired() {
            boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
            return result;
        }
    

    在拉取的时候更新lastPullTimestamp的值,然后在rebalance的时候会去判断ProcessQueue已经超过一定的时间没有去拉取消息,如果是的话,则将ProcessQueue废弃(setDropped(true))且从ProcessQueue和MessageQueue的对应关系中移除该ProcessQueue,代码细节如下:

    if (pq.isPullExpired()) {
        switch (this.consumeType()) {
            case CONSUME_ACTIVELY:
                break;
            case CONSUME_PASSIVELY:
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                            consumerGroup, mq);
                }
                break;
            default:
                break;
        }
    }
    

    根据打的日志推测,这个应该是个BUG,在某种情况下,拉取会停止,导致时间没有更新,这时候重建ProcessQueue,具体是什么原因,这点不太清楚

    cleanExpiredMsg

        public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
            if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {// 顺序消费不处理
                return;
            }
            // 最多处理16条消息
            int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
            for (int i = 0; i < loop; i++) {
                MessageExt msg = null;
                try {
                    this.lockTreeMap.readLock().lockInterruptibly();
                    try {
                        // 存在待处理的消息
                        // 且offset最小的消息消费时间大于consumeTimeout() * 60 * 1000(默认15分钟)
                        if (!msgTreeMap.isEmpty() 
                    && System.currentTimeMillis() -
                     Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) 
                    > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                            msg = msgTreeMap.firstEntry().getValue();
                        } else {
                            break;
                        }
                    } finally {
                        this.lockTreeMap.readLock().unlock();
                    }
                } catch (InterruptedException e) {
                    log.error("getExpiredMsg exception", e);
                }
    
                try {
                    // 将消息发回Broker,等待重试,且延迟级别为3
                    // 该效果是消费失败重试原理类似
                    pushConsumer.sendMessageBack(msg, 3);
                    try {
                        this.lockTreeMap.writeLock().lockInterruptibly();
                        try {
                            // 如果这个时候,ProcessQueue里offset最小的消息还等于上面取到的消息
                            // 那么就将其移除,有可能在上面取出消息处理的过程中,消息已经被消费,且从ProcessQueue中移除
                            if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                                try {
                                    msgTreeMap.remove(msgTreeMap.firstKey());
                                } catch (Exception e) {
                                    log.error("send expired msg exception", e);
                                }
                            }
                        } finally {
                            this.lockTreeMap.writeLock().unlock();
                        }
                    } catch (InterruptedException e) {
                        log.error("getExpiredMsg exception", e);
                    }
                } catch (Exception e) {
                    log.error("send expired msg exception", e);
                }
            }
        }
    

    上面是并发消费模式下,定时清理消费时间超过15分钟的消息的逻辑,在消费者启动的时候,就好开启一个定时任务定时调用该方法

        public void start() {
            this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    cleanExpireMsg();
                }
    
            }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
        }
    

    15分钟执行一次

    putMessage

        public boolean putMessage(final List<MessageExt> msgs) {
            // 返回值,顺序消费有用,返回true表示可以消费
            boolean dispatchToConsume = false;
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    int validMsgCnt = 0;
                    for (MessageExt msg : msgs) {
                        // 以offset为key,放到treemap中
                        MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                        if (null == old) {// 正常
                            validMsgCnt++;
                            // 更新当前ProcessQueue中消息最大的offset
                            this.queueOffsetMax = msg.getQueueOffset();
                        }
                    }
                    // 新增消息数量
                    msgCount.addAndGet(validMsgCnt);
                    // 如果ProcessQueue有需要处理的消息(从上可知,如果msgs不为空那么msgTreeMap不为空)
                    // 如果consuming为false,将其设置为true,表示正在消费
                    // 这个值在放消息的时候会设置为true,在顺序消费模式,取不到消息则设置为false
                    if (!msgTreeMap.isEmpty() && !this.consuming) {
                       // 有消息,且为未消费状态,则顺序消费模式可以消费
                        dispatchToConsume = true;
                        this.consuming = true;
                    }
    
                    if (!msgs.isEmpty()) {
                        MessageExt messageExt = msgs.get(msgs.size() - 1);
                        // property为ConsumeQueue里最大的offset
                        String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                        if (property != null) {
                            long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                            if (accTotal > 0) {// 当前消息的offset与最大消息的差值,相当于还有多少offset没有消费
                                this.msgAccCnt = accTotal;
                            }
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("putMessage exception", e);
            }
    
            return dispatchToConsume;
        }
    

    getMaxSpan

        public long getMaxSpan() {
            try {
                this.lockTreeMap.readLock().lockInterruptibly();
                try {
                    if (!this.msgTreeMap.isEmpty()) {
                        return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                    }
                }
                finally {
                    this.lockTreeMap.readLock().unlock();
                }
            }
            catch (InterruptedException e) {
                log.error("getMaxSpan exception", e);
            }
    
            return 0;
        }
    

    获取当前这批消息中最大最小offset之前的差距,这个方法主要在拉取消息的时候,用来判断当前有多少消息未处理,如果大于某个值(默认2000),则进行流控处理

    //DefaultMQPushConsumerImpl.pullMessage
            if (!this.consumeOrderly) {
                // 大于2000
                if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                    //  将PullRequest存起来一会再执行(PullRequest是用来发起拉取消息请求的参数载体)
                    //PullTimeDelayMillsWhenFlowControl默认为50
                    this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
                    // 流控次数加一,每1000次则打印日志
                    if ((flowControlTimes2++ % 1000) == 0) {
                        log.warn(
                                "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                                processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                                pullRequest, flowControlTimes2);
                    }
                    return;
                }
            } else {
                //....
            }
        //DefaultMQPushConsumerImpl.executePullRequestLater
         private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
            this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
        }
        //PullMessageService.executePullRequestLater
        public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
              // timeDelay毫秒后再执行拉取请求
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    // 将PullRequest放回拉取消息的队列中,这样拉取线程就会取到,马上进行请求
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        }
    

    removeMessage

        public long removeMessage(final List<MessageExt> msgs) {
            // 返回给外部的值,代表当前消费进度的offset
            long result = -1;
            final long now = System.currentTimeMillis();
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                this.lastConsumeTimestamp = now;
                try {
                    if (!msgTreeMap.isEmpty()) {
                        result = this.queueOffsetMax + 1;
                        int removedCnt = 0;
                        // 遍历消息,将其从TreeMap中移除
                        for (MessageExt msg : msgs) {
                            MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                            if (prev != null) {// 不为空证明移除成功
                                removedCnt--;// 移除消息数
                            }
                        }
                       // msgCount是ProcessQueue中的消息数量,移除了则需要减去该值,即加上该值的负数
                        msgCount.addAndGet(removedCnt);
                        // 如果还有消息存在,则使用当前最小的offset作为消费进度
                        // 如果已经没有消息了,则使用之前ProcessQueue里最大的offset作为消费进度
                        if (!msgTreeMap.isEmpty()) {
                            result = msgTreeMap.firstKey();
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (Throwable t) {
            }
            return result;
        }
    

    这里的返回值和消费进度有很大的关系,在后面分析消费进度的时候会再深入分析

    takeMessags

        public List<MessageExt> takeMessags(final int batchSize) {
            List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
            final long now = System.currentTimeMillis();
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                this.lastConsumeTimestamp = now;
                try {
                    if (!this.msgTreeMap.isEmpty()) {
                        // 从treeMap中获取batchSize条数据,每次都返回offset最小的那条并移除
                        for (int i = 0; i < batchSize; i++) {
                            Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                            if (entry != null) {
                                // 放到返回列表和一个临时用的treemapp中
                                result.add(entry.getValue());
                                msgTreeMapTemp.put(entry.getKey(), entry.getValue());
                            } else {
                                break;
                            }
                        }
                    }
                    // 取到消息了就会开始进行消费,如果没取到,则不需要消费,那么consuming设为false
                    if (result.isEmpty()) {
                        consuming = false;
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("take Messages exception", e);
            }
    
            return result;
        }
    

    该方法顺序消费模式中使用的,取到该消息后就会调用我们定义的MessageListener进行消费

    commit

    在顺序消费模式下,调用takeMessages取消息,其内部逻辑中,将treeMap的消息放到一个临时用的treeMap里,然后进行消费,消费完成后需要将这个临时的map清除,则是调用commit方法

        public long commit() {
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    // msgTreeMapTemp是这次消费的消息集合,lastKey代表当前消费的进度
                    Long offset = this.msgTreeMapTemp.lastKey();
                    // 消费完成,减去该批次的消息数量
                    msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
                    // 清除消息
                    this.msgTreeMapTemp.clear();
                    if (offset != null) {
                        // 返回消费进度
                        return offset + 1;
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("commit exception", e);
            }
    
            return -1;
        }
    

    makeMessageToCosumeAgain

    当顺序模式返回SUSPEND_CURRENT_QUEUE_A_MOMENT,那么可能会调用该方法,该方法名称意思为:让消息重新消费。
    回顾一下上面说的流程:

    1. 取消息:从treeMap里取出消息然后放到临时treeMap,等待消费成功
    2. 消费成功:删除临时treeMap

    从上面两部可以猜出,当消费失败的时候,不能无视临时treeMap和treeMap,应该要将临时treeMap的消息放回去,如果不放回去的话,一会重新消费的时候,从treeMap中就取不到原来那批消费失败的数据了,具体逻辑在后面分析

        public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    // 将这批没消费成功的消息从临时treeMap中移除
                    // 并放回treeMap,等待下次消费
                    for (MessageExt msg : msgs) {
                        this.msgTreeMapTemp.remove(msg.getQueueOffset());
                        this.msgTreeMap.put(msg.getQueueOffset(), msg);
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("makeMessageToCosumeAgain exception", e);
            }
        }
    

    isDropped

    返回true代表这个ProcessQueue被废弃了,具体出现的原因大概如下:

    1. rebalance之后,原来存在的MessageQueue不在本地新分配的MessageQueue之中,则把对应的ProcessQueue废弃。
      举个栗子:0123这个4个队列,一开始分配给A消费者,这时候启动一个B消费者后,A消费者分配了01这两个队列,那么原来34队列的ProcessQueue就会设置成true
    2. rebalance的时候,会将未订阅的topic下对应的ProcessQueue设置成true
    3. 还有就是上面isPullExpired讨论的情况
    4. 当拉取消息的时候,如果broker返回OFFSET_ILLEGAL,那么这时候将对应的ProcessQueue废弃
    5. consumer关闭(调用shutdown方法)的时候也会废弃

    上面就是ProcessQueue提供的一些功能,有很多上层的功能都依赖他的实现,看别的东西前要先了解ProcessQueue,所以上面对ProcessQueue的功能进行了分析,稍微发散了一下,因为涉及的面比较广,所以相关的细节没有展开,待后面文章遇到了再进行分析

    ProcessQueue相关的知识点:

    1. 消费模式:顺序、并行
    2. 消费进度管理
    3. rebalance负载均衡
    4. 等等....

    相关文章

      网友评论

          本文标题:RocketMQ源码分析----ProcessQueue

          本文链接:https://www.haomeiwen.com/subject/ecwmhqtx.html