RocketMQ Heartbeat Messages

Author: 晴天哥_王志 | Published: 2020-07-01 17:36

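    Introduction

    • This article walks through the scheduled heartbeat tasks that run between the components of a RocketMQ cluster.
    • producer/consumer and broker keep their connection alive through heartbeat messages.
    • broker and namesrv keep their connection alive through periodic registration reports from the broker.
    • producer/consumer and namesrv keep their connection alive by periodically pulling Topic route information.

    Heartbeat messages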

    producer/consumer heartbeat

    • The producer and consumer send heartbeat messages to maintain their connection to the broker.

    consumer/producer

    public class MQClientInstance {
    
        private void startScheduledTask() {
            
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.cleanOfflineBroker();
                        MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                    } catch (Exception e) {
                        log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                    }
                }
            }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
        }
    
    
        public void sendHeartbeatToAllBrokerWithLock() {
            if (this.lockHeartbeat.tryLock()) {
                try {
                    // Send a heartbeat to every broker
                    this.sendHeartbeatToAllBroker();
                    this.uploadFilterClassSource();
                } catch (final Exception e) {
                    log.error("sendHeartbeatToAllBroker exception", e);
                } finally {
                    this.lockHeartbeat.unlock();
                }
            } else {
                log.warn("lock heartBeat, but failed.");
            }
        }
    
    
    
        private void sendHeartbeatToAllBroker() {
            // Build the HeartbeatData payload
            final HeartbeatData heartbeatData = this.prepareHeartbeatData();
            final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
            final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
            if (producerEmpty && consumerEmpty) {
                log.warn("sending heartbeat, but no consumer and no producer");
                return;
            }
            // A producer-only client sends heartbeats to master brokers only
            // A client with consumers sends heartbeats to both master and slave brokers
            if (!this.brokerAddrTable.isEmpty()) {
                long times = this.sendHeartbeatTimesTotal.getAndIncrement();
                Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, HashMap<Long, String>> entry = it.next();
                    String brokerName = entry.getKey();
                    HashMap<Long, String> oneTable = entry.getValue();
                    if (oneTable != null) {
                        for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                            Long id = entry1.getKey();
                            String addr = entry1.getValue();
                            if (addr != null) {
                                // No consumers on this client: heartbeat the master only
                                if (consumerEmpty) {
                                    if (id != MixAll.MASTER_ID)
                                        continue;
                                }
    
                                try {
                                    // Actually send the heartbeat
                                    int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                    if (!this.brokerVersionTable.containsKey(brokerName)) {
                                        this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                    }
                                    this.brokerVersionTable.get(brokerName).put(addr, version);
                                } catch (Exception e) {
                                }
                            }
                        }
                    }
                }
            }
        }
    
    
        // Assembles the heartbeat payload
        private HeartbeatData prepareHeartbeatData() {
            HeartbeatData heartbeatData = new HeartbeatData();
    
            // clientID
            heartbeatData.setClientID(this.clientId);
    
            // Consumer
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    // ConsumerData carries this consumer's configuration
                    ConsumerData consumerData = new ConsumerData();
                    consumerData.setGroupName(impl.groupName());
                    consumerData.setConsumeType(impl.consumeType());
                    consumerData.setMessageModel(impl.messageModel());
                    consumerData.setConsumeFromWhere(impl.consumeFromWhere());
                    consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
                    consumerData.setUnitMode(impl.isUnitMode());
    
                    heartbeatData.getConsumerDataSet().add(consumerData);
                }
            }
    
            // Producer
            for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    ProducerData producerData = new ProducerData();
                    producerData.setGroupName(entry.getKey());
    
                    heartbeatData.getProducerDataSet().add(producerData);
                }
            }
    
            return heartbeatData;
        }
    }
    
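    • sendHeartbeatToAllBrokerWithLock runs on a fixed schedule to send heartbeats; the period comes from clientConfig.getHeartbeatBrokerInterval(), 30 s by default.
    • sendHeartbeatToAllBroker does the actual sending.
    • Its core logic builds the heartbeat payload and then picks the targets: a client with no consumers (producer only) sends to master brokers only, while a client with consumers sends to both master and slave brokers.
    • prepareHeartbeatData assembles the payload, with one entry per registered producer and consumer.
    • A producer's entry (ProducerData) only needs the producerGroup name.
    • A consumer's entry (ConsumerData) also carries its subscription information.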
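
    The target-selection rule can be looked at in isolation. The sketch below is only an illustration: HeartbeatTargets and select are hypothetical names that do not exist in RocketMQ, and MASTER_ID = 0 is assumed to mirror MixAll.MASTER_ID. Only the rule itself (skip non-master broker ids when the client has no consumers) is taken from sendHeartbeatToAllBroker above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a RocketMQ class: lists the broker addresses that
// would receive a heartbeat under the rule used in sendHeartbeatToAllBroker.
class HeartbeatTargets {
    // Assumption: mirrors MixAll.MASTER_ID.
    static final long MASTER_ID = 0L;

    static List<String> select(Map<String, Map<Long, String>> brokerAddrTable, boolean consumerEmpty) {
        List<String> targets = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> e : oneTable.entrySet()) {
                String addr = e.getValue();
                if (addr == null) {
                    continue;
                }
                // A client without consumers skips every non-master broker.
                if (consumerEmpty && e.getKey() != MASTER_ID) {
                    continue;
                }
                targets.add(addr);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(0L, "10.0.0.1:10911"); // master (example address)
        brokerA.put(1L, "10.0.0.2:10911"); // slave (example address)
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);

        System.out.println(select(table, true));  // producer-only client
        System.out.println(select(table, false)); // client with consumers
    }
}
```

    With the example table, the producer-only call returns just the master address, and the call for a client with consumers returns both addresses.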

    broker#ClientManageProcessor

    public class ClientManageProcessor implements NettyRequestProcessor {
    
        public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    
            RemotingCommand response = RemotingCommand.createResponseCommand(null);
            HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                ctx.channel(),
                heartbeatData.getClientID(),
                request.getLanguage(),
                request.getVersion()
            );
    
            // 1. Handle the consumer entries in the heartbeat
            for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                        data.getGroupName());
                boolean isNotifyConsumerIdsChangedEnable = true;
                if (null != subscriptionGroupConfig) {
                    isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                    int topicSysFlag = 0;
                    if (data.isUnitMode()) {
                        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                    }
                    // Create the retry topic for this consumer group
                    String newTopic = MixAll.getRetryTopic(data.getGroupName());
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        newTopic,
                        subscriptionGroupConfig.getRetryQueueNums(),
                        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
                }
                // Register the consumer through ConsumerManager
                boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                    data.getGroupName(),
                    clientChannelInfo,
                    data.getConsumeType(),
                    data.getMessageModel(),
                    data.getConsumeFromWhere(),
                    data.getSubscriptionDataSet(),
                    isNotifyConsumerIdsChangedEnable
                );
    
                if (changed) {
                    log.info("registerConsumer info changed {} {}",
                        data.toString(),
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    );
                }
            }
            // 2. Handle the producer entries in the heartbeat
            for (ProducerData data : heartbeatData.getProducerDataSet()) {
                // Register the producer through ProducerManager
                this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                    clientChannelInfo);
            }
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }
    
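    • ClientManageProcessor#heartBeat handles the heartbeats sent by producers and consumers.
    • Inside heartBeat, the consumer entries and the producer entries of the payload are processed separately, each with its own logic.
    • ConsumerManager stores the consumer-related information.
    • ProducerManager stores the producer-related information.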
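
    To make the "register on every heartbeat" idea concrete, here is a heavily simplified sketch. ClientRegistry is a hypothetical class; it does not reproduce ConsumerManager or ProducerManager, which track channels and subscriptions rather than a bare timestamp. It only illustrates a group-keyed table that is refreshed by each heartbeat and reports whether the client was new.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the broker-side client tables.
class ClientRegistry {
    // group name -> (client id -> time of the last heartbeat in milliseconds)
    private final Map<String, Map<String, Long>> groups = new ConcurrentHashMap<>();

    // Records a heartbeat; returns true when the client was not yet known in this group.
    boolean register(String group, String clientId, long nowMillis) {
        Map<String, Long> clients = groups.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
        return clients.put(clientId, nowMillis) == null;
    }

    // Number of clients currently recorded for the group.
    int clientCount(String group) {
        Map<String, Long> clients = groups.get(group);
        return clients == null ? 0 : clients.size();
    }
}
```

    A first call to register for a given group and client id returns true; later heartbeats from the same client only refresh the timestamp and return false.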

    broker heartbeat

    • The broker periodically reports its TopicConfig information to namesrv.

    BrokerController

    public class BrokerController {
    
        public void start() throws Exception {
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        }
    
    
        public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    
            // Build the TopicConfig snapshot that will be reported to namesrv.
            TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
                for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                    TopicConfig tmp =
                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                            this.brokerConfig.getBrokerPermission());
                    topicConfigTable.put(topicConfig.getTopicName(), tmp);
                }
                topicConfigWrapper.setTopicConfigTable(topicConfigTable);
            }
    
            if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
            }
        }
    
    
        private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
            TopicConfigSerializeWrapper topicConfigWrapper) {
    
            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills(),
                this.brokerConfig.isCompressedRegister());
    
            if (registerBrokerResultList.size() > 0) {
                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
                if (registerBrokerResult != null) {
                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                    }
    
                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
    
                    if (checkOrderConfig) {
                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                    }
                }
            }
        }
    }
    
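    • The broker periodically reports its Topic configuration to namesrv; the period is brokerConfig.getRegisterNameServerPeriod() clamped to the range 10 s to 60 s, after a 10 s initial delay.
    • TopicConfigSerializeWrapper wraps the TopicConfig data to be reported.
    • doRegisterBrokerAll performs the actual report to namesrv.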
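
    The period expression passed to scheduleAtFixedRate in start() is easy to misread, so the sketch below isolates it. RegisterPeriod and clamp are hypothetical names; only the Math.max/Math.min expression is copied from the code above.

```java
// Hypothetical helper that isolates the period expression from BrokerController.start().
class RegisterPeriod {
    // Keeps the configured period inside [10000, 60000] milliseconds.
    static long clamp(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    public static void main(String[] args) {
        System.out.println(clamp(5000));   // below the range: 10000
        System.out.println(clamp(30000));  // inside the range: 30000
        System.out.println(clamp(120000)); // above the range: 60000
    }
}
```

    Whatever value is configured, the broker therefore reports no more often than every 10 s and no less often than every 60 s.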

    namesrv#DefaultRequestProcessor

    public class DefaultRequestProcessor implements NettyRequestProcessor {
    
        public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
            final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
            final RegisterBrokerRequestHeader requestHeader =
                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    
            if (!checksum(ctx, request, requestHeader)) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("crc32 not match");
                return response;
            }
    
            RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
    
            if (request.getBody() != null) {
                try {
                    registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
                } catch (Exception e) {
                    throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
                }
            } else {
                registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
                registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
            }
    
            RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId(),
                requestHeader.getHaServerAddr(),
                registerBrokerBody.getTopicConfigSerializeWrapper(),
                registerBrokerBody.getFilterServerList(),
                ctx.channel());
    
            responseHeader.setHaServerAddr(result.getHaServerAddr());
            responseHeader.setMasterAddr(result.getMasterAddr());
    
            byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
            response.setBody(jsonValue);
    
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }
    
    • namesrv's DefaultRequestProcessor handles the TopicConfig information reported by the broker.
    • RouteInfoManager#registerBroker stores the Topic configuration.

    producer/consumer periodic route pulling

    • The producer and consumer periodically sync route information from namesrv.

    updateTopicRouteInfoFromNameServer

    public class MQClientInstance {
    
        private void startScheduledTask() {
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                    } catch (Exception e) {
                        log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                    }
                }
            }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
        }
    
    
        public void updateTopicRouteInfoFromNameServer() {
            Set<String> topicList = new HashSet<String>();
    
            // Consumer
            {
                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, MQConsumerInner> entry = it.next();
                    MQConsumerInner impl = entry.getValue();
                    if (impl != null) {
                        Set<SubscriptionData> subList = impl.subscriptions();
                        if (subList != null) {
                            for (SubscriptionData subData : subList) {
                                topicList.add(subData.getTopic());
                            }
                        }
                    }
                }
            }
    
            // Producer
            {
                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, MQProducerInner> entry = it.next();
                    MQProducerInner impl = entry.getValue();
                    if (impl != null) {
                        Set<String> lst = impl.getPublishTopicList();
                        topicList.addAll(lst);
                    }
                }
            }
    
            for (String topic : topicList) {
                this.updateTopicRouteInfoFromNameServer(topic);
            }
        }
    
        public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
            DefaultMQProducer defaultMQProducer) {
            try {
                if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        TopicRouteData topicRouteData;
                        if (isDefault && defaultMQProducer != null) {
                            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                1000 * 3);
                            if (topicRouteData != null) {
                                for (QueueData data : topicRouteData.getQueueDatas()) {
                                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                    data.setReadQueueNums(queueNums);
                                    data.setWriteQueueNums(queueNums);
                                }
                            }
                        } else {
                            // The scheduled route sync takes this branch
                            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                        }
    
                        if (topicRouteData != null) {
                            // Logic that updates the local route information is omitted
                        } else {
                        }
                    } catch (MQClientException e) {
                    } catch (RemotingException e) {
                    } finally {
                        this.lockNamesrv.unlock();
                    }
                } else {
                }
            } catch (InterruptedException e) {
            }
    
            return false;
        }
    }
    
    • updateTopicRouteInfoFromNameServer syncs route information from namesrv.
    • The topics to sync are the topics the consumers subscribe to plus the topics the producers publish to.
    • getTopicRouteInfoFromNameServer sends the actual route request to namesrv.
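
    The topic-collection step amounts to a set union, which the following sketch shows on its own. TopicCollector and collect are hypothetical names; the inputs stand in for the topic sets that the real code reads from consumerTable (via subscriptions()) and producerTable (via getPublishTopicList()).

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of RocketMQ: merges the topics of all consumers
// and producers into one de-duplicated set, so each topic is refreshed once per round.
class TopicCollector {
    static Set<String> collect(Collection<Set<String>> consumerTopics,
                               Collection<Set<String>> producerTopics) {
        Set<String> topicList = new HashSet<>();
        for (Set<String> topics : consumerTopics) {
            if (topics != null) {
                topicList.addAll(topics);
            }
        }
        for (Set<String> topics : producerTopics) {
            if (topics != null) {
                topicList.addAll(topics);
            }
        }
        return topicList;
    }
}
```

    Because the result is a Set, a topic that is both subscribed to and published to is requested from namesrv only once per scheduling round.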

    getTopicRouteInfoFromNameServer

    public class MQClientAPIImpl {
        public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
            boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
            GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
            requestHeader.setTopic(topic);
    
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    
            RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.TOPIC_NOT_EXIST: {
                    if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                    }
    
                    break;
                }
                case ResponseCode.SUCCESS: {
                    byte[] body = response.getBody();
                    if (body != null) {
                        return TopicRouteData.decode(body, TopicRouteData.class);
                    }
                }
                default:
                    break;
            }
    
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    }
    
    • The requestCode for a route sync is GET_ROUTEINTO_BY_TOPIC.

    DefaultRequestProcessor

    public class DefaultRequestProcessor implements NettyRequestProcessor {
        public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
    
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final GetRouteInfoRequestHeader requestHeader =
                (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    
            TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    
            if (topicRouteData != null) {
                if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                    String orderTopicConf =
                        this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                            requestHeader.getTopic());
                    topicRouteData.setOrderTopicConf(orderTopicConf);
                }
    
                byte[] content = topicRouteData.encode();
                response.setBody(content);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return response;
            }
    
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }
    
    • namesrv looks up the route information for the topic in RouteInfoManager and replies with TOPIC_NOT_EXIST when none is found.


    Article link: https://www.haomeiwen.com/subject/lsubfktx.html