RocketMQ Client Management

Author: 晴天哥_王志 | Published: 2020-05-20 19:06

    Introduction

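    • This article analyzes how RocketMQ manages clients. There are two kinds of client: consumers and producers.
    • Consumer management covers consumer registration, changes to the set of clientIds in a group, and consumer rebalancing.
    • The broker manages consumers through ConsumerManager.
    • Producer management mainly consists of producer registration.
    • The broker manages producers through ProducerManager.
    • The key point is that producers and consumers register with the broker through heartbeat data, and the scheduled task ClientHousekeepingService takes clients whose connections have dropped offline.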
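
The heartbeat-plus-housekeeping idea in the last bullet can be sketched in a few lines. This is a simplified illustration, not broker code: `HeartbeatRegistry`, its methods, and the use of a plain clientId-to-timestamp map are all assumptions made for the example, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the broker-side client tables.
class HeartbeatRegistry {
    private final long expiryMillis;
    // clientId -> timestamp of the last heartbeat received
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    HeartbeatRegistry(long expiryMillis) {
        this.expiryMillis = expiryMillis;
    }

    // A heartbeat both registers an unknown client and refreshes a known one.
    // Returns true when the client was not registered before.
    boolean onHeartbeat(String clientId, long nowMillis) {
        return lastSeen.put(clientId, nowMillis) == null;
    }

    // Housekeeping pass: drop clients whose last heartbeat is too old.
    // Returns the number of evicted clients.
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > expiryMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isRegistered(String clientId) {
        return lastSeen.containsKey(clientId);
    }
}
```

A client that keeps sending heartbeats never expires; one that stops is removed by the first scan that runs after the expiry window has passed.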

    ConsumerManager

    public class ConsumerManager {
    
        private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
        private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
            new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
    
        public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
            ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
            final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
            // 1. If no ConsumerGroupInfo exists for this group yet, create and add one
            ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
            if (null == consumerGroupInfo) {
                ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
                ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
                consumerGroupInfo = prev != null ? prev : tmp;
            }
    
            boolean r1 =
                consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                    consumeFromWhere);
            boolean r2 = consumerGroupInfo.updateSubscription(subList);
            // If the channel (consumer id) set or the subscription changed, send a CHANGE notification
            if (r1 || r2) {
                if (isNotifyConsumerIdsChangedEnable) {
                    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
                }
            }
    
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    
            return r1 || r2;
        }
    
        // scanNotActiveChannel removes the channel objects of clients that have gone offline
        public void scanNotActiveChannel() {
            Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, ConsumerGroupInfo> next = it.next();
                String group = next.getKey();
                ConsumerGroupInfo consumerGroupInfo = next.getValue();
                ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
                    consumerGroupInfo.getChannelInfoTable();
    
                Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
                while (itChannel.hasNext()) {
                    Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
                    ClientChannelInfo clientChannelInfo = nextChannel.getValue();
                    long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
                    if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                        log.warn(
                            "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
                            RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
                        RemotingUtil.closeChannel(clientChannelInfo.getChannel());
                        itChannel.remove();
                    }
                }
            }
        }
    }
    
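    • ConsumerManager's consumerTable stores each consumer group together with its ConsumerGroupInfo.
    • registerConsumer registers consumer information. When updateChannel or updateSubscription reports a change (for example, a new consumer id joins the group), all consumers in the group are notified to rebalance again immediately.
    • scanNotActiveChannel scans for consumers whose last heartbeat is older than CHANNEL_EXPIRED_TIMEOUT (120 seconds), closes their channels, and removes them from the ConsumerGroupInfo.
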
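
The get / putIfAbsent sequence at the top of registerConsumer is what keeps group creation safe when two heartbeats for a new group arrive at the same time: whichever thread loses the race discards its own object and uses the one already in the map. A minimal sketch of that pattern follows. `GroupTableSketch` and the use of a set of client ids in place of ConsumerGroupInfo are assumptions for illustration only.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical simplification: a group is just the set of its client ids.
class GroupTableSketch {
    private final ConcurrentMap<String, Set<String>> table = new ConcurrentHashMap<>();

    // Returns true when the membership changed, which is the case in which
    // a rebalance notification would be due.
    boolean register(String group, String clientId) {
        Set<String> members = table.get(group);
        if (members == null) {
            Set<String> tmp = ConcurrentHashMap.newKeySet();
            // putIfAbsent returns the existing value if another thread won the race.
            Set<String> prev = table.putIfAbsent(group, tmp);
            members = prev != null ? prev : tmp;
        }
        return members.add(clientId);
    }
}
```

Repeated heartbeats from the same client return false here, so under this sketch only a genuine membership change would trigger a notification.
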
    public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
        private final BrokerController brokerController;
    
        public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
            this.brokerController = brokerController;
        }
    
        @Override
        public void handle(ConsumerGroupEvent event, String group, Object... args) {
            if (event == null) {
                return;
            }
            switch (event) {
                case CHANGE:
                    if (args == null || args.length < 1) {
                        return;
                    }
                    // On a CHANGE event, notify every client in this consumer group to rebalance
                    List<Channel> channels = (List<Channel>) args[0];
                    if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
                        for (Channel chl : channels) {
                            this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
                        }
                    }
                    break;
                case UNREGISTER:
                    this.brokerController.getConsumerFilterManager().unRegister(group);
                    break;
                case REGISTER:
                    if (args == null || args.length < 1) {
                        return;
                    }
                    Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
                    this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
                    break;
                default:
                    throw new RuntimeException("Unknown event " + event);
            }
        }
    }
    
    • When the set of consumer clientIds changes, every client in that consumer group is notified to rebalance immediately.

    public class ClientRemotingProcessor implements NettyRequestProcessor {
        private final InternalLogger log = ClientLogger.getLog();
        private final MQClientInstance mqClientFactory;
    
        public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
            this.mqClientFactory = mqClientFactory;
        }
    
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            switch (request.getCode()) {
                case RequestCode.CHECK_TRANSACTION_STATE:
                    return this.checkTransactionState(ctx, request);
                case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                    return this.notifyConsumerIdsChanged(ctx, request);
                case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                    return this.resetOffset(ctx, request);
                case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                    return this.getConsumeStatus(ctx, request);
    
                case RequestCode.GET_CONSUMER_RUNNING_INFO:
                    return this.getConsumerRunningInfo(ctx, request);
    
                case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                    return this.consumeMessageDirectly(ctx, request);
    
                case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
                    return this.receiveReplyMessage(ctx, request);
                default:
                    break;
            }
            return null;
        }
    
        public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            try {
                final NotifyConsumerIdsChangedRequestHeader requestHeader =
                    (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
    
                this.mqClientFactory.rebalanceImmediately();
            } catch (Exception e) {
                log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
            }
            return null;
        }
    }
    
    • On the client side, ClientRemotingProcessor handles the clientId change notification by calling rebalanceImmediately, which triggers a rebalance right away.
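
The body of rebalanceImmediately is not shown in this article. As a rough illustration of how an "immediate" trigger can coexist with a periodic task, here is one possible sketch: a loop that normally runs its task once per interval but can be woken early. `WakeableLoop` and all of its members are hypothetical names for this example and are not taken from the RocketMQ client.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a periodic loop that can also be triggered on demand.
class WakeableLoop implements Runnable {
    private final Semaphore wakeups = new Semaphore(0);
    private final long intervalMillis;
    private final Runnable task;
    private volatile boolean stopped = false;

    WakeableLoop(long intervalMillis, Runnable task) {
        this.intervalMillis = intervalMillis;
        this.task = task;
    }

    // Ask the loop to run the task now instead of waiting for the next interval.
    void wakeup() {
        wakeups.release();
    }

    void stop() {
        stopped = true;
        wakeups.release();
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Returns early if wakeup() was called, otherwise after the interval.
                wakeups.tryAcquire(intervalMillis, TimeUnit.MILLISECONDS);
                // Collapse several pending wake-ups into a single run.
                wakeups.drainPermits();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!stopped) {
                task.run();
            }
        }
    }
}
```

Because the semaphore remembers a release that happens before the loop starts waiting, a wake-up sent just before the wait begins is not lost.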

    ProducerManager

    public class ProducerManager {
     
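        // These two constants are used below but were omitted from the excerpt;
        // the values are the ones found in the RocketMQ 4.x source.
        private static final long LOCK_TIMEOUT_MILLIS = 3000;
        private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
        private final Lock groupChannelLock = new ReentrantLock();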
        private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
        private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
        private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
    
        public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
            try {
                ClientChannelInfo clientChannelInfoFound = null;
    
                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                        if (null == channelTable) {
                            channelTable = new HashMap<>();
                            this.groupChannelTable.put(group, channelTable);
                        }
    
                        clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
                        if (null == clientChannelInfoFound) {
                            channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
                            clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
                        }
                    } finally {
                        this.groupChannelLock.unlock();
                    }
    
                    if (clientChannelInfoFound != null) {
                        clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
                    }
                } 
            } catch (InterruptedException e) {
            }
        }
    
    
        public void scanNotActiveChannel() {
            try {
                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
                            .entrySet()) {
                            final String group = entry.getKey();
                            final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
    
                            Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<Channel, ClientChannelInfo> item = it.next();
                                // final Integer id = item.getKey();
                                final ClientChannelInfo info = item.getValue();
    
                                long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
                                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                                    it.remove();
                                    clientChannelTable.remove(info.getClientId());
    
                                    RemotingUtil.closeChannel(info.getChannel());
                                }
                            }
                        }
                    } finally {
                        this.groupChannelLock.unlock();
                    }
                } 
            } catch (InterruptedException e) {
            }
        }
    }
    
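    • groupChannelTable stores each producer group together with the ClientChannelInfo of its members, keyed by channel.
    • registerProducer registers a producer in its producer group. If the channel is already known, only its last-update timestamp is refreshed.
    • scanNotActiveChannel scans for producers that have gone offline and removes them from their producer group.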
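
Unlike ConsumerManager, ProducerManager keeps its data in plain HashMaps and guards them with a ReentrantLock acquired through tryLock with a timeout, so a registration gives up rather than blocking forever. The sketch below shows that locking shape in isolation. `LockedGroupTable`, the string client ids, and the boolean return value are assumptions for illustration; the 3000 ms timeout is likewise just an example value here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a lock-guarded group table.
class LockedGroupTable {
    private static final long LOCK_TIMEOUT_MILLIS = 3000; // example value
    private final Lock lock = new ReentrantLock();
    // group -> (clientId -> last update timestamp); plain HashMaps, so every access needs the lock
    private final Map<String, Map<String, Long>> groups = new HashMap<>();

    // Returns true if a new client was added, false if an existing one was
    // refreshed or the lock could not be acquired in time.
    boolean register(String group, String clientId, long nowMillis) {
        try {
            if (!lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            Map<String, Long> clients = groups.computeIfAbsent(group, g -> new HashMap<>());
            return clients.put(clientId, nowMillis) == null;
        } finally {
            lock.unlock();
        }
    }
}
```

One difference from the excerpt above: this sketch refreshes the timestamp while holding the lock, whereas registerProducer updates it after releasing the lock.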

    ClientHousekeepingService

    public class ClientHousekeepingService implements ChannelEventListener {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private final BrokerController brokerController;
    
        private ScheduledExecutorService scheduledExecutorService = Executors
            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
    
        public ClientHousekeepingService(final BrokerController brokerController) {
            this.brokerController = brokerController;
        }
    
        public void start() {
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        ClientHousekeepingService.this.scanExceptionChannel();
                    } catch (Throwable e) {
                        log.error("Error occurred when scan not active client channels.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
        }
    
        private void scanExceptionChannel() {
            this.brokerController.getProducerManager().scanNotActiveChannel();
            this.brokerController.getConsumerManager().scanNotActiveChannel();
            this.brokerController.getFilterServerManager().scanNotActiveChannel();
        }
    }
    
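    • ClientHousekeepingService periodically scans for abnormal clients, including producers and consumers. The scan first runs 10 seconds after start and then every 10 seconds.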
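
Combining the 10-second scan period with the 120-second CHANNEL_EXPIRED_TIMEOUT shown in ConsumerManager gives a feel for how long a silent client can linger under the scan alone: it is removed by the first scan at which more than 120 seconds have passed since its last heartbeat. The helper below works that out. It is a back-of-the-envelope sketch that assumes scans run exactly on schedule and measures time from service start; `EvictionTiming` and its method are hypothetical names.

```java
// Hypothetical helper for reasoning about scan-based eviction latency.
class EvictionTiming {
    // Returns the time (ms since service start) of the first scan that would
    // evict a client whose last heartbeat arrived at lastHeartbeatMillis.
    static long firstEvictingScan(long lastHeartbeatMillis, long initialDelayMillis,
                                  long periodMillis, long timeoutMillis) {
        long scanAt = initialDelayMillis;
        // The scan evicts only when the age is strictly greater than the timeout.
        while (scanAt - lastHeartbeatMillis <= timeoutMillis) {
            scanAt += periodMillis;
        }
        return scanAt;
    }

    public static void main(String[] args) {
        long evictedAt = firstEvictingScan(0L, 10_000L, 10_000L, 120_000L);
        System.out.println("evicted at " + evictedAt + " ms");
    }
}
```

Under these assumptions the eviction happens somewhere between just over 120 seconds and about 130 seconds after the last heartbeat, depending on where the heartbeat falls relative to the scan schedule.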

    ClientManageProcessor

    public class ClientManageProcessor implements NettyRequestProcessor {
    
        public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
            RemotingCommand response = RemotingCommand.createResponseCommand(null);
            HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                ctx.channel(),
                heartbeatData.getClientID(),
                request.getLanguage(),
                request.getVersion()
            );
            // Register consumer data
            for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                        data.getGroupName());
                boolean isNotifyConsumerIdsChangedEnable = true;
                if (null != subscriptionGroupConfig) {
                    isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                    int topicSysFlag = 0;
                    if (data.isUnitMode()) {
                        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                    }
                    String newTopic = MixAll.getRetryTopic(data.getGroupName());
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        newTopic,
                        subscriptionGroupConfig.getRetryQueueNums(),
                        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
                }
    
                boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                    data.getGroupName(),
                    clientChannelInfo,
                    data.getConsumeType(),
                    data.getMessageModel(),
                    data.getConsumeFromWhere(),
                    data.getSubscriptionDataSet(),
                    isNotifyConsumerIdsChangedEnable
                );
    
            }

            // Register producer data
            for (ProducerData data : heartbeatData.getProducerDataSet()) {
                this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                    clientChannelInfo);
            }
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }
    
    • ClientManageProcessor registers producers and consumers in its heartBeat method, using the data carried by each heartbeat packet.


    Article link: https://www.haomeiwen.com/subject/quvnohtx.html