(8) The Consumer's Load-Balancing Feature

Author: guessguess | Published: 2021-07-08 17:34

The previous article covered how a consumer pulls messages. The entry point is shown below.

public class PullMessageService extends ServiceThread {
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
    }
}

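A single thread loops, taking pull requests from pullRequestQueue (the queue of pull requests) and processing them. When the queue is empty, take() blocks until a request arrives. So where do the entries in this queue come from? That involves another important consumer feature: load balancing (rebalancing).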

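First, what does rebalancing do? From the consumer's point of view, it periodically refreshes the set of message queues the consumer is responsible for: at one moment this consumer may be consuming queue A, and at the next it may be consuming queue B. Why is it designed this way?
Looking at how rebalancing is implemented:
Through the topics it subscribes to, the consumer obtains from the NameServer (the registry) all message queues of each topic, together with the route data (the real address of the broker behind each queue).
The allocation strategy then assigns a subset of the topic's queues to this consumer. The random broker selection that appears later in the code is only used to fetch the list of consumer IDs in the group, not to choose queues. Queues the consumer held before may be handed to other consumers; whether it keeps its previous queues depends on the strategy.

When the broker behind a consumer's queues fails, retrying forever is pointless. With a suitable allocation strategy, rebalancing can give the consumer message queues that are still available, which avoids wasting resources.
In addition, the strategy works on all message queues of the topic and all consumers in the same consumer group. With an averaging strategy, each consumer's share shrinks as consumers are added; conversely, when queues are added, the extra load is spread evenly instead of landing on a single consumer.
In short, this is load balancing; the exact behavior depends on the strategy.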
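To make the averaging idea concrete, here is a minimal sketch of how an even split could be computed. It is not RocketMQ's own class: the name AverageAllocationSketch, the use of plain strings for queues and consumer IDs, and the exact arithmetic are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of an averaging allocation; not RocketMQ's actual class. */
class AverageAllocationSketch {

    /**
     * Returns the slice of queues owned by currentCid when the sorted queues
     * are split as evenly as possible among the sorted consumer ids.
     */
    static List<String> allocate(String currentCid, List<String> queues, List<String> cids) {
        List<String> mqAll = new ArrayList<>(queues);
        List<String> cidAll = new ArrayList<>(cids);
        // Every consumer sorts both lists, so all of them work on the same view.
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer, or nothing to allocate
        }
        int base = mqAll.size() / cidAll.size();
        int remainder = mqAll.size() % cidAll.size();
        // The first `remainder` consumers take one extra queue each.
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        for (int i = 0; i < size; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = List.of("q0", "q1", "q2", "q3", "q4");
        // Two consumers share five queues.
        System.out.println(allocate("c1", queues, List.of("c1", "c2")));
        System.out.println(allocate("c2", queues, List.of("c1", "c2")));
        // A third consumer joins: c2's share moves and shrinks.
        System.out.println(allocate("c2", queues, List.of("c1", "c2", "c3")));
    }
}
```

With five queues, two consumers get three and two queues; once a third consumer joins, nobody holds more than two. This is the "pressure goes down as consumers are added" effect described above.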

Let's look at how rebalancing is implemented.

public class MQClientInstance {
    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
}

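The code above is straightforward: it is the client startup logic, which also starts the rebalance service. RebalanceService is structured like PullMessageService: both extend ServiceThread, which implements Runnable.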

Structure of RebalanceService

public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }
}

As the code shows, it is also a Runnable, so starting it is straightforward.

Starting RebalanceService

The parent class already implements this.

public abstract class ServiceThread implements Runnable {
    private Thread thread;
    public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
}

How rebalancing is performed
Back to the run method of RebalanceService: the code shows that rebalancing is carried out through the client instance.

public class RebalanceService extends ServiceThread {
    private final MQClientInstance mqClientFactory;
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            // Rebalancing is delegated to the client instance
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
}

public class MQClientInstance {
    // Kept by the client instance: maps each consumer group to its consumer implementation.
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    public void doRebalance() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    // Continue into the implementation, here DefaultMQPushConsumerImpl
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
}

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    @Override
    public void doRebalance() {
        if (!this.pause) {
            // Keep going; only the implementation matters here
            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
    }
}

public abstract class RebalanceImpl {
    // Holds the consumer's subscription data
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
        new ConcurrentHashMap<String, SubscriptionData>();
    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // Rebalance each subscribed topic
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
        this.truncateMessageQueueNotMyTopic();
    }
}

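An MQ client has exactly one MQClientInstance, which keeps track of the consumer registered for each group. Within one client there can be only one consumer per group.
Each consumer has a consumer implementation instance, which in turn holds its rebalance implementation (RebalanceImpl). The rebalance implementation reallocates queues based on the subscription data it maintains.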
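The relationship can be sketched with a small stand-in. Everything here is hypothetical (ClientInstanceSketch, ConsumerInner, and the registerConsumer signature are illustrative names, not the RocketMQ API); it only mirrors two points visible in the code above: one consumer per group, and a failure in one consumer's rebalance does not stop the others.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the client instance and its consumer table. */
class ClientInstanceSketch {

    /** Minimal stand-in for the per-consumer implementation. */
    interface ConsumerInner {
        void doRebalance();
    }

    // group name -> the single consumer registered for that group
    private final ConcurrentMap<String, ConsumerInner> consumerTable = new ConcurrentHashMap<>();

    /** Returns false if the group already has a consumer in this client. */
    boolean registerConsumer(String group, ConsumerInner consumer) {
        return consumerTable.putIfAbsent(group, consumer) == null;
    }

    /** Rebalances every consumer; returns how many completed without error. */
    int doRebalance() {
        int succeeded = 0;
        for (ConsumerInner consumer : consumerTable.values()) {
            try {
                consumer.doRebalance();
                succeeded++;
            } catch (RuntimeException e) {
                // One consumer failing must not block the remaining ones.
                System.err.println("doRebalance exception: " + e.getMessage());
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        ClientInstanceSketch client = new ClientInstanceSketch();
        System.out.println(client.registerConsumer("groupA", () -> System.out.println("rebalance groupA")));
        // A second consumer for the same group is rejected.
        System.out.println(client.registerConsumer("groupA", () -> System.out.println("never runs")));
        client.doRebalance();
    }
}
```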

Implementation details of RebalanceService

There are two main modes: clustering and broadcasting.

Clustering

public abstract class RebalanceImpl {
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING:
                // Broadcasting mode, omitted here
                break;
            case CLUSTERING: {
                // ---------------------------------------------------------------
                // 1. Get the queues and the IDs of the consumers in the same group
                // All queues of this topic, cached from the route data fetched from the NameServer
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // Pick a random broker of this topic and ask it for all consumer IDs in the group.
                // Consumers send heartbeats to every broker of the topic, which registers them there.
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }
                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }
                // ---------------------------------------------------------------
                // 2. Reallocate the queues
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // -----------------------------------------------------------
                    // 3. Check whether the new allocation differs from the queues consumed so far.
                    //    If it does, clean up the related caches and synchronize the cached data.
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                    // -----------------------------------------------------------
                }
                break;
            }
        }
    }
}

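There are roughly three steps.
1. Get the queues and the IDs of the consumers in the same group
2. Reallocate the queues
3. Update the queue cache

1. Get the queues and the IDs of the consumers in the same group

First, how this step works.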

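                // Look up the queue info in the cache by topic. topicSubscribeInfoTable is filled when the
                // consumer starts: the topic's route data is pulled from the NameServer and cached here.
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // Get the IDs of all consumers in the same group. How are they obtained?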
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

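Queue synchronization: keeping the rebalance queue cache up to date

The topic's message queues are kept current by a scheduled task that is set up at startup. It periodically refreshes the route data and, along the way, the subscription data. The code is shown below. The implementation is not the focus here, so for now just note where the code lives and trace from there whether it ends up updating topicSubscribeInfoTable.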

public class MQClientInstance {
    private void startScheduledTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            // part of the code omitted
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
        // part of the code omitted
    }
}

Getting the IDs of all consumers in the same group

                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

Here is the implementation behind that line.

public class MQClientInstance {
    public List<String> findConsumerIdList(final String topic, final String group) {
        // Randomly pick a broker and use its master address
        String brokerAddr = this.findBrokerAddrByTopic(topic);
        if (null == brokerAddr) {
            // Not in the cache: pull the route data from the NameServer again and store it in the cache
            this.updateTopicRouteInfoFromNameServer(topic);
            // Read the address from the cache again
            brokerAddr = this.findBrokerAddrByTopic(topic);
        }

        if (null != brokerAddr) {
            // Ask the broker for all consumer IDs in the group and return them.
            try {
                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
            } catch (Exception e) {
                log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
            }
        }

        return null;
    }

    public String findBrokerAddrByTopic(final String topic) {
        // Randomly pick one of the topic's brokers and return its master address
        TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
        if (topicRouteData != null) {
            List<BrokerData> brokers = topicRouteData.getBrokerDatas();
            if (!brokers.isEmpty()) {
                int index = random.nextInt(brokers.size());
                BrokerData bd = brokers.get(index % brokers.size());
                return bd.selectBrokerAddr();
            }
        }

        return null;
    }
}

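Because every consumer periodically sends heartbeats to all brokers of the topic, registering itself along the way, every broker of the topic holds the same set of consumer IDs (cids) for the group. That is why it is safe to query any one broker at random.
The code that periodically sends heartbeats is as follows.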

public class MQClientInstance {
    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
            return;
        }

        if (!this.brokerAddrTable.isEmpty()) {
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, HashMap<Long, String>> entry = it.next();
                String brokerName = entry.getKey();
                HashMap<Long, String> oneTable = entry.getValue();
                if (oneTable != null) {
                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) {
                            if (consumerEmpty) {
                                if (id != MixAll.MASTER_ID)
                                    continue;
                            }

                            try {
                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                if (!this.brokerVersionTable.containsKey(brokerName)) {
                                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                }
                                this.brokerVersionTable.get(brokerName).put(addr, version);
                                if (times % 20 == 0) {
                                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                    log.info(heartbeatData.toString());
                                }
                            } catch (Exception e) {
                                if (this.isBrokerInNameServer(addr)) {
                                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                                } else {
                                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                        id, addr, e);
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

2. Reallocate the queues

The allocation strategies are fairly involved, so they are not covered in detail here.

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }
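One thing the call above does show is that the strategy is pluggable, and that each consumer runs it on its own with its own client ID against the same sorted lists, so no coordination between consumers is needed. The sketch below illustrates that with a simple round-robin strategy. The interface QueueAllocationStrategy and the class names are hypothetical simplifications (queues are plain integers), not the real AllocateMessageQueueStrategy API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a pluggable allocation strategy. */
interface QueueAllocationStrategy {
    List<Integer> allocate(String currentCid, List<Integer> sortedQueueIds, List<String> sortedCids);
}

/** Deals queues out like cards: queue i goes to consumer i % n. */
class RoundRobinStrategy implements QueueAllocationStrategy {
    @Override
    public List<Integer> allocate(String currentCid, List<Integer> sortedQueueIds, List<String> sortedCids) {
        List<Integer> result = new ArrayList<>();
        int index = sortedCids.indexOf(currentCid);
        if (index < 0) {
            return result; // this consumer is not part of the group view
        }
        for (int i = index; i < sortedQueueIds.size(); i += sortedCids.size()) {
            result.add(sortedQueueIds.get(i));
        }
        return result;
    }
}

class StrategyDemo {
    /** Runs the strategy once per consumer and checks that every queue has exactly one owner. */
    static boolean isPartition(QueueAllocationStrategy strategy, List<Integer> queues, List<String> cids) {
        List<Integer> seen = new ArrayList<>();
        for (String cid : cids) {
            seen.addAll(strategy.allocate(cid, queues, cids));
        }
        // Same count plus full coverage means no queue is missing or owned twice.
        return seen.size() == queues.size() && seen.containsAll(queues);
    }

    public static void main(String[] args) {
        QueueAllocationStrategy strategy = new RoundRobinStrategy();
        List<Integer> queues = List.of(0, 1, 2, 3, 4);
        List<String> cids = List.of("a", "b");
        System.out.println(strategy.allocate("a", queues, cids));
        System.out.println(strategy.allocate("b", queues, cids));
        System.out.println(isPartition(strategy, queues, cids));
    }
}
```

Because the inputs are sorted the same way everywhere, each consumer's independent result is one piece of the same partition. If two consumers saw the lists in different orders, their results could overlap.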

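3. Update the queue cache

After allocation, the consumer checks whether the result differs from the message queues it currently maintains. If it does, the stale queue information must be cleaned up, and any work tied to the old queues must stop.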

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }

Let's look at how the queues are updated.

public abstract class RebalanceImpl {
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;
        // ---------------------------------------------------------------
        // 1. Remove old queues
        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        // Walk processQueueTable and compare its entries with the newly allocated queues to clean up stale ones
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();
            if (mq.getTopic().equals(topic)) {
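                // The new allocation does not contain this queue, so another consumer now owns it
                if (!mqSet.contains(mq)) {
                    // Mark the ProcessQueue as dropped
                    pq.setDropped(true);
                    // Clean up cached state and remove the queue from the table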
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) {
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                    consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }
        // ---------------------------------------------------------------
        // 2. Build pull requests for the newly allocated message queues
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }
                // Remove the stale offset
                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                // Compute the offset to start pulling from for this queue
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // ---------------------------------------------------------------
        // 3. Dispatch the new pull requests
        this.dispatchPullRequest(pullRequestList);
        // ---------------------------------------------------------------
        return changed;
    }
}
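Stripped of locking, offsets, and expiry handling, the method above boils down to a set difference between what the consumer holds and what it was just allocated. A minimal sketch of that core, with hypothetical names (QueueDiffSketch, toRemove, toAdd) and plain strings standing in for MessageQueue:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the diff at the heart of the cache update. */
class QueueDiffSketch {

    /** Queues currently held that are absent from the new allocation. */
    static Set<String> toRemove(Set<String> current, Set<String> allocated) {
        Set<String> removed = new HashSet<>(current);
        removed.removeAll(allocated);
        return removed;
    }

    /** Newly allocated queues that are not held yet; each one needs a pull request. */
    static Set<String> toAdd(Set<String> current, Set<String> allocated) {
        Set<String> added = new HashSet<>(allocated);
        added.removeAll(current);
        return added;
    }

    /** True when the allocation differs from what is currently held. */
    static boolean changed(Set<String> current, Set<String> allocated) {
        return !toRemove(current, allocated).isEmpty() || !toAdd(current, allocated).isEmpty();
    }

    public static void main(String[] args) {
        Set<String> current = Set.of("q0", "q1");
        Set<String> allocated = Set.of("q1", "q2");
        System.out.println("remove: " + toRemove(current, allocated));
        System.out.println("add:    " + toAdd(current, allocated));
        System.out.println("changed: " + changed(current, allocated));
    }
}
```

In the real method the changed flag is stricter than this: a queue only counts as removed once removeUnnecessaryMessageQueue succeeds, and only counts as added once a valid start offset has been computed.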

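3.1 Remove old queues

From the code above, an old queue is removed when the newly allocated set no longer contains it.
So the question is how the old queue is removed: besides taking it out of processQueueTable, what else has to be done?
As mentioned earlier, in clustering mode offsets are stored on the broker. So when an old queue is removed, should its consumption progress be synchronized back to the broker? Otherwise, how would the consumer that takes over the queue know how far it has been consumed and where to start?
See the code below.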

public class RebalancePushImpl extends RebalanceImpl {
    @Override
    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        // Persist the offset
        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
        // Remove it from the local cache
        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
        // part of the code omitted
        return true;
    }
}

Implementation of persisting the offset and of removing the cached offset

public class RemoteBrokerOffsetStore implements OffsetStore {
    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);

            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }

    public void removeOffset(MessageQueue mq) {
        if (mq != null) {
            this.offsetTable.remove(mq);
            log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
                offsetTable.size());
        }
    }
}
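The hand-over can be pictured with an in-memory model. This is only a sketch under stated assumptions: the shared map stands in for the broker's offset storage, queues are plain strings, and OffsetHandoverSketch with its methods release and readFromBroker is hypothetical, not part of RocketMQ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of "persist, then drop the local offset" on queue hand-over. */
class OffsetHandoverSketch {
    /** Stands in for the broker-side offset storage shared by the group. */
    private final Map<String, Long> brokerOffsets;

    /** Local offset cache of this consumer. */
    final Map<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    OffsetHandoverSketch(Map<String, Long> brokerOffsets) {
        this.brokerOffsets = brokerOffsets;
    }

    void updateOffset(String mq, long offset) {
        offsetTable.computeIfAbsent(mq, k -> new AtomicLong()).set(offset);
    }

    /** Pushes the locally cached offset to the (simulated) broker. */
    void persist(String mq) {
        AtomicLong offset = offsetTable.get(mq);
        if (offset != null) {
            brokerOffsets.put(mq, offset.get());
        }
    }

    void removeOffset(String mq) {
        offsetTable.remove(mq);
    }

    /** What a consumer does when rebalancing takes a queue away from it. */
    void release(String mq) {
        persist(mq);      // first make the progress visible to the next owner
        removeOffset(mq); // then forget it locally
    }

    /** What the next owner sees; -1 means no progress has been stored. */
    long readFromBroker(String mq) {
        return brokerOffsets.getOrDefault(mq, -1L);
    }

    public static void main(String[] args) {
        Map<String, Long> broker = new ConcurrentHashMap<>();
        OffsetHandoverSketch oldOwner = new OffsetHandoverSketch(broker);
        OffsetHandoverSketch newOwner = new OffsetHandoverSketch(broker);
        oldOwner.updateOffset("TopicA-q0", 42L);
        oldOwner.release("TopicA-q0");
        System.out.println(newOwner.readFromBroker("TopicA-q0"));
    }
}
```

The order matters: if the local entry were removed before persisting, the progress would be lost and the next owner could re-consume messages.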

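3.2 Build pull requests for the newly allocated message queues

The logic of this step is clear from the code above, so it is skipped here.

3.3 Dispatch the new pull requests

This step is simple: the newly created pull requests are submitted for execution.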

public class RebalancePushImpl extends RebalanceImpl {
    @Override
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }
}

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void executePullRequestImmediately(final PullRequest pullRequest) {
        this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
    }
}
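This closes the loop with the question at the start of the article: rebalancing is the producer side of pullRequestQueue, and the pull thread is the consumer side. A minimal sketch of that hand-off, assuming the pull service simply enqueues the request (PullQueueHandoffSketch and its string "requests" are hypothetical):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the hand-off between rebalancing and the pull thread. */
class PullQueueHandoffSketch {
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();

    /** Producer side: called for each newly assigned queue. */
    void executePullRequestImmediately(String pullRequest) {
        // offer() never blocks on an unbounded LinkedBlockingQueue
        pullRequestQueue.offer(pullRequest);
    }

    /** Mirrors dispatchPullRequest: enqueue every request in order. */
    void dispatch(List<String> pullRequests) {
        for (String pullRequest : pullRequests) {
            executePullRequestImmediately(pullRequest);
        }
    }

    /** Consumer side: returns the next request, or null if none arrives in time. */
    String nextRequest(long timeoutMillis) {
        try {
            return pullRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        PullQueueHandoffSketch service = new PullQueueHandoffSketch();
        // Before any rebalance there is nothing to pull.
        System.out.println(service.nextRequest(10));
        service.dispatch(List.of("q0@offset=0", "q1@offset=100"));
        System.out.println(service.nextRequest(10));
        System.out.println(service.nextRequest(10));
    }
}
```

The sketch uses a timed poll so it can run to completion; the real loop shown at the top of the article uses take(), which waits indefinitely.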

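Broadcasting mode is comparatively simple: the consumer only needs to check whether its cached message queues have changed and whether old queues should be removed. It does not need to synchronize queue offsets to the broker, because in broadcasting mode only the client itself needs to know its offsets.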

Article link: https://www.haomeiwen.com/subject/ikizultx.html