美文网首页
RocketMQ源码解读——同一消费组下不同消费者订阅关系不同时

RocketMQ源码解读——同一消费组下不同消费者订阅关系不同时

作者: Zal哥哥 | 来源:发表于2021-01-05 14:39 被阅读0次

    RocketMQ源码解读——同一消费组下不同消费者订阅关系不同时
    @(rocketmq源码解读)

    先解释一下题目,我们假设有一个Producer和两个Consumer,Producer向TOPICA和TOPICB发送消息,两个Consumer分别订阅两个topic。我们看下这时候会出现的问题,以及根据源码分析一下为什么出现问题。

    现象
    现象其实还是比较隐蔽的,broker上会打印:the consumer's subscription not exist,group ...的日志(Consumer端也会打印类似的日志)。

    还会有一些subscription changed, group: ...类似的日志,并且如果仔细的话还会发现,其中一个消费者消费消息时,另外一个就不会消费。

    源码分析
    我们看一下为什么会导致这样的问题,一开始生看或者debug都是很难下手,这时候可能就需要使用必杀技(一般不外传那种)——问。

    问天问地,谷歌百度必应。我直接问了一个大神——芋艿。大神说这种情况会出问题,具体原因他也记不清了,导致这种现象的问题应该是消费关系不停地相互覆盖。

    好了,听到这句话我们就有入口了,至少知道应该从Broker上找起。

    顺藤摸瓜找到了原因,下面一起看一下源码。

    首先我们知道,消费者的两种实现(推和拉)中都维护一个MQClientInstance,这个类非常重要,在启动消费者的时候,都会去启动这个类,我们看下启动的代码,其中有这么一部分:

    // Start various schedule tasks
    this.startScheduledTask();
    复制代码
    这里启动了好多定时任务,我们追进去看一下:

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            //定时发送心跳
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
    

    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    复制代码
    这里我们看到,消费者会定时发送心跳给Broker,我们继续追进去,最后找到sendHeartbeatToAllBroker方法:

    //给所有的broker发送心跳
    if (!this.brokerAddrTable.isEmpty()) {
    long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
    while (it.hasNext()) {
    Entry<String, HashMap<Long, String>> entry = it.next();
    String brokerName = entry.getKey();
    HashMap<Long, String> oneTable = entry.getValue();
    if (oneTable != null) {
    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
    Long id = entry1.getKey();
    String addr = entry1.getValue();
    if (addr != null) {
    if (consumerEmpty) {
    if (id != MixAll.MASTER_ID)
    continue;
    }

                    try {
                        //真正发送心跳的部分
                        int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                        if (!this.brokerVersionTable.containsKey(brokerName)) {
                            this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                        }
                        this.brokerVersionTable.get(brokerName).put(addr, version);
                        if (times % 20 == 0) {
                            log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                            log.info(heartbeatData.toString());
                        }
                    } catch (Exception e) {
                        if (this.isBrokerInNameServer(addr)) {
                            log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                        } else {
                            log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                id, addr);
                        }
                    }
                }
            }
        }
    }
    

    }
    复制代码
    这里会向所有的Broker发送心跳,我们根据我们的例子,这时候Broker是一台,我们再去Broker上看一下Broker如何处理心跳消息,我们根据发送的是HEART_BEAT类型的消息,可以在Broker上看到,这类消息使用ClientManageProcessor处理,我们看下处理心跳的部分(heartBeat方法):

    //循环所有发送过来的数据
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
    //根据消费组的名字获取broker上记录的消费消息
    SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
    boolean isNotifyConsumerIdsChangedEnable = true;
    if (null != subscriptionGroupConfig) {
    isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
    int topicSysFlag = 0;
    if (data.isUnitMode()) {
    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }
    String newTopic = MixAll.getRetryTopic(data.getGroupName());
    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
    newTopic,
    subscriptionGroupConfig.getRetryQueueNums(),
    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    }
    //注册消费者
    boolean changed = this.brokerController.getConsumerManager().registerConsumer(
    data.getGroupName(),
    clientChannelInfo,
    data.getConsumeType(),
    data.getMessageModel(),
    data.getConsumeFromWhere(),
    data.getSubscriptionDataSet(),
    isNotifyConsumerIdsChangedEnable
    );

    if (changed) {
        log.info("registerConsumer info changed {} {}",
            data.toString(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel())
        );
    }
    

    }
    复制代码
    我们可以看到,broker会根据consumer放过来的消息,获取自己这边记录的消费者订阅的信息,注意,获取时是按照消费组获取的,我们看下registerConsumer:

    //根据消费组获取消费者信息
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
    ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
    //注意这里,这里consumerTable的键就是group
    ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    consumerGroupInfo = prev != null ? prev : tmp;
    }
    boolean r1 =
    consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
    consumeFromWhere);
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    if (r1 || r2) {
    if (isNotifyConsumerIdsChangedEnable) {
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    }
    }
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
    复制代码
    我们注意ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);这里,这句话告诉我们consumerTable中存放的消费者信息是按照消费组来的,那么一个组的消费信息如果不一样,按照我们的例子中,则订阅了TOPICA的消费者心跳信息告诉Broker:我们组订阅的是TOPICA!然后Broker就记录下来了。过了一会订阅了TOPICB的消费者心跳信息高速Broker:我们订阅的是TOPICB!

    这里就导致了订阅消息相互覆盖,那么拉取消息时,肯定有一个消费者没法拉到消息,因为Broker上查询不到订阅信息。

    至此我们就知道了导致上述现象的原因。

    https://blog.csdn.net/weixin_33922670/article/details/87988121?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-15&spm=1001.2101.3001.4242

    相关文章

      网友评论

          本文标题:RocketMQ源码解读——同一消费组下不同消费者订阅关系不同时

          本文链接:https://www.haomeiwen.com/subject/urrhoktx.html