美文网首页
RocketMQ消费者消息队列负载均衡

RocketMQ消费者消息队列负载均衡

作者: 九点半的马拉 | 来源:发表于2019-11-30 22:56 被阅读0次

    先从整体流程上简单梳理一下消息队列负载的过程。

    消息队列负载由Rebalance线程默认每隔20s进行一次消息队列负载,获取主题队列信息mqSet与消费组当前所有消费者cidAll,然后按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消息队列,同一个消息消费队列同一时间只会分配给一个消费者。此时,可以计算当前消费者分配到消息队列集合,对比原先的负载队列与当前的分配队列。如果新队列集合中不包含原来的队列,则停止原先队列消息消费并移除,如果原先队列中不包含新分配队列则创建PullRequest。

    何时会触发启动

    • 每隔20s会自动进行一次
    • 每次有新的consumer加入到消费组中时,就会执行一次。

    提供的分配算法

    • AllocateMessageQueueAveragely: 平均分配。
    • AllocateMessageQueueAveragelyByCircle: 平均轮询分配
    • AllocateMessageQueueConsistentHash: 一致性hash
    • AllocateMessageQueueByConfig: 根据配置,为每一个消费者配置固定的消息队列。
    • AllocateMessageQueueByMachineRoom: 根据Broker部署机房名,对每个消费者负责不同的Broker上的队列。

    启动

    进行负载均衡是在RebalanceService线程中启动的,一个MQClientInstance持有一个RebalanceService实现,并随着MQClientInstance的启动而启动。

     @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                //waitInterval默认为20s。
                this.waitForRunning(waitInterval);
                //定时负载均衡
                this.mqClientFactory.doRebalance();
            }
    
            log.info(this.getServiceName() + " service end");
        }
    

    执行流程

    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    
    public void doRebalance() {
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Throwable e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }
    
    public class DefaultMQPushConsumerImpl implements MQConsumerInner
    

    从上面可以看出,MQClientinstance遍历已注册的消费者,对消费者执行doRebalance方法。

    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
            new ConcurrentHashMap<String, SubscriptionData>();
    
    public void doRebalance(final boolean isOrder) {
            Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
            if (subTable != null) {
                for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                    final String topic = entry.getKey();
                    try {
                        this.rebalanceByTopic(topic, isOrder);
                    } catch (Throwable e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("rebalanceByTopic Exception", e);
                        }
                    }
                }
            }
    
            this.truncateMessageQueueNotMyTopic();
        }
    

    上面是遍历订阅信息对每个主题的队列进行重新负载。接下来将执行rebalanceByTopic方法,会根据广播模式或集群模式分别采用不同的方法进行处理。在此处,只解释集群模式下的方法。

    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    

    获取该主题下的队列信息和该消费组内当前所有的消费者ID。每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象。

     if (mqSet != null && cidAll != null) {
         List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
         mqAll.addAll(mqSet);
    
        Collections.sort(mqAll);
        Collections.sort(cidAll);
    
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
        List<MessageQueue> allocateResult = null;
        try {
        //根据策略进行分配
        allocateResult = strategy.allocate(//
        this.consumerGroup, //
        this.mQClientFactory.getClientId(), //
        mqAll, //
        cidAll);
        } catch (Throwable e) {
        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
        e);
        return;
                        }
    
    public interface AllocateMessageQueueStrategy {
    
        /**
         * Allocating by consumer id
         *
         * @param consumerGroup current consumer group
         * @param currentCID current consumer id
         * @param mqAll message queue set in current topic
         * @param cidAll consumer set in current consumer group
         * @return The allocate result of given strategy
         */
        List<MessageQueue> allocate(
            final String consumerGroup,
            final String currentCID,
            final List<MessageQueue> mqAll,
            final List<String> cidAll
        );
    

    对该主题下的队列信息和该消费组内当前所有的消费者ID进行排序,确保一个消费组的成员看到的顺序是一致的,防止同一个消费队列不会被多个消费者分配。
    allocateResult记录的是当前消费者的所分配的消息队列

     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
     if (allocateResult != null) {
     allocateResultSet.addAll(allocateResult);
     }
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    

    调用updateProcessQueueTableInRebalance对比消息队列是否发生变化

     private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
            boolean changed = false;
    
            Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<MessageQueue, ProcessQueue> next = it.next();
                MessageQueue mq = next.getKey();
                ProcessQueue pq = next.getValue();
    
                if (mq.getTopic().equals(topic)) {
                    if (!mqSet.contains(mq)) {
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                        }
                    } else if (pq.isPullExpired()) {
                        switch (this.consumeType()) {
                            case CONSUME_ACTIVELY:
                                break;
                            case CONSUME_PASSIVELY:
                                pq.setDropped(true);
                                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                    it.remove();
                                    changed = true;
                                    log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                        consumerGroup, mq);
                                }
                                break;
                            default:
                                break;
                        }
                    }
                }
            }
    
            List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
            for (MessageQueue mq : mqSet) {
                if (!this.processQueueTable.containsKey(mq)) {
                    if (isOrder && !this.lock(mq)) {
                        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                        continue;
                    }
    
                    this.removeDirtyOffset(mq);
                    ProcessQueue pq = new ProcessQueue();
                    //计算消息队列开始消费位置
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0) {
                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                        if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                        } else {
                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                        }
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                    }
                }
            }
    
            //马上执行拉请求
            this.dispatchPullRequest(pullRequestList);
    
            return changed;
        }
    
     @Override
        public void removeDirtyOffset(final MessageQueue mq) {
            this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
        }
    

    从上面看,processQueueTable记录的是当前消费者负载的消息队列缓存表,该方法里面的mqSet记录的的是当前消费者经过负载分配后的消息队列集合。如果processQueueTable中的消息队列在mqSet中不存在,说明该消息队列已经被分配给其他消费者,所以需要暂停该消息队列消息的消费,通过** pq.setDropped(true);该语句即可。
    然后通过
    removeUnnecessaryMessageQueue**方法判断是否该mq从缓存中移除。

    之后,开始遍历本次负载分配给该消费者的消息队列结合mqSet。如果processQueueTable中没有包含该消息队列,表示这是本次新增加的消息队列。
    首先从内存中移除该消息队列的消息进度,然后调用computePullFromWhere从磁盘中读取该消息队列的消费进度,创建一个PullRequest对象。

    public long computePullFromWhere(MessageQueue mq) {
            long result = -1;
            final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
            final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
            switch (consumeFromWhere) {
                case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
                case CONSUME_FROM_MIN_OFFSET:
                case CONSUME_FROM_MAX_OFFSET:
                case CONSUME_FROM_LAST_OFFSET: {
                    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (lastOffset >= 0) {
                        result = lastOffset;
                    }
                    // First start,no offset
                    else if (-1 == lastOffset) {
                        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            result = 0L;
                        } else {
                            try {
                                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                            } catch (MQClientException e) {
                                result = -1;
                            }
                        }
                    } else {
                        result = -1;
                    }
                    break;
                }
                case CONSUME_FROM_FIRST_OFFSET: {
                    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (lastOffset >= 0) {
                        result = lastOffset;
                    } else if (-1 == lastOffset) {
                        result = 0L;
                    } else {
                        result = -1;
                    }
                    break;
                }
                case CONSUME_FROM_TIMESTAMP: {
                    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (lastOffset >= 0) {
                        result = lastOffset;
                    } else if (-1 == lastOffset) {
                        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            try {
                                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                            } catch (MQClientException e) {
                                result = -1;
                            }
                        } else {
                            try {
                                long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                                    UtilAll.YYYYMMDDHHMMSS).getTime();
                                result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                            } catch (MQClientException e) {
                                result = -1;
                            }
                        }
                    } else {
                        result = -1;
                    }
                    break;
                }
    
                default:
                    break;
            }
    
            return result;
        }
    
    

    从上面看出,主要有三种计算消息进度的方法,有些大同小异。

    • CONSUME_FROM_LAST_OFFSET:从队列最新偏移量开始消费
      首先从磁盘中获取该消息队列的消费进度,如果大于0,说明该消息队列已经被消费过了,下次消费从该位置继续消费。如果等于-1,说明是首次消费,则从该消息队列的最大偏移量开始消费,如果小于-1,则说明该消息进度文件中存储了错误的偏移量,返回-1。

    • CONSUME_FROM_FIRST_OFFSET: 从头开始消费
      首先从磁盘中获取该消息队列的消费进度,如果大于0,说明该消息队列已经被消费过了,下次消费从该位置继续消费。如果等于-1,说明是首次消费,则返回0,从头开始消费,如果小于-1,则说明该消息进度文件中存储了错误的偏移量,返回-1。

    • CONSUME_FROM_TIMESTAMP: 从消费者启动的时间戳对应的消费进度开始消费

    首先从磁盘中获取该消息队列的消费进度,如果大于0,说明该消息队列已经被消费过了,下次消费从该位置继续消费。如果等于-1,尝试去操作消息存储时间戳作为消费者启动的时间戳,如果能找到则返回找到的偏移量,找不到则返回0;如果小于-1,则说明该消息进度文件中存储了错误的偏移量,返回-1。

    this.dispatchPullRequest(pullRequestList);
    
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
            for (PullRequest pullRequest : pullRequestList) {
                //马上执行拉请求
                this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
                log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
            }
        }
    

    在该方法的最后,会调用dispatchPullRequest方法,将PullRequest加入到PullMessageService中,以唤醒PullMessageService线程,进行消息拉取。

    到这里,消费者负载均衡方面就结束了。

    相关文章

      网友评论

          本文标题:RocketMQ消费者消息队列负载均衡

          本文链接:https://www.haomeiwen.com/subject/uemawctx.html