Manually Creating a Topic from Java Code in RocketMQ


Author: Zephyr006 | Published: 2020-08-13 16:46


    [Original article. Please credit the source when reposting.]

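    **This article applies only to RocketMQ 4.5.*. Other versions may behave differently, so treat it as a reference only.**
    It also covers only the single name server case. I have not verified whether it works with a name server cluster (the principle is the same).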

    References and related classes/methods

    Additional dependencies

    The following Maven dependencies are required. Choose the versions that suit your project.

    <!-- Additional dependency: RocketMQ admin tools -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-tools</artifactId>
        <version>4.5.1</version>
    </dependency>
    <!-- Required for integrating RocketMQ with Spring Boot -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
    

    Usage

    • On the command line, topics are created with the updateTopic command, for example:

    ./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t tx-mq-TOPIC

    The options are:

    mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
    -t <arg> [-u <arg>] [-w <arg>]
    -b,--brokerAddr <arg> create topic to which broker
    -c,--clusterName <arg> create topic to which cluster
    -h,--help Print help

    -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
    -o,--order <arg> set topic's order(true|false)
    -p,--perm <arg> set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
    -r,--readQueueNums <arg> set read queue nums
    -s,--hasUnitSub <arg> has unit sub (true|false)
    -t,--topic <arg> topic name
    -u,--unit <arg> is unit topic (true|false)
    -w,--writeQueueNums <arg> set write queue nums
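
    The -p values in the help text above (2:W, 4:R, 6:RW) read naturally as bit flags. The following is a minimal sketch under that assumption, using only the JDK. TopicPermSketch and describe are illustrative names and are not part of RocketMQ.

```java
final class TopicPermSketch {
    static final int WRITE = 2; // "2:W" in the help text
    static final int READ = 4;  // "4:R" in the help text

    /** Turns a -p value (2, 4 or 6) into "W", "R" or "RW". */
    static String describe(int perm) {
        if (perm != WRITE && perm != READ && perm != (READ | WRITE)) {
            throw new IllegalArgumentException("perm must be 2, 4 or 6 but was " + perm);
        }
        StringBuilder sb = new StringBuilder();
        if ((perm & READ) != 0) {
            sb.append('R');
        }
        if ((perm & WRITE) != 0) {
            sb.append('W');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (int perm : new int[] {2, 4, 6}) {
            System.out.println(perm + " -> " + describe(perm));
        }
    }
}
```

    Validating the value before passing it to updateTopic gives a clearer error than letting an unexpected number reach the broker.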

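    • Notes and known problems:

    1. RocketMQ requires that one of -b or -c be given when creating a topic with updateTopic (if both are given, -b is used and -c is ignored). -t is also required. If any of these required options is missing, topic creation fails.

    2. When creating a topic from Java code, the -b option works as expected: RocketMQ uses the given broker address directly and creates the topic on that broker.

    3. When creating a topic from Java code, using the -c option on its own fails. org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute needs a DefaultMQAdminExt object connected to the name server in order to fetch all brokers in the given cluster.
      DefaultMQAdminExt obtains the name server address with: private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses(), which is implemented as System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)). This is where the problem appears: when neither the system property nor the environment variable is set, the lookup yields no name server address, so the connection to the name server fails. The full error is: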
      <details>
      <summary>Click to view the full error message</summary>

    org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed
        at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:185)
        at com.bayss.bws.common.utils.RocketMQUtil.createTopic(RocketMQUtil.java:54)
        at com.bayss.bws.agent.internal.ConsumerManager.init(ConsumerManager.java:78)
        at com.bayss.bws.agent.core.InitializeAgent.init(InitializeAgent.java:74)
        at com.bayss.bws.agent.core.InitializeAgent.onApplicationEvent(InitializeAgent.java:184)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:402)
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:359)
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:896)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
        at com.bayss.bws.agent.core.BwsAgentApplication.main(BwsAgentApplication.java:24)
    Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed
        at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:392)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1193)
        at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275)
        at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222)
        at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83)
        at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:158)
        ... 17 more
    

    </details>

    I therefore decided to set the property manually (once it is set, the topic can be created without the -n option). The code that sets it is:

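    import org.apache.rocketmq.common.MixAll;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RocketMQConfig implements InitializingBean {
        // This must resolve to the correct name server address, otherwise the lookup fails again.
        @Value("${rocketmq.name-server}")
        private String rocketMQNamesrv;

        @Override
        public void afterPropertiesSet() throws Exception {
            // Set the property before any DefaultMQAdminExt is constructed.
            System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, rocketMQNamesrv);
        }
    }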
    
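    4. The command-line help for updateTopic lists the -n option, but I found no handling for it in the subcommand's own source (on the command line it appears to be consumed earlier, by MQAdminStartup, which copies the value into the same system property). Passing only -n (namesrv) also fails, because the -b or -c option from note 1 is missing.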
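
    The lookup described in note 3 can be reproduced with the JDK alone. This is a hedged sketch, not RocketMQ code: it assumes that MixAll.NAMESRV_ADDR_PROPERTY is "rocketmq.namesrv.addr" and that MixAll.NAMESRV_ADDR_ENV is "NAMESRV_ADDR" (check the constants in your version). NamesrvAddrLookupSketch and resolve are illustrative names.

```java
final class NamesrvAddrLookupSketch {
    // Assumed values of MixAll.NAMESRV_ADDR_PROPERTY and MixAll.NAMESRV_ADDR_ENV.
    static final String PROPERTY = "rocketmq.namesrv.addr";
    static final String ENV = "NAMESRV_ADDR";

    /** Same shape as the quoted lookup: system property first, environment variable as the fallback. */
    static String resolve() {
        return System.getProperty(PROPERTY, System.getenv(ENV));
    }

    public static void main(String[] args) {
        // Null here means neither source is set, which matches the "connect to <null> failed" error.
        System.out.println("before: " + resolve());

        // The workaround from the article: set the property in code.
        System.setProperty(PROPERTY, "localhost:9876");
        System.out.println("after:  " + resolve());
    }
}
```

    Because the quoted field is assigned when the object is created, the property has to be set before DefaultMQAdminExt is constructed. Setting it afterwards does not change an instance that already exists.
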
    • Usage example:

    String[] subargs = new String[] {
            "-b 10.1.4.231:10911",
            "-t unit-test-from-java-111",
            //"-r 8",
            //"-w 8",
            //"-p 6",
            //"-o false",
            //"-u false",
            //"-s false"
    };
    boolean isTopicCreated = RocketMQUtil.createTopic(subargs);
    //boolean isTopicCreated = RocketMQUtil.createTopic("10.1.4.231:10911", "", "testttttt");
    if (isTopicCreated) {
        System.err.println("topic create success");
    }
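
    The rules from note 1 (-t is required, one of -b or -c is required, and -b wins when both are present) can be checked before the subcommand is invoked. The sketch below uses only the JDK. UpdateTopicArgsSketch and build are hypothetical names. Unlike the example above, it emits each flag and its value as separate array elements, the way a shell would split them. That is an assumption about what the option parser accepts, not something the article tested.

```java
import java.util.ArrayList;
import java.util.List;

final class UpdateTopicArgsSketch {
    /** Builds updateTopic arguments and rejects combinations that note 1 says will fail. */
    static String[] build(String brokerAddr, String clusterName, String topic) {
        if (isBlank(topic)) {
            throw new IllegalArgumentException("topic (-t) is required");
        }
        List<String> args = new ArrayList<>();
        if (!isBlank(brokerAddr)) {
            // -b takes precedence; clusterName is ignored in this case.
            args.add("-b");
            args.add(brokerAddr.trim());
        } else if (!isBlank(clusterName)) {
            args.add("-c");
            args.add(clusterName.trim());
        } else {
            throw new IllegalArgumentException("either brokerAddr (-b) or clusterName (-c) is required");
        }
        args.add("-t");
        args.add(topic.trim());
        return args.toArray(new String[0]);
    }

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", build("10.1.4.231:10911", null, "unit-test-from-java-111")));
    }
}
```

    The resulting array could then be handed to a method such as RocketMQUtil.createTopic(String[]).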
    
    • Java utility class:
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.PosixParser;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.protocol.body.ClusterInfo;
    import org.apache.rocketmq.common.protocol.route.BrokerData;
    import org.apache.rocketmq.remoting.exception.RemotingConnectException;
    import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
    import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
    import org.apache.rocketmq.srvutil.ServerUtil;
    import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
    import org.apache.rocketmq.tools.command.SubCommandException;
    import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
    
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * FIXME: it is unverified whether the methods in this class work with multiple name servers.
     * @author Zephyr
     * @date 2019/12/21.
     */
    public class RocketMQUtil {
    
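        /**
         * Creates a topic, allowing every option that updateTopic supports to be customized.
         * @param subargs any options supported by the updateTopic command
         * @return true if the topic was created
         * @throws SubCommandException if the updateTopic subcommand fails
         */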
        public static boolean createTopic(String[] subargs) throws SubCommandException {
            /*String[] subargs = new String[] {
                    "-b 10.1.4.231:10911",
                    "-t unit-test-from-java-1",
                    "-r 8",
                    "-w 8",
                    "-p 6",
                    "-o false",
                    "-u false",
                    "-s false"};*/
            UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            final Options updateTopicOptions = cmd.buildCommandlineOptions(options);
            final CommandLine commandLine = ServerUtil
                    .parseCmdLine("mqadmin " + cmd.commandName(),
                            subargs, updateTopicOptions, new PosixParser());
    
            cmd.execute(commandLine, updateTopicOptions, null);
            return true;
        }
    
        /**
         * Creates a topic by brokerAddr or clusterName.
         * @param brokerAddr  required when creating the topic on a specific broker, otherwise pass null
         * @param clusterName required when creating the topic on a specific cluster, otherwise pass null
         * @param topic       the topic to create
         * @return            true if the topic was created
         */
        public static boolean createTopic(String brokerAddr, String clusterName, String topic) throws Exception {
            if (StringUtils.isBlank(topic)) {
                return false;
            }
            List<String> argList = new LinkedList<>();
            argList.add("-t " + topic);
            if (StringUtils.isNotBlank(brokerAddr)) {
                argList.add("-b " + brokerAddr.trim());
            } else if (StringUtils.isNotBlank(clusterName)) {
                argList.add("-c " + clusterName.trim());
            } else {
                // Neither -b nor -c is available, so updateTopic cannot succeed.
                return false;
            }
            return createTopic(argList.toArray(new String[0]));
        }
    
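        /**
         * Creates a topic with default settings on every cluster known to the given name server.
         * Creation by cluster name (-c) relies on MixAll.NAMESRV_ADDR_PROPERTY having been set beforehand.
         * @param namesrvAddr name server address
         * @param topic       the topic to create
         * @return true if the topic was created on all clusters, false if any step failed
         */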
        public static boolean createTopic(String namesrvAddr, String topic) {
            try {
                Set<String> clusterNames = RocketMQUtil.getClusterNames(namesrvAddr);
                for (String clusterName : clusterNames) {
                    RocketMQUtil.createTopic(null, clusterName, topic);
                }
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    
    
        /**
         * Fetches the cluster information from the given name server.
         * @param namesrvAddr name server address
         * @return the cluster information, or an empty ClusterInfo if namesrvAddr is blank
         * @throws MQClientException
         * @throws InterruptedException
         * @throws MQBrokerException
         * @throws RemotingTimeoutException
         * @throws RemotingSendRequestException
         * @throws RemotingConnectException
         */
        public static ClusterInfo getClusterInfo(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
            if (StringUtils.isBlank(namesrvAddr)) {
                return new ClusterInfo();
            }
            DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(5000L);
            mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
            mqAdminExt.setNamesrvAddr(namesrvAddr);
            mqAdminExt.start();
            try {
                return mqAdminExt.examineBrokerClusterInfo();
            } finally {
                // Always release the admin client, even if the query fails.
                mqAdminExt.shutdown();
            }
        }
    
        /**
         * Returns the names of all clusters known to the given name server.
         * @param namesrvAddr name server address
         * @return the cluster names
         * @throws MQClientException
         * @throws InterruptedException
         * @throws MQBrokerException
         * @throws RemotingTimeoutException
         * @throws RemotingSendRequestException
         * @throws RemotingConnectException
         */
        public static Set<String> getClusterNames(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
            return getClusterInfo(namesrvAddr).getClusterAddrTable().keySet();
        }
    
        /**
         * Returns information on all brokers known to the given name server (unverified with multiple name servers).
         * @param namesrvAddr name server address
         * @return HashMap<String, BrokerData>
         */
        public static Map<String, BrokerData> getAllBrokerInfo(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
            return getClusterInfo(namesrvAddr).getBrokerAddrTable();
        }
    
        /**
         * Returns the addresses of all brokers connected to the given name server.
         * @param namesrvAddr name server address
         * @return the broker addresses
         */
        public static Set<String> getBrokerAddrs(String namesrvAddr) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
            Map<String, BrokerData> allBrokerInfo = getAllBrokerInfo(namesrvAddr);
            Set<String> brokerAddrs = new HashSet<>();
            for (BrokerData brokerData : allBrokerInfo.values()) {
                brokerAddrs.addAll(brokerData.getBrokerAddrs().values());
            }
            return brokerAddrs;
        }
    }
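
    createTopic(namesrvAddr, topic) above stops at the first cluster that fails and only prints the stack trace. One possible alternative, sketched here with JDK types only, is to attempt every cluster and report the failures per cluster. TopicCreator is a hypothetical stand-in for a call such as RocketMQUtil.createTopic(null, clusterName, topic). Nothing in this block is RocketMQ API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class PerClusterCreateSketch {
    /** Hypothetical stand-in for the real per-cluster topic creation call. */
    interface TopicCreator {
        void create(String clusterName, String topic) throws Exception;
    }

    /** Tries every cluster and returns the failures keyed by cluster name (an empty map means all succeeded). */
    static Map<String, Exception> createOnAll(Set<String> clusterNames, String topic, TopicCreator creator) {
        Map<String, Exception> failures = new LinkedHashMap<>();
        for (String clusterName : clusterNames) {
            try {
                creator.create(clusterName, topic);
            } catch (Exception e) {
                // Record the failure and keep going; the caller decides how to react.
                failures.put(clusterName, e);
            }
        }
        return failures;
    }
}
```

    A caller could log each entry of the returned map, or treat a non-empty map as an overall failure.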
    
    
