美文网首页rocketMq理论与实践
RocketMq Topic创建和删除

RocketMq Topic创建和删除

作者: 晴天哥_王志 | 来源:发表于2020-06-30 00:24 被阅读0次

    系列

    开篇

    • 这个系列主要用以分析mqadmin常见的比较核心的几个命令,主要包括订阅分组和topic的创建和删除、Topic的权限变更。
    • 这篇文章主要是用来分析Topic的创建和删除。

    创建Topic

    Topic创建的核心步骤如下

    • 1、mqadmin向broker发起创建Topic的命令。
    • 2、broker生成Topic对应的topicConfig配置保存在broker的TopicConfigManager中。
    • 3、broker向所有的namesrv上报topicConfig信息。
    • 4、namesrv的RouteInfoManager的topicQueueTable保存topic的QueueData信息。
    • 5、broker会通过定时任务定期向namesrv发送心跳信息更新topic配置。

    updateTopic

    usage: mqadmin updateTopic -b <arg> | -c <arg>  [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t
           <arg> [-u <arg>] [-w <arg>]
     -b,--brokerAddr <arg>       create topic to which broker
     -c,--clusterName <arg>      create topic to which cluster
     -h,--help                   Print help
     -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
     -o,--order <arg>            set topic's order(true|false)
     -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
     -r,--readQueueNums <arg>    set read queue nums
     -s,--hasUnitSub <arg>       has unit sub (true|false)
     -t,--topic <arg>            topic name
     -u,--unit <arg>             is unit topic (true|false)
     -w,--writeQueueNums <arg>   set write queue nums
    
    • 通过 --brokerAddr在指定的broker创建topic。
    • 通过 --clusterName在整个集群创建topic。
    • 通过 --namesrvAddr指定namesrv地址。
    • 通过 --topic来指定topic名称。
    • 通过 --perm来指定Topic的权限管理。

    UpdateTopicSubCommand

    public class UpdateTopicSubCommand implements SubCommand {
    
        @Override
        public void execute(final CommandLine commandLine, final Options options,
            RPCHook rpcHook) throws SubCommandException {
            DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
            defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
    
            try {
                // 默认的topic的读写队列为8个
                TopicConfig topicConfig = new TopicConfig();
                topicConfig.setReadQueueNums(8);
                topicConfig.setWriteQueueNums(8);
                topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
    
                // readQueueNums
                if (commandLine.hasOption('r')) {
                    topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
                }
    
                // writeQueueNums
                if (commandLine.hasOption('w')) {
                    topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
                }
    
                // perm
                if (commandLine.hasOption('p')) {
                    topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
                }
    
                boolean isUnit = false;
                if (commandLine.hasOption('u')) {
                    isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
                }
    
                boolean isCenterSync = false;
                if (commandLine.hasOption('s')) {
                    isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
                }
    
                int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
                topicConfig.setTopicSysFlag(topicCenterSync);
    
                boolean isOrder = false;
                if (commandLine.hasOption('o')) {
                    isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
                }
                topicConfig.setOrder(isOrder);
    
                if (commandLine.hasOption('b')) {
                    String addr = commandLine.getOptionValue('b').trim();
    
                    defaultMQAdminExt.start();
                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
    
                    if (isOrder) {
                        String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
                        String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
                        defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
                        System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
                            isOrder, orderConf.toString()));
                    }
                    System.out.printf("create topic to %s success.%n", addr);
                    System.out.printf("%s", topicConfig);
                    return;
    
                } else if (commandLine.hasOption('c')) {
                    String clusterName = commandLine.getOptionValue('c').trim();
    
                    defaultMQAdminExt.start();
    
                    Set<String> masterSet =
                        CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
                    for (String addr : masterSet) {
                        defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                        System.out.printf("create topic to %s success.%n", addr);
                    }
    
                    if (isOrder) {
                        Set<String> brokerNameSet =
                            CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
                        StringBuilder orderConf = new StringBuilder();
                        String splitor = "";
                        for (String s : brokerNameSet) {
                            orderConf.append(splitor).append(s).append(":")
                                .append(topicConfig.getWriteQueueNums());
                            splitor = ";";
                        }
                        defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                            orderConf.toString(), true);
                        System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
                    }
    
                    System.out.printf("%s", topicConfig);
                    return;
                }
    
                ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
            } catch (Exception e) {
                throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
            } finally {
                defaultMQAdminExt.shutdown();
            }
        }
    }
    
    • Topic的默认的读写队列为8.
    • 针对指定broker的场景,只在指定的broker机器创建Topic。
    • 针对指定cluster的场景,获取集群下的所有broker并向全部的broker机器创建Topic。

    mqadmin MQClientAPIImpl

    public class MQClientAPIImpl {
    
        public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
            final long timeoutMillis)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
            CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
            requestHeader.setTopic(topicConfig.getTopicName());
            requestHeader.setDefaultTopic(defaultTopic);
            requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
            requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
            requestHeader.setPerm(topicConfig.getPerm());
            requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
            requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
            requestHeader.setOrder(topicConfig.isOrder());
    
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
    
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
                request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    return;
                }
                default:
                    break;
            }
    
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    }
    
    • 创建Topic的RequestCode为UPDATE_AND_CREATE_TOPIC。

    broker AdminBrokerProcessor

    public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
        private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
    
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final CreateTopicRequestHeader requestHeader =
                (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
    
            if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
                String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
                log.warn(errorMsg);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(errorMsg);
                return response;
            }
    
            if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
                return response;
            }
    
            try {
                response.setCode(ResponseCode.SUCCESS);
                response.setOpaque(request.getOpaque());
                response.markResponseType();
                response.setRemark(null);
                ctx.writeAndFlush(response);
            } catch (Exception e) {
                log.error("Failed to produce a proper response", e);
            }
            // TopicConfig的创建配置
            TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
            topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
            topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
            topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
            topicConfig.setPerm(requestHeader.getPerm());
            topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
            // broker保存Topic的配置信息
            this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
            // broker负责向namesrv上报topic信息。
            this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
    
            return null;
        }
    }
    
    • 创建TopicConfig对象并通过TopicConfigManager来进行保存。
    • 通过brokerController#registerIncrementBrokerData来上报topic信息到namesrv。

    broker TopicConfigManager

    public class TopicConfigManager extends ConfigManager {
    
        private static final long LOCK_TIMEOUT_MILLIS = 3000;
        private transient final Lock lockTopicConfigTable = new ReentrantLock();
    
        private final ConcurrentMap<String, TopicConfig> topicConfigTable =
            new ConcurrentHashMap<String, TopicConfig>(1024);
        private final DataVersion dataVersion = new DataVersion();
        private final Set<String> systemTopicList = new HashSet<String>();
        private transient BrokerController brokerController;
    
        public void updateTopicConfig(final TopicConfig topicConfig) {
            TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            if (old != null) {
                log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
            } else {
                log.info("create new topic [{}]", topicConfig);
            }
    
            this.dataVersion.nextVersion();
    
            this.persist();
        }
    }
    
    • 通过topicConfigTable来保存Topic和对应的TopicConfig。

    broker BrokerController

    public class BrokerController {
    
        public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
    
            TopicConfig registerTopicConfig = topicConfig;
            // 如果broker本身的存在不可读和不可写的权限,那么就以broker的读写权限为准
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                registerTopicConfig =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        this.brokerConfig.getBrokerPermission());
            }
    
            ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
            topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
            TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
            topicConfigSerializeWrapper.setDataVersion(dataVersion);
            topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
            // 将topic信息注册到namesrv的逻辑
            doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
        }
    
    
        private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
            TopicConfigSerializeWrapper topicConfigWrapper) {
            // 向所有的namesrv发送topic的注册信息
            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills(),
                this.brokerConfig.isCompressedRegister());
    
            if (registerBrokerResultList.size() > 0) {
                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
                if (registerBrokerResult != null) {
                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                    }
    
                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
    
                    if (checkOrderConfig) {
                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                    }
                }
            }
        }
    }
    
    • registerIncrementBrokerData首先会检查broker本身的读写权限重新生成新的topicConfigSerializeWrapper。
    • 通过brokerOuterAPI#registerBrokerAll向namesrv注册最新的topic信息。

    broker BrokerOuterAPI

    public class BrokerOuterAPI {
    
        private RegisterBrokerResult registerBroker(
            final String namesrvAddr,
            final boolean oneway,
            final int timeoutMills,
            final RegisterBrokerRequestHeader requestHeader,
            final byte[] body
        ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
            InterruptedException {
    
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
            request.setBody(body);
            // oneway=false,这个分支不会执行
            if (oneway) {
                try {
                    this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
                } catch (RemotingTooMuchRequestException e) {
                    // Ignore
                }
                return null;
            }
    
            // 向namesrv上报
            RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
            // 处理上报结果并同步主从
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    RegisterBrokerResponseHeader responseHeader =
                        (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                    RegisterBrokerResult result = new RegisterBrokerResult();
                    result.setMasterAddr(responseHeader.getMasterAddr());
                    result.setHaServerAddr(responseHeader.getHaServerAddr());
                    if (response.getBody() != null) {
                        result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                    }
                    return result;
                }
                default:
                    break;
            }
    
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
    }
    
    • 向namesrv注册topic的RequestCode为REGISTER_BROKER。
    • broker向namesrv发起topic注册。

    namesrv DefaultRequestProcessor

    public class DefaultRequestProcessor implements NettyRequestProcessor {
    
        public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
    
            final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
            final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
            final RegisterBrokerRequestHeader requestHeader =
                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    
            if (!checksum(ctx, request, requestHeader)) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("crc32 not match");
                return response;
            }
    
            RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
    
            if (request.getBody() != null) {
                try {
                    registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
                } catch (Exception e) {
                    throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
                }
            } else {
                registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
                registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
            }
            // namesrv同时注册broker信息和TopicConfig信息
            RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId(),
                requestHeader.getHaServerAddr(),
                registerBrokerBody.getTopicConfigSerializeWrapper(),
                registerBrokerBody.getFilterServerList(),
                ctx.channel());
    
            responseHeader.setHaServerAddr(result.getHaServerAddr());
            responseHeader.setMasterAddr(result.getMasterAddr());
    
            byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
            response.setBody(jsonValue);
    
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }
    
    • getRouteInfoManager().registerBroker()负责向namesrv注册broker信息。

    namesrv RouteInfoManager

    public class RouteInfoManager {
    
        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
        public RouteInfoManager() {
            this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
            this.brokerAddrTable = new HashMap<String, BrokerData>(128);
            this.clusterAddrTable = new HashMap<String, Set<String>>(32);
            this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
            this.filterServerTable = new HashMap<String, List<String>>(256);
        }
    
        public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
            RegisterBrokerResult result = new RegisterBrokerResult();
    
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
    
                    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                    if (null == brokerNames) {
                        brokerNames = new HashSet<String>();
                        this.clusterAddrTable.put(clusterName, brokerNames);
                    }
                    brokerNames.add(brokerName);
    
                    boolean registerFirst = false;
    
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null == brokerData) {
                        registerFirst = true;
                        brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                        this.brokerAddrTable.put(brokerName, brokerData);
                    }
                    Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                    //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                    //The same IP:PORT must only have one record in brokerAddrTable
                    Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> item = it.next();
                        if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                            it.remove();
                        }
                    }
    
                    String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                    registerFirst = registerFirst || (null == oldAddr);
    
                    // 保存topicConfig的信息
                    if (null != topicConfigWrapper
                        && MixAll.MASTER_ID == brokerId) {
                        if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                            || registerFirst) {
                            ConcurrentMap<String, TopicConfig> tcTable =
                                topicConfigWrapper.getTopicConfigTable();
                            if (tcTable != null) {
                                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                    this.createAndUpdateQueueData(brokerName, entry.getValue());
                                }
                            }
                        }
                    }
    
                    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                        new BrokerLiveInfo(
                            System.currentTimeMillis(),
                            topicConfigWrapper.getDataVersion(),
                            channel,
                            haServerAddr));
                    if (null == prevBrokerLiveInfo) {
                        log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                    }
    
                    if (filterServerList != null) {
                        if (filterServerList.isEmpty()) {
                            this.filterServerTable.remove(brokerAddr);
                        } else {
                            this.filterServerTable.put(brokerAddr, filterServerList);
                        }
                    }
    
                    if (MixAll.MASTER_ID != brokerId) {
                        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                        if (masterAddr != null) {
                            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                            if (brokerLiveInfo != null) {
                                result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                                result.setMasterAddr(masterAddr);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("registerBroker Exception", e);
            }
    
            return result;
        }
    }
    
    • RouteInfoManager是namesrv的核心数据管理中心,broker上报Topic的过程中会同时更新broker的保活信息等。
    • 核心的createAndUpdateQueueData负责注册topic配置信息。

    namesrv createAndUpdateQueueData

    public class RouteInfoManager {
    
        private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    
            QueueData queueData = new QueueData();
            queueData.setBrokerName(brokerName);
            queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
            queueData.setReadQueueNums(topicConfig.getReadQueueNums());
            queueData.setPerm(topicConfig.getPerm());
            queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
    
            List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
            if (null == queueDataList) {
                queueDataList = new LinkedList<QueueData>();
                queueDataList.add(queueData);
                this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
                log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
            } else {
                boolean addNewOne = true;
    
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    if (qd.getBrokerName().equals(brokerName)) {
                        if (qd.equals(queueData)) {
                            addNewOne = false;
                        } else {
                            log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                                queueData);
                            it.remove();
                        }
                    }
                }
    
                if (addNewOne) {
                    queueDataList.add(queueData);
                }
            }
        }
    }
    
    • 负责创建QueueData并保存到topicQueueTable当中。
    • createAndUpdateQueueData会处理所有broker的上报,所以在处理QueueData过程中需要判断brokerName以处理对应的broker的QueueData。

    定时上报 BrokerController

    public class BrokerController {
    
        public void start() throws Exception {
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    }
    
    • broker会定时向namesrv上报broker的信息并且包括topic的注册信息。

    删除Topic

    删除Topic的核心逻辑如下

    • 1、mqadmin负责通知所有的broker删除topic对应的配置和消息文件。
    • 2、mqadmin负责通知所有的namesrv删除topic对应的配置。

    deleteTopic

    usage: mqadmin deleteTopic -c <arg> [-h] [-n <arg>] -t <arg>
     -c,--clusterName <arg>   delete topic from which cluster
     -h,--help                Print help
     -n,--namesrvAddr <arg>   Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
     -t,--topic <arg>         topic name
    
    • --topic指定待删除的topic信息。
    • --clusterName指定待删除Topic所在的集群信息。

    mqadmin DeleteTopicSubCommand

    public class DeleteTopicSubCommand implements SubCommand {
    
        public static void deleteTopic(final DefaultMQAdminExt adminExt,
            final String clusterName,
            final String topic
        ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
    
            Set<String> brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);
            adminExt.deleteTopicInBroker(brokerAddressSet, topic);
            System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
    
            Set<String> nameServerSet = null;
            if (adminExt.getNamesrvAddr() != null) {
                String[] ns = adminExt.getNamesrvAddr().trim().split(";");
                nameServerSet = new HashSet(Arrays.asList(ns));
            }
    
            adminExt.deleteTopicInNameServer(nameServerSet, topic);
            System.out.printf("delete topic [%s] from NameServer success.%n", topic);
        }
    
        @Override
        public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
            DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
            adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
            try {
                String topic = commandLine.getOptionValue('t').trim();
    
                if (commandLine.hasOption('c')) {
                    String clusterName = commandLine.getOptionValue('c').trim();
    
                    adminExt.start();
                    deleteTopic(adminExt, clusterName, topic);
                    return;
                }
    
                ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
            } catch (Exception e) {
                throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
            } finally {
                adminExt.shutdown();
            }
        }
    }
    
    • deleteTopicInBroker负责当前集群的所有broker上的Topic信息。
    • deleteTopicInNameServer负责删除namesrv中的topic信息

    mqadmin DefaultMQAdminExtImpl

    public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
    
        public void deleteTopicInBroker(Set<String> addrs,
            String topic) throws RemotingException, MQBrokerException, InterruptedException,
            MQClientException {
            for (String addr : addrs) {
                this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
            }
        }
    }
    
    • 遍历所有broker依次执行deleteTopicInBroker操作。

    mqadmin MQClientAPIImpl

    public class MQClientAPIImpl {
    
        public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
            DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
            requestHeader.setTopic(topic);
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader);
    
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
                request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    return;
                }
                default:
                    break;
            }
    
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    }
    
    • deleteTopicInBroker的RequestCode为DELETE_TOPIC_IN_BROKER。

    broker AdminBrokerProcessor

    public class AdminBrokerProcessor implements NettyRequestProcessor {
    
        private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            DeleteTopicRequestHeader requestHeader =
                (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
    
            log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            // 删除配置信息
            this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
            // 删除消息存储文件
            this.brokerController.getMessageStore()
                .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
    
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }
    
    public class TopicConfigManager extends ConfigManager {
    
        public void deleteTopicConfig(final String topic) {
            // 从topicConfigTable中删除topic的配置信息
            TopicConfig old = this.topicConfigTable.remove(topic);
            if (old != null) {
                log.info("delete topic config OK, topic: {}", old);
                this.dataVersion.nextVersion();
                this.persist();
            } else {
                log.warn("delete topic config failed, topic: {} not exists", topic);
            }
        }
    
    • deleteTopic操作包括删除broker上topic的配置信息和topic对应的消息存储文件。

    mqadmin DefaultMQAdminExtImpl

    public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
    
        @Override
        public void deleteTopicInNameServer(Set<String> addrs,
            String topic) throws RemotingException, MQBrokerException, InterruptedException,
            MQClientException {
            if (addrs == null) {
                String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
                addrs = new HashSet(Arrays.asList(ns.split(";")));
            }
            for (String addr : addrs) {
                this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, topic, timeoutMillis);
            }
        }
    }
    
    
    public class MQClientAPIImpl {
    
        public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
            DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
            requestHeader.setTopic(topic);
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader);
    
            RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    return;
                }
                default:
                    break;
            }
    
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    }
    
    • 遍历所有的namesrv并依次执行topic的删除。
    • requestCode为DELETE_TOPIC_IN_NAMESRV。

    namesrv DefaultRequestProcessor

    public class DefaultRequestProcessor implements NettyRequestProcessor {
    
        private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final DeleteTopicInNamesrvRequestHeader requestHeader =
                (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
    
            this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());
    
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }
    
    
    public class RouteInfoManager {
    
        public void deleteTopic(final String topic) {
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    this.topicQueueTable.remove(topic);
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("deleteTopic Exception", e);
            }
        }
    }
    
    • namesrv的topic删除负责从RouteInfoManager移除topic对应的QueueData。

    相关文章

      网友评论

        本文标题:RocketMq Topic创建和删除

        本文链接:https://www.haomeiwen.com/subject/jzpbfktx.html