RocketMQ Command-Line Tool: A Walkthrough of the Source Code Structure

Author: jijs | Published: 2018-12-01 00:36

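Overview

RocketMQ ships with a console and a set of console commands that administrators use to manage topics, clusters, brokers, and related information.

In the bin directory of a RocketMQ installation you will find the mqadmin script.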


Running the mqadmin script without arguments prints all the operations that the mqadmin command supports.

To look up the detailed usage of a specific operation, run:

mqadmin help <command name>
For example: mqadmin help updateTopic

Inspecting the mqadmin script

The mqadmin script invokes the tools command and sets the startup class to org.apache.rocketmq.tools.command.MQAdminStartup.

Structure of the tools module

The MQAdminStartup startup class

public static void main(String[] args) {
    main0(args, null);
}

public static void main0(String[] args, RPCHook rpcHook) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    //PackageConflictDetect.detectFastjson();

    initCommand();

    try {
        initLogback();
        switch (args.length) {
            case 0:
                printHelp();
                break;
            case 2:
                if (args[0].equals("help")) {
                    SubCommand cmd = findSubCommand(args[1]);
                    if (cmd != null) {
                        Options options = ServerUtil.buildCommandlineOptions(new Options());
                        options = cmd.buildCommandlineOptions(options);
                        if (options != null) {
                            ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
                        }
                    } else {
                        System.out.printf("The sub command %s not exist.%n", args[1]);
                    }
                    break;
                }
                // No break here: two arguments whose first is not "help" fall through to the default branch.
            case 1:
            default:
                SubCommand cmd = findSubCommand(args[0]);
                if (cmd != null) {
                    String[] subargs = parseSubArgs(args);

                    Options options = ServerUtil.buildCommandlineOptions(new Options());
                    final CommandLine commandLine =
                        ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
                            new PosixParser());
                    if (null == commandLine) {
                        return;
                    }

                    if (commandLine.hasOption('n')) {
                        String namesrvAddr = commandLine.getOptionValue('n');
                        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
                    }

                    cmd.execute(commandLine, options, rpcHook);
                } else {
                    System.out.printf("The sub command %s not exist.%n", args[0]);
                }
                break;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

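1. initCommand() is called first to register all commands.
2. Logging is initialized.
3. The arguments passed to main are inspected.

  • 3.1 With no arguments, the general help text is printed.
  • 3.2 With exactly two arguments, where the first is help and the second is the name of a command registered by initCommand(), ServerUtil.printCommandLineHelp() prints the help for that command. If no such command is registered, a "not exist" message is printed instead.
  • 3.3 In every other case (one argument, more than two arguments, or two arguments whose first is not help, which falls through from the case 2 branch), the first argument is looked up among the commands registered by initCommand(). If it is found, the remaining arguments are parsed, the -n option (if present) is copied into the name server address system property, and the command's execute() method is called:
 cmd.execute(commandLine, options, rpcHook);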
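
The branching above can be condensed into a small, dependency-free sketch. DispatchSketch, Action, and the KNOWN list are hypothetical names introduced only for illustration, and the exact string match on command names is an assumption of the sketch, since the article does not show findSubCommand().

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the argument dispatch in MQAdminStartup.main0().
// DispatchSketch, Action and KNOWN are hypothetical names for illustration only.
class DispatchSketch {
    enum Action { PRINT_HELP, PRINT_COMMAND_HELP, EXECUTE, UNKNOWN_COMMAND }

    // A small, made-up subset of the registered command names.
    static final List<String> KNOWN = Arrays.asList("updateTopic", "deleteTopic", "topicList");

    static Action dispatch(String[] args) {
        switch (args.length) {
            case 0:
                return Action.PRINT_HELP;
            case 2:
                if (args[0].equals("help")) {
                    return KNOWN.contains(args[1]) ? Action.PRINT_COMMAND_HELP : Action.UNKNOWN_COMMAND;
                }
                // Not "help": fall through and treat args[0] as a command name.
            case 1:
            default:
                return KNOWN.contains(args[0]) ? Action.EXECUTE : Action.UNKNOWN_COMMAND;
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new String[] {}));
        System.out.println(dispatch(new String[] {"help", "updateTopic"}));
        System.out.println(dispatch(new String[] {"topicList", "-n", "localhost:9876"}));
        System.out.println(dispatch(new String[] {"help"}));
    }
}
```

One consequence of this structure is that help on its own (a single argument) is treated as a command name. Since no command called help is registered in initCommand(), that call ends in the "not exist" branch rather than printing the general help.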

The initCommand() method

public static void initCommand() {
    initCommand(new UpdateTopicSubCommand());
    initCommand(new DeleteTopicSubCommand());
    initCommand(new UpdateSubGroupSubCommand());
    initCommand(new DeleteSubscriptionGroupCommand());
    initCommand(new UpdateBrokerConfigSubCommand());
    initCommand(new UpdateTopicPermSubCommand());

    initCommand(new TopicRouteSubCommand());
    initCommand(new TopicStatusSubCommand());
    initCommand(new TopicClusterSubCommand());

    initCommand(new BrokerStatusSubCommand());
    initCommand(new QueryMsgByIdSubCommand());
    initCommand(new QueryMsgByKeySubCommand());
    initCommand(new QueryMsgByUniqueKeySubCommand());
    initCommand(new QueryMsgByOffsetSubCommand());
        
    initCommand(new PrintMessageSubCommand());
    initCommand(new PrintMessageByQueueCommand());
    initCommand(new SendMsgStatusCommand());
    initCommand(new BrokerConsumeStatsSubCommad());

    initCommand(new ProducerConnectionSubCommand());
    initCommand(new ConsumerConnectionSubCommand());
    initCommand(new ConsumerProgressSubCommand());
    initCommand(new ConsumerStatusSubCommand());     
    initCommand(new CloneGroupOffsetCommand());

    initCommand(new ClusterListSubCommand());
    initCommand(new TopicListSubCommand());

    initCommand(new UpdateKvConfigCommand());
    initCommand(new DeleteKvConfigCommand());

    initCommand(new WipeWritePermSubCommand());
    initCommand(new ResetOffsetByTimeCommand());

    initCommand(new UpdateOrderConfCommand());
    initCommand(new CleanExpiredCQSubCommand());
    initCommand(new CleanUnusedTopicCommand());

    initCommand(new StartMonitoringSubCommand());
    initCommand(new StatsAllSubCommand());

    initCommand(new AllocateMQSubCommand());

    initCommand(new CheckMsgSendRTCommand());
    initCommand(new CLusterSendMsgRTCommand());

    initCommand(new GetNamesrvConfigCommand());
    initCommand(new UpdateNamesrvConfigCommand());
    initCommand(new GetBrokerConfigCommand());

    initCommand(new QueryConsumeQueueCommand());
    initCommand(new SendMessageCommand());
    initCommand(new ConsumeMessageCommand());
}

As the class names suggest, they correspond one-to-one with the command names printed when mqadmin is run in the console.

The initCommand(SubCommand) method

protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();

public static void initCommand(SubCommand command) {
    subCommandList.add(command);
}

Each command instance is simply added to a List.
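
The article does not show findSubCommand(), which main0() uses to look a command up in this list. The following is a minimal sketch of how such a registry and lookup could work, assuming a linear scan with a case-insensitive name comparison. RegistrySketch and MiniCommand are hypothetical, dependency-free stand-ins, not the RocketMQ classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free stand-ins; not the RocketMQ classes themselves.
class RegistrySketch {
    interface MiniCommand {
        String commandName();
    }

    static final List<MiniCommand> COMMANDS = new ArrayList<>();

    // Mirrors initCommand(SubCommand): registration is just a list append.
    static void initCommand(MiniCommand command) {
        COMMANDS.add(command);
    }

    // Assumed lookup: linear scan, ignoring case. Returns null when nothing matches.
    static MiniCommand findSubCommand(String name) {
        for (MiniCommand cmd : COMMANDS) {
            if (cmd.commandName().equalsIgnoreCase(name)) {
                return cmd;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        initCommand(() -> "updateTopic");
        initCommand(() -> "topicList");
        System.out.println(findSubCommand("UPDATETOPIC") != null);
        System.out.println(findSubCommand("noSuchCommand") == null);
    }
}
```

With roughly fifty commands, a linear scan over a list is cheap enough, and a list keeps the registration order for the help output. A map keyed by name would be a reasonable alternative if lookups mattered more.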

The SubCommand interface

Every operation command implements the SubCommand interface.

public interface SubCommand {
    String commandName();
    String commandDesc();
    Options buildCommandlineOptions(final Options options);
    void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException;
}

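1. commandName(): the command name
2. commandDesc(): the command description
3. buildCommandlineOptions(): builds the set of command-line options the command accepts
4. execute(): executes the command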

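Source analysis: creating a topic

We use the topic-creation command to show how a command is implemented.
updateTopic is the command that creates a topic.


Running mqadmin help updateTopic shows which parameters updateTopic supports.
Next we look at the implementation of the UpdateTopicSubCommand class.

UpdateTopicSubCommand walkthrough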

commandName()
@Override
public String commandName() {
    return "updateTopic";
}

Returns the command name.

commandDesc()
@Override
public String commandDesc() {
    return "Update or create topic";
}

Returns the command description.

buildCommandlineOptions()
@Override
public Options buildCommandlineOptions(Options options) {
    Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("c", "clusterName", true, "create topic to which cluster");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("t", "topic", true, "topic name");
    opt.setRequired(true);
    options.addOption(opt);

    opt = new Option("r", "readQueueNums", true, "set read queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("w", "writeQueueNums", true, "set write queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("o", "order", true, "set topic's order(true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("u", "unit", true, "is unit topic (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}

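This method defines the options the command accepts, together with their descriptions. Only -t (topic) is marked as required.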
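
The help text of the -p option lists the values 2 (W), 4 (R), and 6 (RW). One way to read these values is as a bit mask, which is consistent with 6 = 2 | 4. The sketch below decodes a value under that assumption. PermSketch and its constants are hypothetical names for illustration, not RocketMQ classes.

```java
// PermSketch is a hypothetical helper; the bit values follow the -p help text (2:W, 4:R, 6:RW).
class PermSketch {
    static final int WRITE = 2;
    static final int READ = 4;

    // Decodes a permission value into "R", "W", "RW", or "-" when neither bit is set.
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder();
        if ((perm & READ) != 0) {
            sb.append('R');
        }
        if ((perm & WRITE) != 0) {
            sb.append('W');
        }
        return sb.length() == 0 ? "-" : sb.toString();
    }

    public static void main(String[] args) {
        for (int perm : new int[] {2, 4, 6}) {
            System.out.println(perm + " -> " + describe(perm));
        }
    }
}
```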

The execute() method
@Override
public void execute(final CommandLine commandLine, final Options options,
    RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

    try {
        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setReadQueueNums(8);
        topicConfig.setWriteQueueNums(8);
        topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

        // readQueueNums
        if (commandLine.hasOption('r')) {
            topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
        }

        // writeQueueNums
        if (commandLine.hasOption('w')) {
            topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
        }

        // perm
        if (commandLine.hasOption('p')) {
            topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
        }

        boolean isUnit = false;
        if (commandLine.hasOption('u')) {
            isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
        }

        boolean isCenterSync = false;
        if (commandLine.hasOption('s')) {
            isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
        }

        int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
        topicConfig.setTopicSysFlag(topicCenterSync);

        boolean isOrder = false;
        if (commandLine.hasOption('o')) {
            isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
        }
        topicConfig.setOrder(isOrder);

        if (commandLine.hasOption('b')) {
            String addr = commandLine.getOptionValue('b').trim();

            defaultMQAdminExt.start();
            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

            if (isOrder) {
                String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
                String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
                System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
                    isOrder, orderConf.toString()));
            }
            System.out.printf("create topic to %s success.%n", addr);
            System.out.printf("%s", topicConfig);
            return;

        } else if (commandLine.hasOption('c')) {
            String clusterName = commandLine.getOptionValue('c').trim();

            defaultMQAdminExt.start();

            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
            for (String addr : masterSet) {
                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                System.out.printf("create topic to %s success.%n", addr);
            }

            if (isOrder) {
                Set<String> brokerNameSet =
                    CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
                StringBuilder orderConf = new StringBuilder();
                String splitor = "";
                for (String s : brokerNameSet) {
                    orderConf.append(splitor).append(s).append(":")
                        .append(topicConfig.getWriteQueueNums());
                    splitor = ";";
                }
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                    orderConf.toString(), true);
                System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
            }

            System.out.printf("%s", topicConfig);
            return;
        }

        ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
    } catch (Exception e) {
        throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
    } finally {
        defaultMQAdminExt.shutdown();
    }
}

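As the code shows, a large part of the method is spent parsing the commandLine arguments.
The parsed values populate a TopicConfig object, whose read and write queue counts default to 8 unless -r or -w overrides them.
DefaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig) is then called to create the topic.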
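
The "defaults first, then override from options" pattern can be sketched without the RocketMQ or Commons CLI dependencies. In this sketch a Map plays the role of CommandLine and MiniTopicConfig the role of TopicConfig. Both are hypothetical stand-ins, and only a few of the options are modelled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins: a Map plays the role of CommandLine, MiniTopicConfig that of TopicConfig.
class TopicConfigSketch {
    static class MiniTopicConfig {
        String topicName;
        int readQueueNums = 8;   // same default as in execute()
        int writeQueueNums = 8;  // same default as in execute()
        boolean order = false;
    }

    static MiniTopicConfig fromOptions(Map<Character, String> opts) {
        String topic = opts.get('t');
        if (topic == null) {
            // In the real tool the option parser rejects a missing required option.
            throw new IllegalArgumentException("-t (topic) is required");
        }
        MiniTopicConfig cfg = new MiniTopicConfig();
        cfg.topicName = topic.trim();
        if (opts.containsKey('r')) {
            cfg.readQueueNums = Integer.parseInt(opts.get('r').trim());
        }
        if (opts.containsKey('w')) {
            cfg.writeQueueNums = Integer.parseInt(opts.get('w').trim());
        }
        if (opts.containsKey('o')) {
            cfg.order = Boolean.parseBoolean(opts.get('o').trim());
        }
        return cfg;
    }

    public static void main(String[] args) {
        Map<Character, String> opts = new HashMap<>();
        opts.put('t', " TopicTest ");
        opts.put('r', "4");
        MiniTopicConfig cfg = fromOptions(opts);
        System.out.println(cfg.topicName + " r=" + cfg.readQueueNums + " w=" + cfg.writeQueueNums);
    }
}
```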

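The code also shows that only one of the -b and -c options takes effect.
-b creates the topic on the specified broker.
-c creates the topic on every master broker of the specified cluster (the addresses returned by CommandUtil.fetchMasterAddrByClusterName()).

-b is checked first: if -b is given, the topic is created on that broker only and the cluster named by -c is ignored. If neither option is given, the command prints its help text.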
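
The precedence between -b and -c, and the orderConf string built for ordered topics, can be illustrated with a short sketch. TargetSketch and its method names are hypothetical, and the broker address, cluster name, and broker names in main are made-up sample values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring the branch order and orderConf format in execute().
class TargetSketch {
    // -b wins over -c; with neither, the real command prints its help text.
    static String chooseTarget(String brokerAddr, String clusterName) {
        if (brokerAddr != null) {
            return "broker:" + brokerAddr.trim();
        }
        if (clusterName != null) {
            return "cluster:" + clusterName.trim();
        }
        return "help";
    }

    // Builds "brokerName:writeQueueNums" entries joined by ';', as in the -c branch.
    static String buildOrderConf(List<String> brokerNames, int writeQueueNums) {
        StringBuilder orderConf = new StringBuilder();
        String separator = "";
        for (String name : brokerNames) {
            orderConf.append(separator).append(name).append(':').append(writeQueueNums);
            separator = ";";
        }
        return orderConf.toString();
    }

    public static void main(String[] args) {
        System.out.println(chooseTarget("127.0.0.1:10911", "DefaultCluster"));
        System.out.println(buildOrderConf(Arrays.asList("broker-a", "broker-b"), 8));
    }
}
```

Starting with an empty separator and switching to ";" after the first entry is the same trick the original loop uses to avoid a leading or trailing delimiter.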

The other SubCommand implementations follow the same pattern, so they are not analysed one by one here.
