美文网首页rocketMq理论与实践
RocketMQ consumer Rebalance过程

RocketMQ consumer Rebalance过程

作者: 晴天哥_王志 | 来源:发表于2020-05-03 15:29 被阅读0次

    系列

    开篇

    • 这个系列的主要目的是介绍RocketMq consumer的原理和用法,在这个系列当中会介绍 consumer的启动流程、consumer Rebalance的过程、consumer注册过程、consumer 并行消费过程、consumer 有序消费过程。

    • 这篇文章介绍consumer Rebalance的过程,介绍consumer的重平衡过程,Rebalance过程按照订阅的topics依次进行重平衡过程。

    consumer example

    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            // 1、创建DefaultMQPushConsumer对象
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
            // 2、设置消费位移
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 3、订阅topic
            consumer.subscribe("TopicTest", "*");
            // 4、设置消费回调
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 5、启动DefaultMQPushConsumer
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    
    • 介绍rocketmq consumer的用法,按照下列步骤进行。
    • 创建DefaultMQPushConsumer对象。
    • 设置consumer的消费位移。
    • 设置consumer的订阅topic。
    • 设置consumer的消费回调。
    • 启动consumer。

    订阅过程

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    
        private final InternalLogger log = ClientLogger.getLog();
        private final DefaultMQPushConsumer defaultMQPushConsumer;
    
        // 核心的重平衡过程RebalanceImpl
        private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    
        private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
        private final long consumerStartTimestamp = System.currentTimeMillis();
        private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
        private final RPCHook rpcHook;
        private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
        private MQClientInstance mQClientFactory;
        private PullAPIWrapper pullAPIWrapper;
        private volatile boolean pause = false;
        private boolean consumeOrderly = false;
        private MessageListener messageListenerInner;
        private OffsetStore offsetStore;
        private ConsumeMessageService consumeMessageService;
        private long queueFlowControlTimes = 0;
        private long queueMaxSpanFlowControlTimes = 0;
    
    
        public void subscribe(String topic, String subExpression) throws MQClientException {
            try {
                // 1、构建订阅对象SubscriptionData
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    topic, subExpression);
    
                // 2、注册topic和对应的订阅关系到RebalanceImpl当中
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
    
                if (this.mQClientFactory != null) {
                    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                }
            } catch (Exception e) {
                throw new MQClientException("subscription exception", e);
            }
        }
    }
    
    
    public abstract class RebalanceImpl {
        protected static final InternalLogger log = ClientLogger.getLog();
        protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
        protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
            new ConcurrentHashMap<String, Set<MessageQueue>>();
        // 保存订阅关系
        protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
            new ConcurrentHashMap<String, SubscriptionData>();
        protected String consumerGroup;
        protected MessageModel messageModel;
        protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
        protected MQClientInstance mQClientFactory;
    
        public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
            return subscriptionInner;
        }
    }
    
    • consumer的订阅过程主要步骤如上源码所示。
    • 1、构建订阅对象SubscriptionData。
    • 2、注册topic和对应的订阅关系到RebalanceImpl当中。
    • consumer的订阅信息以topic维度进行保存,保存在RebalanceImpl中。
    public class FilterAPI {
    
        public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
            String subString) throws Exception {
            SubscriptionData subscriptionData = new SubscriptionData();
            subscriptionData.setTopic(topic);
            subscriptionData.setSubString(subString);
    
            if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
                subscriptionData.setSubString(SubscriptionData.SUB_ALL);
            } else {
                String[] tags = subString.split("\\|\\|");
                if (tags.length > 0) {
                    for (String tag : tags) {
                        if (tag.length() > 0) {
                            String trimString = tag.trim();
                            if (trimString.length() > 0) {
                                subscriptionData.getTagsSet().add(trimString);
                                subscriptionData.getCodeSet().add(trimString.hashCode());
                            }
                        }
                    }
                } else {
                    throw new Exception("subString split error");
                }
            }
    
            return subscriptionData;
        }
    }
    
    
    public class SubscriptionData implements Comparable<SubscriptionData> {
        public final static String SUB_ALL = "*";
        private boolean classFilterMode = false;
        private String topic;
        private String subString;
        private Set<String> tagsSet = new HashSet<String>();
        private Set<Integer> codeSet = new HashSet<Integer>();
        private long subVersion = System.currentTimeMillis();
        private String expressionType = ExpressionType.TAG;
    
        @JSONField(serialize = false)
        private String filterClassSource;
    }
    
    • SubscriptionData的数据结构主要包含topic、subString、tags等信息。
    • FilterAPI#buildSubscriptionData负责提供构建方法,不传tags默认表示订阅所有tags。

    Rebalance过程

    RebalanceService

    public class RebalanceService extends ServiceThread {
        private static long waitInterval =
            Long.parseLong(System.getProperty(
                "rocketmq.client.rebalance.waitInterval", "20000"));
    
        private final InternalLogger log = ClientLogger.getLog();
        private final MQClientInstance mqClientFactory;
    
        public RebalanceService(MQClientInstance mqClientFactory) {
            this.mqClientFactory = mqClientFactory;
        }
    
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStopped()) {
                this.waitForRunning(waitInterval);
                this.mqClientFactory.doRebalance();
            }
    
            log.info(this.getServiceName() + " service end");
        }
    }
    
    • MQClientInstance#start过程中执行rebalanceService.start()启动重平衡服务
    • RebalanceService以20s的间隔执行mqClientFactory.doRebalance()调用MQClientInstance#doRebalance。

    MQClientInstance

    public class MQClientInstance {
    
        private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
        private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
        private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    
        public void doRebalance() {
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Throwable e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }
    
        public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
            if (null == group || null == consumer) {
                return false;
            }
    
            MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
            if (prev != null) {
                log.warn("the consumer group[" + group + "] exist already.");
                return false;
            }
    
            return true;
        }
    }
    
    • MQClientInstance的consumerTable保存以consumer group作为key,DefaultMQPushConsumerImpl作为value的KV数据结构。
    • MQClientInstance的consumerTable通过registerConsumer方法来注册consumer group和对应的DefaultMQPushConsumerImpl对象。
    • MQClientInstance#doRebalance遍历所有consumer group,依次调用DefaultMQPushConsumerImpl#doRebalance来实现consumer的重平衡过程。

    DefaultMQPushConsumerImpl

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    
        private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    
        @Override
        public void doRebalance() {
            if (!this.pause) {
                this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
            }
        }
    }
    
    RebalanceImpl
    • DefaultMQPushConsumerImpl包含RebalanceImpl对象。
    • RebalanceImpl的实现类包括RebalanceLitePullImpl、RebalancePullImpl、RebalancePushImpl。
    • 针对消费测我们关注的是RebalancePushImpl对象的方法,通用方法在父类RebalanceImpl当中

    RebalanceImpl

    public abstract class RebalanceImpl {
        // 保存topic和对应的订阅信息
        protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
            new ConcurrentHashMap<String, SubscriptionData>();
    
        public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
            return subscriptionInner;
        }
    
        public void doRebalance(final boolean isOrder) {
            Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
            if (subTable != null) {
                for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                    final String topic = entry.getKey();
                    try {
                        this.rebalanceByTopic(topic, isOrder);
                    } catch (Throwable e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("rebalanceByTopic Exception", e);
                        }
                    }
                }
            }
    
            this.truncateMessageQueueNotMyTopic();
        }
    
    
        private void rebalanceByTopic(final String topic, final boolean isOrder) {
            switch (messageModel) {
                case BROADCASTING: {
                    // 执行广播模式下的负载均衡
                    break;
                }
                case CLUSTERING: {
                    // 执行集群模式下的负载均衡
                    break;
                }
                default:
                    break;
            }
        }
    }
    
    • RebalanceImpl#subscriptionInner保存topic和对应的订阅信息。
    • 遍历subscriptionInner当中的所有topic及其对应的订阅信息依次执行rebalanceByTopic过程。
    • rebalanceByTopic过程区分 有序和无序 以及 广播和集群 的两两组合。
    • 暂时关注集群模式下rebalance过程。

    集群模式下rebalance过程

    public abstract class RebalanceImpl {
    
        private void rebalanceByTopic(final String topic, final boolean isOrder) {
            switch (messageModel) {
    
                case BROADCASTING: {
                    // 省略相关代码
                }
    
                case CLUSTERING: {
                    // 1、获取topic下所有的MessageQueue
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    // 2、获取topic下该consumerGroup下所有的consumer对象
                    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                    if (null == mqSet) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                        }
                    }
    
                    if (null == cidAll) {
                        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                    }
                    // 开始重新分配进行rebalance
                    if (mqSet != null && cidAll != null) {
                        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                        mqAll.addAll(mqSet);
                        // 3、针对mqAll和cidAll进行排序
                        Collections.sort(mqAll);
                        Collections.sort(cidAll);
    
                        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
                        List<MessageQueue> allocateResult = null;
                        try {
                            // 4、通过分配策略重新进行分配
                            allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                        } catch (Throwable e) {
                            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                e);
                            return;
                        }
    
                        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                        if (allocateResult != null) {
                            allocateResultSet.addAll(allocateResult);
                        }
                        // 5、根据分配结果执行真正的rebalance动作
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        if (changed) {
                            // 6、将rebalance的结果通知broker
                            this.messageQueueChanged(topic, mqSet, allocateResultSet);
                        }
                    }
                    break;
                }
                default:
                    break;
            }
        }
    }
    
    • 针对单个topic的整体rebalance操作如上图代码所示。
    • 1、获取topic下所有的MessageQueue。
    • 2、获取topic下该consumerGroup下所有的consumer的cid(如192.168.0.8@15958)。
    • 3、针对mqAll和cidAll进行排序,mqAll排序顺序按照先brokerName后brokerId,cidAll排序按照字符串排序。
    • 4、通过分配策略AllocateMessageQueueStrategy重新进行分配。
    • 5、根据分配结果执行真正的rebalance动作。
    • 6、将rebalance的结果通知broker。

    AllocateMessageQueueStrategy

    AllocateMessageQueueStrategy
    public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
        private final InternalLogger log = ClientLogger.getLog();
    
        @Override
        public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
            List<String> cidAll) {
            if (currentCID == null || currentCID.length() < 1) {
                throw new IllegalArgumentException("currentCID is empty");
            }
            if (mqAll == null || mqAll.isEmpty()) {
                throw new IllegalArgumentException("mqAll is null or mqAll empty");
            }
            if (cidAll == null || cidAll.isEmpty()) {
                throw new IllegalArgumentException("cidAll is null or cidAll empty");
            }
    
            List<MessageQueue> result = new ArrayList<MessageQueue>();
            if (!cidAll.contains(currentCID)) {
                log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                    consumerGroup,
                    currentCID,
                    cidAll);
                return result;
            }
    
            // 核心逻辑计算开始
    
            // 计算当前cid的下标
            int index = cidAll.indexOf(currentCID); 
            
            // 计算多余的模值
            int mod = mqAll.size() % cidAll.size(); 
    
            // 计算平均大小
            int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                    + 1 : mqAll.size() / cidAll.size());
            // 计算起始下标 
            int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; 
            // 计算范围大小
            int range = Math.min(averageSize, mqAll.size() - startIndex);
            // 组装结果
            for (int i = 0; i < range; i++) {
                result.add(mqAll.get((startIndex + i) % mqAll.size()));
            }
            return result;
        }
        // 核心逻辑计算结束
    
        @Override
        public String getName() {
            return "AVG";
        }
    }
    

    分配计算逻辑举例

    rocketMq的集群存在3个broker,分别是broker_a、broker_b、broker_c。
    
    rocketMq上存在名为topic_demo的topic,writeQueue为3,分布在3个broker。
    排序后的mqAll的大小为9,依次为
    [broker_a_0、broker_a_1、broker_a_2、
     broker_b_0、broker_b_1、broker_b_2、
     broker_c_0、broker_c_1、broker_c_2]
    
    rocketMq存在包含4个consumer的consumer_group,排序后cidAll依次为
    [192.168.0.6@15956、192.168.0.7@15957、192.168.0.8@15958、192.168.0.9@15959]
    
    192.168.0.6@15956 的分配MessageQueue结算过程
    index:0
    mod:9%4=1
    averageSize:9 / 4 + 1 = 3
    startIndex:0
    range:3
    messageQueue:[broker_a_0、broker_a_1、broker_a_2]
    
    
    192.168.0.6@15957 的分配MessageQueue结算过程
    index:1
    mod:9%4=1
    averageSize:9 / 4 = 2
    startIndex:3
    range:2
    messageQueue:[broker_b_0、broker_b_1]
    
    
    192.168.0.6@15958 的分配MessageQueue结算过程
    index:2
    mod:9%4=1
    averageSize:9 / 4 = 2
    startIndex:5
    range:2
    messageQueue:[broker_b_2、broker_c_0]
    
    
    192.168.0.6@15959 的分配MessageQueue结算过程
    index:3
    mod:9%4=1
    averageSize:9 / 4 = 2
    startIndex:7
    range:2
    messageQueue:[broker_c_1、broker_c_2]
    

    ConsumerIdList获取逻辑

    public class MQClientInstance {
    
        public List<String> findConsumerIdList(final String topic, final String group) {
            String brokerAddr = this.findBrokerAddrByTopic(topic);
            if (null == brokerAddr) {
                this.updateTopicRouteInfoFromNameServer(topic);
                brokerAddr = this.findBrokerAddrByTopic(topic);
            }
    
            if (null != brokerAddr) {
                try {
                    return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
                } catch (Exception e) {
                    log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
                }
            }
    
            return null;
        }
    
    
        public String findBrokerAddrByTopic(final String topic) {
            TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
            if (topicRouteData != null) {
                List<BrokerData> brokers = topicRouteData.getBrokerDatas();
                if (!brokers.isEmpty()) {
                    int index = random.nextInt(brokers.size());
                    BrokerData bd = brokers.get(index % brokers.size());
                    return bd.selectBrokerAddr();
                }
            }
    
            return null;
        }
    }
    
    • ConsumerIdList的获取是随机选择一台broker进行通信,从broker中获取该consumerGroup对应的consumers。
    • ConsumerIdList是从broker当中获取的,保存在broker当中,而非namesrv

    生效rebalance结果

    public class MQClientInstance {
    
        private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
            final boolean isOrder) {
            boolean changed = false;
    
            // 遍历已有的processQueueTable,删除不在此次rebalance结果里面的MessageQueue对应的processQueue。
            Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<MessageQueue, ProcessQueue> next = it.next();
                MessageQueue mq = next.getKey();
                ProcessQueue pq = next.getValue();
    
                if (mq.getTopic().equals(topic)) {
                    // 针对不在本次rebalance结果mqSet当中的情况,设置删除
                    if (!mqSet.contains(mq)) {
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                        }
                    } else if (pq.isPullExpired()) {
                        switch (this.consumeType()) {
                            case CONSUME_ACTIVELY:
                                break;
                            case CONSUME_PASSIVELY:
                                pq.setDropped(true);
                                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                    it.remove();
                                    changed = true;
                                    log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                        consumerGroup, mq);
                                }
                                break;
                            default:
                                break;
                        }
                    }
                }
            }
    
            // 针对本次新增的MessageQueue,创建ProcessQueue并进行添加
            List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
            for (MessageQueue mq : mqSet) {
                // 针对本次新增的MessageQueue创建对应ProcessQueue。
                if (!this.processQueueTable.containsKey(mq)) {
                    if (isOrder && !this.lock(mq)) {
                        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                        continue;
                    }
    
                    this.removeDirtyOffset(mq);
                    ProcessQueue pq = new ProcessQueue();
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0) {
                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                        if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                        } else {
                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                        }
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                    }
                }
            }
    
            this.dispatchPullRequest(pullRequestList);
    
            return changed;
        }
    }
    
    • 生效rebalance结果的过程阶段一:删除不负责的MessageQueue对应的ProcessQueue。
    • 生效rebalance结果的过程阶段二:添加新负责的MessageQueue对应的ProcessQueue。

    通知生效rebalance结果

    public class RebalancePushImpl extends RebalanceImpl {
    
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    
            SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
            long newVersion = System.currentTimeMillis();
            log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
            subscriptionData.setSubVersion(newVersion);
    
            int currentQueueCount = this.processQueueTable.size();
            if (currentQueueCount != 0) {
                int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
                if (pullThresholdForTopic != -1) {
                    int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
                    log.info("The pullThresholdForQueue is changed from {} to {}",
                        this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
                    this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
                }
    
                int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
                if (pullThresholdSizeForTopic != -1) {
                    int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
                    log.info("The pullThresholdSizeForQueue is changed from {} to {}",
                        this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
                    this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
                }
            }
    
            // notify broker
            this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
        }
    }
    
    public class MQClientInstance {
    
        public void sendHeartbeatToAllBrokerWithLock() {
            if (this.lockHeartbeat.tryLock()) {
                try {
                    this.sendHeartbeatToAllBroker();
                    this.uploadFilterClassSource();
                } catch (final Exception e) {
                    log.error("sendHeartbeatToAllBroker exception", e);
                } finally {
                    this.lockHeartbeat.unlock();
                }
            } else {
                log.warn("lock heartBeat, but failed.");
            }
        }
    
    
        private void sendHeartbeatToAllBroker() {
            final HeartbeatData heartbeatData = this.prepareHeartbeatData();
            final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
            final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    
            if (!this.brokerAddrTable.isEmpty()) {
                long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    
                // 遍历所有的brokerAddrTable依次通知
                Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, HashMap<Long, String>> entry = it.next();
                    String brokerName = entry.getKey();
                    HashMap<Long, String> oneTable = entry.getValue();
                    if (oneTable != null) {
                        for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                            Long id = entry1.getKey();
                            String addr = entry1.getValue();
                            if (addr != null) {
                                if (consumerEmpty) {
                                    if (id != MixAll.MASTER_ID)
                                        continue;
                                }
    
                                try {
                                    // 获取broker地址后发送消息
                                    int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                    if (!this.brokerVersionTable.containsKey(brokerName)) {
                                        this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                    }
                                    this.brokerVersionTable.get(brokerName).put(addr, version);
                                } catch (Exception e) {
    
                                }
                            }
                        }
                    }
                }
            }
        }
    
    
        private HeartbeatData prepareHeartbeatData() {
            HeartbeatData heartbeatData = new HeartbeatData();
    
            // clientID
            heartbeatData.setClientID(this.clientId);
    
            // Consumer
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    ConsumerData consumerData = new ConsumerData();
                    consumerData.setGroupName(impl.groupName());
                    consumerData.setConsumeType(impl.consumeType());
                    consumerData.setMessageModel(impl.messageModel());
                    consumerData.setConsumeFromWhere(impl.consumeFromWhere());
                    consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
                    consumerData.setUnitMode(impl.isUnitMode());
    
                    heartbeatData.getConsumerDataSet().add(consumerData);
                }
            }
    
            // Producer
            for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    ProducerData producerData = new ProducerData();
                    producerData.setGroupName(entry.getKey());
    
                    heartbeatData.getProducerDataSet().add(producerData);
                }
            }
    
            return heartbeatData;
        }
    }
    
    • consumer通过prepareHeartbeatData组装心跳数据,遍历所有的broker依次进行发送告知。

    相关文章

      网友评论

        本文标题:RocketMQ consumer Rebalance过程

        本文链接:https://www.haomeiwen.com/subject/iqbrghtx.html