RocketMQ consumer registration process

Author: 晴天哥_王志 | Published: 2020-05-03 17:28

    Introduction

    • This series introduces the principles and usage of the RocketMQ consumer. It covers the consumer startup flow, the consumer rebalance process, the consumer registration process, concurrent consumption, and ordered consumption.

    • This article covers consumer registration and analyzes how a consumer registers itself with the broker.

    Consumer registration flow

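    The registration flow has two core steps: 1. the consumer notifies the broker so that it gets registered; 2. the broker notifies the consumers in the group to rebalance.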

    Consumer registration source code analysis

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    
        public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                   
                    this.serviceState = ServiceState.START_FAILED;
    
                    this.checkConfig();
                    // Internally adds the subscription for the %RETRY%consumer_group retry topic (CLUSTERING mode)
                    this.copySubscription();
    
                    if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }
    
                    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    
                    this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                    this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    
                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                        this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                    }
                    this.offsetStore.load();
    
                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                        this.consumeOrderly = true;
                        this.consumeMessageService =
                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                        this.consumeOrderly = false;
                        this.consumeMessageService =
                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    }
    
                    this.consumeMessageService.start();
                    
                    // Register the consumer group and its DefaultMQPushConsumerImpl in MQClientInstance#consumerTable
                    boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        this.consumeMessageService.shutdown();
                        throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
    
                    mQClientFactory.start();
                    log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
    
            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
            this.mQClientFactory.checkClientInBroker();
            // Notify all brokers via sendHeartbeatToAllBrokerWithLock
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            this.mQClientFactory.rebalanceImmediately();
        }
    
    
        private void copySubscription() throws MQClientException {
            try {
                Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
                if (sub != null) {
                    for (final Map.Entry<String, String> entry : sub.entrySet()) {
                        final String topic = entry.getKey();
                        final String subString = entry.getValue();
                        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                            topic, subString);
                        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                    }
                }
    
                if (null == this.messageListenerInner) {
                    this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
                }
                // In CLUSTERING mode, add the SubscriptionData for the group's retry topic
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        break;
                    case CLUSTERING:
                        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                            retryTopic, SubscriptionData.SUB_ALL);
                        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                throw new MQClientException("subscription exception", e);
            }
        }
    }
    
    • During startup, DefaultMQPushConsumerImpl registers the consumer group and its DefaultMQPushConsumerImpl instance in MQClientInstance#consumerTable.
    • sendHeartbeatToAllBrokerWithLock then sends the consumer's registration data to all brokers.
    • In CLUSTERING mode, copySubscription also adds the SubscriptionData that subscribes to the group's retry topic.
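
To make the effect of copySubscription concrete, here is a minimal, self-contained sketch in plain JDK code with no RocketMQ dependency. The class RetrySubscriptionSketch and its method names are hypothetical; only the "%RETRY%" prefix and the subscribe-all expression "*" mirror the RocketMQ constants referenced in the listing above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the subscription bookkeeping done by copySubscription().
class RetrySubscriptionSketch {
    static final String RETRY_PREFIX = "%RETRY%"; // prefix that MixAll.getRetryTopic prepends
    static final String SUB_ALL = "*";            // same value as SubscriptionData.SUB_ALL

    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Returns topic -> filter expression; the retry topic is added only in clustering mode.
    static Map<String, String> buildSubscriptions(String group,
                                                  Map<String, String> userSubs,
                                                  boolean clustering) {
        Map<String, String> result = new LinkedHashMap<>(userSubs);
        if (clustering) {
            result.put(retryTopic(group), SUB_ALL);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> subs = new LinkedHashMap<>();
        subs.put("TopicTest", "TagA || TagB");
        System.out.println(buildSubscriptions("order_group", subs, true));
        System.out.println(buildSubscriptions("order_group", subs, false));
    }
}
```

In broadcasting mode the map contains only the user's own topics, which matches the empty BROADCASTING branch in copySubscription.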

    MQClientInstance sends the heartbeat registration data

    public class MQClientInstance {
    
        private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
        private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
        private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
        private final Lock lockHeartbeat = new ReentrantLock();
    
        public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
            if (null == group || null == consumer) {
                return false;
            }
    
            MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
            if (prev != null) {
                log.warn("the consumer group[" + group + "] exist already.");
                return false;
            }
    
            return true;
        }
    
        public void sendHeartbeatToAllBrokerWithLock() {
            if (this.lockHeartbeat.tryLock()) {
                try {
                    this.sendHeartbeatToAllBroker();
                    this.uploadFilterClassSource();
                } catch (final Exception e) {
                    log.error("sendHeartbeatToAllBroker exception", e);
                } finally {
                    this.lockHeartbeat.unlock();
                }
            } else {
                log.warn("lock heartBeat, but failed.");
            }
        }
    
    
        private void sendHeartbeatToAllBroker() {
            // prepareHeartbeatData assembles the heartbeat (registration) data
            final HeartbeatData heartbeatData = this.prepareHeartbeatData();
            final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
            final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
            if (producerEmpty && consumerEmpty) {
                log.warn("sending heartbeat, but no consumer and no producer");
                return;
            }
    
            if (!this.brokerAddrTable.isEmpty()) {
                long times = this.sendHeartbeatTimesTotal.getAndIncrement();
                Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, HashMap<Long, String>> entry = it.next();
                    String brokerName = entry.getKey();
                    HashMap<Long, String> oneTable = entry.getValue();
                    if (oneTable != null) {
                        for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                            Long id = entry1.getKey();
                            String addr = entry1.getValue();
                            if (addr != null) {
                                if (consumerEmpty) {
                                    if (id != MixAll.MASTER_ID)
                                        continue;
                                }
    
                                try {
                                    // Send the heartbeat (registration) data
                                    int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
    
                                    if (!this.brokerVersionTable.containsKey(brokerName)) {
                                        this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                    }
                                    this.brokerVersionTable.get(brokerName).put(addr, version);
    
                                } catch (Exception e) {
                                    // Left empty in this excerpt: a failure for one broker does not abort heartbeats to the others.
                                }
                            }
                        }
                    }
                }
            }
        }
    
    
        private HeartbeatData prepareHeartbeatData() {
            HeartbeatData heartbeatData = new HeartbeatData();
    
            // clientID
            heartbeatData.setClientID(this.clientId);
    
            // Consumer
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    ConsumerData consumerData = new ConsumerData();
                    consumerData.setGroupName(impl.groupName());
                    consumerData.setConsumeType(impl.consumeType());
                    consumerData.setMessageModel(impl.messageModel());
                    consumerData.setConsumeFromWhere(impl.consumeFromWhere());
                    consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
                    consumerData.setUnitMode(impl.isUnitMode());
    
                    heartbeatData.getConsumerDataSet().add(consumerData);
                }
            }
    
            // Producer
            for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    ProducerData producerData = new ProducerData();
                    producerData.setGroupName(entry.getKey());
    
                    heartbeatData.getProducerDataSet().add(producerData);
                }
            }
    
            return heartbeatData;
        }
    }
    
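    • MQClientInstance sends a heartbeat in two steps: assembling the data and sending it.
    • prepareHeartbeatData assembles the heartbeat (registration) data.
    • sendHeartbeatToAllBroker sends that data to every broker in brokerAddrTable; when the client has no consumers, only master brokers are contacted.
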
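
The address-selection rule inside sendHeartbeatToAllBroker can be isolated in a small sketch. HeartbeatTargetSketch and selectTargets are hypothetical names, and the broker addresses are made-up sample values; the only detail taken from RocketMQ is that the master broker id (MixAll.MASTER_ID) is 0.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the nested loop in sendHeartbeatToAllBroker().
class HeartbeatTargetSketch {
    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    // brokerAddrTable: brokerName -> (brokerId -> address)
    static List<String> selectTargets(Map<String, Map<Long, String>> brokerAddrTable,
                                      boolean consumerEmpty) {
        List<String> targets = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                if (entry.getValue() == null) {
                    continue;
                }
                // A producer-only client skips slaves; a client with consumers contacts every broker.
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                targets.add(entry.getValue());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(0L, "10.0.0.1:10911"); // master (sample address)
        brokerA.put(1L, "10.0.0.2:10911"); // slave (sample address)
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);
        System.out.println(selectTargets(table, true));
        System.out.println(selectTargets(table, false));
    }
}
```

Consumers heartbeat slaves as well because they may pull from a slave, so the slave also needs the group's registration data.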
    public class MQClientAPIImpl {
    
        public int sendHearbeat(
            final String addr,
            final HeartbeatData heartbeatData,
            final long timeoutMillis
        ) throws RemotingException, MQBrokerException, InterruptedException {
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
            request.setLanguage(clientConfig.getLanguage());
            request.setBody(heartbeatData.encode());
            RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    return response.getVersion();
                }
                default:
                    break;
            }
    
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
    }
    
    • MQClientAPIImpl#sendHearbeat builds a RemotingCommand with request code HEART_BEAT, sends it to the broker synchronously, and returns the broker version on success.

    Broker handling of the heartbeat

    public class ClientManageProcessor implements NettyRequestProcessor {
    
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private final BrokerController brokerController;
    
        public ClientManageProcessor(final BrokerController brokerController) {
            this.brokerController = brokerController;
        }
    
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
            switch (request.getCode()) {
                case RequestCode.HEART_BEAT:
                    return this.heartBeat(ctx, request);
                case RequestCode.UNREGISTER_CLIENT:
                    return this.unregisterClient(ctx, request);
                case RequestCode.CHECK_CLIENT_CONFIG:
                    return this.checkClientConfig(ctx, request);
                default:
                    break;
            }
            return null;
        }
    
    
        public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    
            RemotingCommand response = RemotingCommand.createResponseCommand(null);
            HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                ctx.channel(),
                heartbeatData.getClientID(),
                request.getLanguage(),
                request.getVersion()
            );
    
            for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
    
                SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                        data.getGroupName());
                boolean isNotifyConsumerIdsChangedEnable = true;
                if (null != subscriptionGroupConfig) {
                    isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                    int topicSysFlag = 0;
                    if (data.isUnitMode()) {
                        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                    }
                    // Retry queues are scoped per consumer group, so create the group's retry topic here.
                    // The topic name has the form %RETRY%consumer_group.
                    String newTopic = MixAll.getRetryTopic(data.getGroupName());
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        newTopic,
                        subscriptionGroupConfig.getRetryQueueNums(),
                        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
                }
    
                // Register the consumer
                boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                    data.getGroupName(),
                    clientChannelInfo,
                    data.getConsumeType(),
                    data.getMessageModel(),
                    data.getConsumeFromWhere(),
                    data.getSubscriptionDataSet(),
                    isNotifyConsumerIdsChangedEnable
                );
    
                if (changed) {
                    log.info("registerConsumer info changed {} {}",
                        data.toString(),
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    );
                }
            }
    
            // Register the producers
            for (ProducerData data : heartbeatData.getProducerDataSet()) {
                this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                    clientChannelInfo);
            }
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }
    
    • On the broker side, ClientManageProcessor#heartBeat handles the heartbeat registration data.
    • 1. It creates the %RETRY%consumer_group topic (only when a SubscriptionGroupConfig exists for the group).
    • 2. It registers each consumer via registerConsumer.
    • 3. It registers each producer via registerProducer.

    public class ConsumerManager {
    
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
        private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
            new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
        private final ConsumerIdsChangeListener consumerIdsChangeListener;
    
        public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
            ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
            final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    
            ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
            if (null == consumerGroupInfo) {
                ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
                ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
                consumerGroupInfo = prev != null ? prev : tmp;
            }
    
            // 1. Update the ClientChannelInfo
            boolean r1 =
                consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                    consumeFromWhere);
    
            // 2. Update the SubscriptionData subscriptions
            boolean r2 = consumerGroupInfo.updateSubscription(subList);
    
            if (r1 || r2) {
                if (isNotifyConsumerIdsChangedEnable) {
                    // 3. Fire the CHANGE event
                    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
                }
            }
    
            // 4. Fire the REGISTER event
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    
            return r1 || r2;
        }
    }
    
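    • 1. consumerTable maps each consumer group to its ConsumerGroupInfo.
    • 2. registerConsumer updates the ClientChannelInfo.
    • 3. registerConsumer updates the SubscriptionData subscriptions.
    • 4. registerConsumer fires a CHANGE event when the channel or the subscriptions changed and notification is enabled for the group.
    • 5. registerConsumer fires a REGISTER event on every call.
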
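
The five points above can be condensed into a simplified model. This is only a sketch under stated assumptions: ConsumerManagerSketch is a hypothetical class, a set of client ids stands in for ConsumerGroupInfo, the subscription update (r2) is left out, and events are recorded in a list instead of being passed to a ConsumerIdsChangeListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of ConsumerManager#registerConsumer.
class ConsumerManagerSketch {
    // group -> registered client ids (stands in for ConsumerGroupInfo)
    private final ConcurrentMap<String, Set<String>> consumerTable = new ConcurrentHashMap<>();
    // Events are recorded here instead of being dispatched to a listener.
    final List<String> events = new ArrayList<>();

    boolean registerConsumer(String group, String clientId, boolean notifyEnabled) {
        Set<String> clients = consumerTable.get(group);
        if (clients == null) {
            // Same get-then-putIfAbsent pattern as the original: the first writer wins.
            Set<String> tmp = ConcurrentHashMap.newKeySet();
            Set<String> prev = consumerTable.putIfAbsent(group, tmp);
            clients = prev != null ? prev : tmp;
        }
        boolean changed = clients.add(clientId); // true only for a client not seen before
        if (changed && notifyEnabled) {
            events.add("CHANGE:" + group);       // would trigger a rebalance notification
        }
        events.add("REGISTER:" + group);         // recorded on every heartbeat
        return changed;
    }

    public static void main(String[] args) {
        ConsumerManagerSketch manager = new ConsumerManagerSketch();
        manager.registerConsumer("order_group", "client-1", true); // first heartbeat
        manager.registerConsumer("order_group", "client-1", true); // repeated heartbeat
        System.out.println(manager.events);
    }
}
```

The point of the model is that a repeated heartbeat from a known client produces no CHANGE event, so periodic heartbeats do not cause a rebalance on their own.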
    public class ConsumerGroupInfo {
    
        private final String groupName;
        private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
            new ConcurrentHashMap<String, SubscriptionData>();
        private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
            new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
        private volatile ConsumeType consumeType;
        private volatile MessageModel messageModel;
        private volatile ConsumeFromWhere consumeFromWhere;
        private volatile long lastUpdateTimestamp = System.currentTimeMillis();
    
        public boolean updateSubscription(final Set<SubscriptionData> subList) {
            boolean updated = false;
    
            for (SubscriptionData sub : subList) {
                SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
                if (old == null) {
                    SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                    if (null == prev) {
                        updated = true;
                        log.info("subscription changed, add new topic, group: {} {}",
                            this.groupName,
                            sub.toString());
                    }
                } else if (sub.getSubVersion() > old.getSubVersion()) {
                    if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                        log.info("subscription changed, group: {} OLD: {} NEW: {}",
                            this.groupName,
                            old.toString(),
                            sub.toString()
                        );
                    }
    
                    this.subscriptionTable.put(sub.getTopic(), sub);
                }
            }
    
            Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, SubscriptionData> next = it.next();
                String oldTopic = next.getKey();
    
                boolean exist = false;
                for (SubscriptionData sub : subList) {
                    if (sub.getTopic().equals(oldTopic)) {
                        exist = true;
                        break;
                    }
                }
    
                if (!exist) {
                    log.warn("subscription changed, group: {} remove topic {} {}",
                        this.groupName,
                        oldTopic,
                        next.getValue().toString()
                    );
    
                    it.remove();
                    updated = true;
                }
            }
    
            this.lastUpdateTimestamp = System.currentTimeMillis();
    
            return updated;
        }
    
    
        public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
            MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
            boolean updated = false;
            this.consumeType = consumeType;
            this.messageModel = messageModel;
            this.consumeFromWhere = consumeFromWhere;
    
            ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
            if (null == infoOld) {
                ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
                if (null == prev) {
                    log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
                        messageModel, infoNew.toString());
                    updated = true;
                }
    
                infoOld = infoNew;
            } else {
                if (!infoOld.getClientId().equals(infoNew.getClientId())) {
                    log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
                        this.groupName,
                        infoOld.toString(),
                        infoNew.toString());
                    this.channelInfoTable.put(infoNew.getChannel(), infoNew);
                }
            }
    
            this.lastUpdateTimestamp = System.currentTimeMillis();
            infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
    
            return updated;
        }
    }
    
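    • subscriptionTable stores the SubscriptionData keyed by topic.
    • updateSubscription adds new topics, replaces an entry when the incoming subVersion is higher, and removes topics that are no longer in the list; it returns true only when a topic was added or removed.
    • channelInfoTable stores the ClientChannelInfo keyed by Channel.
    • updateChannel updates the ClientChannelInfo and returns true only when a new channel was added.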
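
The return value of updateSubscription is easy to misread, so the following sketch reproduces just its add, replace, and remove rules. SubscriptionTableSketch is a hypothetical class, and a plain Long version number stands in for SubscriptionData.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of ConsumerGroupInfo#updateSubscription.
class SubscriptionTableSketch {
    // topic -> subVersion (stands in for SubscriptionData)
    final Map<String, Long> table = new ConcurrentHashMap<>();

    boolean update(Map<String, Long> incoming) {
        boolean updated = false;
        for (Map.Entry<String, Long> sub : incoming.entrySet()) {
            Long old = table.get(sub.getKey());
            if (old == null) {
                if (table.putIfAbsent(sub.getKey(), sub.getValue()) == null) {
                    updated = true; // a new topic counts as a change
                }
            } else if (sub.getValue() > old) {
                // A newer version replaces the entry but is not reported as a change.
                table.put(sub.getKey(), sub.getValue());
            }
        }
        // Drop topics that the latest heartbeat no longer subscribes to.
        Iterator<Map.Entry<String, Long>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            if (!incoming.containsKey(it.next().getKey())) {
                it.remove();
                updated = true; // a removed topic counts as a change
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        SubscriptionTableSketch sketch = new SubscriptionTableSketch();
        Map<String, Long> heartbeat = new HashMap<>();
        heartbeat.put("TopicA", 1L);
        System.out.println(sketch.update(heartbeat)); // TopicA is new
        heartbeat.put("TopicA", 2L);
        System.out.println(sketch.update(heartbeat)); // only the version moved
    }
}
```

Because the result feeds the `r1 || r2` check in registerConsumer, only added or removed topics lead to a CHANGE event; a version bump on an existing topic does not.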

    ConsumerIds change notification

    public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
        private final BrokerController brokerController;
    
        public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
            this.brokerController = brokerController;
        }
    
        @Override
        public void handle(ConsumerGroupEvent event, String group, Object... args) {
            if (event == null) {
                return;
            }
            switch (event) {
                case CHANGE:
                    if (args == null || args.length < 1) {
                        return;
                    }
                    List<Channel> channels = (List<Channel>) args[0];
                    if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
                        for (Channel chl : channels) {
                            this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
                        }
                    }
                    break;
                case UNREGISTER:
                    this.brokerController.getConsumerFilterManager().unRegister(group);
                    break;
                case REGISTER:
                    if (args == null || args.length < 1) {
                        return;
                    }
                    Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
                    this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
                    break;
                default:
                    throw new RuntimeException("Unknown event " + event);
            }
        }
    }
    
    
    public class Broker2Client {
    
        public void notifyConsumerIdsChanged(
            final Channel channel,
            final String consumerGroup) {
            if (null == consumerGroup) {
                log.error("notifyConsumerIdsChanged consumerGroup is null");
                return;
            }
    
            NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
            requestHeader.setConsumerGroup(consumerGroup);
            RemotingCommand request =
                RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
    
            try {
                this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
            } catch (Exception e) {
                log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
            }
        }
    }
    
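    • For the CHANGE event, brokerController.getBroker2Client().notifyConsumerIdsChanged sends a one-way NOTIFY_CONSUMER_IDS_CHANGED request to every channel in the group, telling the clients to rebalance.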
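
The CHANGE branch is guarded twice: by the per-group flag checked in registerConsumer and by the broker-level flag checked in the listener. The sketch below models only the listener side. ChangeNotifySketch is a hypothetical class, strings stand in for Netty channels, and the "sent" list replaces the real one-way remoting call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the CHANGE branch in DefaultConsumerIdsChangeListener#handle.
class ChangeNotifySketch {
    enum Event { CHANGE, UNREGISTER, REGISTER }

    private final boolean brokerNotifyEnabled;  // stands in for BrokerConfig#isNotifyConsumerIdsChangedEnable
    final List<String> sent = new ArrayList<>(); // records what would be sent one-way

    ChangeNotifySketch(boolean brokerNotifyEnabled) {
        this.brokerNotifyEnabled = brokerNotifyEnabled;
    }

    void handle(Event event, String group, List<String> channels) {
        if (event == null) {
            return;
        }
        switch (event) {
            case CHANGE:
                if (channels != null && brokerNotifyEnabled) {
                    // Every channel of the group is notified, including the one that just joined.
                    for (String channel : channels) {
                        sent.add(channel + " <- NOTIFY_CONSUMER_IDS_CHANGED(" + group + ")");
                    }
                }
                break;
            default:
                // REGISTER and UNREGISTER only update the filter manager in the real listener.
                break;
        }
    }

    public static void main(String[] args) {
        ChangeNotifySketch listener = new ChangeNotifySketch(true);
        listener.handle(Event.CHANGE, "order_group", Arrays.asList("client-1", "client-2"));
        listener.sent.forEach(System.out::println);
    }
}
```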

    ClientRemotingProcessor

    public class ClientRemotingProcessor implements NettyRequestProcessor {
    
        public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            try {
                final NotifyConsumerIdsChangedRequestHeader requestHeader =
                    (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
                log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.getConsumerGroup());
    
                // Trigger a rebalance immediately
                this.mqClientFactory.rebalanceImmediately();
            } catch (Exception e) {
                log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
            }
            return null;
        }
    }
    
    • notifyConsumerIdsChanged triggers an immediate rebalance on the consumer.
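
The listing only shows the call to rebalanceImmediately(); how the client turns that call into an early rebalance is outside this excerpt. One common way to support such a call, sketched here purely as an assumption, is a service thread that sleeps between rounds but can be woken early. RebalanceWakeupSketch and its methods are hypothetical names, not the RocketMQ implementation.

```java
// Hypothetical model of a periodic service that can be woken for an immediate round.
class RebalanceWakeupSketch {
    private final Object monitor = new Object();
    private boolean notified = false;

    // Called by the notification handler: request a rebalance round right now.
    void wakeup() {
        synchronized (monitor) {
            notified = true;
            monitor.notifyAll();
        }
    }

    // Waits up to intervalMillis; returns true if a wakeup() was pending or arrived,
    // false if none arrived before the wait ended.
    boolean waitForRunning(long intervalMillis) {
        synchronized (monitor) {
            try {
                if (!notified) {
                    monitor.wait(intervalMillis);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean woken = notified;
            notified = false; // consume the signal so the next round waits again
            return woken;
        }
    }

    public static void main(String[] args) {
        RebalanceWakeupSketch service = new RebalanceWakeupSketch();
        service.wakeup(); // e.g. a broker notification arrived
        System.out.println(service.waitForRunning(20_000)); // signal already pending, no waiting
        System.out.println(service.waitForRunning(100));    // no signal pending
    }
}
```

A service loop built on this would call waitForRunning(interval) and then run one rebalance round, so a notification shortens the wait instead of starting a second, concurrent rebalance.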

        Article link: https://www.haomeiwen.com/subject/owddghtx.html