美文网首页MQ
消息中间件——RocketMQ(一) 环境搭建(完整版)

消息中间件——RocketMQ(一) 环境搭建(完整版)

作者: Coder编程 | 来源:发表于2019-05-10 20:27 被阅读0次
    求关注
    环境搭建

    每章一点正能量:自我控制是最强者的本能。——萧伯纳

    前言

    最近在学习消息中间件——RocketMQ,打算把这个学习过程记录下来。此章主要介绍环境搭建。此次主要是单机搭建(条件有限),包括在Windows、Linux环境下的搭建,以及console监控平台搭建,最后加一demo验证一下。

    环境准备

    在搭建RocketMQ之前,请先确保如下环境已经搭建完毕

    • Java环境(我的JDK1.8)
    • Maven环境(我的3.6.1目前最新版)
    • Git环境

    没有搭建的同学走传送门:

    JDK环境搭建: JAVA8环境搭建
    Maven环境搭建: Windows环境下使用Nexus 3.X 搭建Maven私服及使用介绍
    Git环境搭建:Git环境搭建及配置


    1. Windows环境下搭建

    1.1 下载

    官方网站:http://rocketmq.apache.org/

    官网

    目前最新版的是V4.5.0,点击进去。


    rocketmq-all-4.5.0-bin-release.zip

    选择下载 rocketmq-all-4.5.0-bin-release.zip。弹出另外一个页面,这里选择rocketmq-all-4.5.0-bin-release.zip进行下载。


    rocketmq-all-4.5.0-bin-release.zip
    下载成功后,选择一个目录放好并解压。
    解压

    1.2 修改JVM配置

    以上操作完毕之后,进入目录bin目录,我这里是
    H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin
    找到runserver.cmdrunbroker.cmd中的JAVA_OPT。

    在这里插入图片描述
    原JAVA_OPT:
    set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    

    将 Xms Xmx 这两个值改小一些,改为1g,如:

    set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    

    自己根据虚拟机内存大小设置,超出内存大小可能会报错。

    1.3 配置环境变量

    上述步骤执行完毕后,我们需要将RocketMQ安装目录的bin目录配置到环境变量中。

    RocketMQ_HOME

    1.4 启动

    以上配置都完成,接下来就是启动过程。中间有点坑,请务必按步骤安装。

    在RocketMQ安装目录的bin目录下,执行命令cmd:

    我的目录:

    H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin
    

    可以通过shift+鼠标右击 触发cmd窗口选项。也可以通过win+R 在窗口输入cmd,进入cmd窗口后移动到bin目录下。

    1.4.1 启动NAMESERVER

    • 执行命令:start mqnamesrv.cmd

    成功后会弹出提示框,此框勿关闭。


    success

    1.4.3 启动BROKER

    • 执行命令:‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’

    注意:假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号。

    失败
    打开 runbroker.cmd 进行修改
    原:
    set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
    

    修改后:

    set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
    
    修改后

    再次执行命令:
    启动成功!

    成功

    这时候一共有三个窗口。


    2. 安装Console监控

    2.1 下载

    下载地址:https://github.com/apache/rocketmq-externals

    下载完后如图所示:选择——>rocketmq-console


    rocketmq-console

    2.2 配置

    下载完成之后,进入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。


    修改配置
    修改配置

    2.2 编译启动

    进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。中间有个比较慢的下载过程需要等待。


    编译

    编译成功之后,cmd进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.1.jar’,启动‘rocketmq-console-ng-1.0.1.jar’。

    rocketmq-console-ng-1.0.1.jar
    rocketmq-console-ng-1.0.1.jar

    2.3 查看

    访问地址:localhost:8082


    界面

    2.Linux环境下搭建

    2.1 环境准备

    • Java环境
    • Maven环境

    2.1.1 Linux环境搭建Jdk

    下载JDK:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

    下载需要的版本:

    下载地址

    上传到创建的目录/usr/java

    解压命令

    tar -zxvf jdk-8u181-linux-x64.tar.gz
    

    配置环境变量命令

    vim /etc/profile
     
    JAVA_HOME=/usr/java/jdk1.8.0_161
    JRE_HOME=/usr/java/jdk1.8.0_161/jre
    CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
    PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    export JAVA_HOME JRE_HOME CLASS_PATH PATH
     
    source /etc/profile
    
    

    验证是否成功命令

    
    java -version
    
    
    JDK安装成功

    按照以上操作,完成JDK的安装。接下来安装Maven环境。

    2.1.2 Linux环境搭建Maven

    1. 下载命令:
    
    wget http://mirror.bit.edu.cn/apache/maven/binaries/apache-maven-3.2.2-bin.tar.gz
    
    
    1. 解压命令:
    
    tar -zxvf apache-maven-3.2.2-bin.tar.gz
    
    
    1. 配置Maven环境命令:
    
    vim /etc/profile
     
    #配置maven环境变量
    export MAVEN_HOME=/usr/maven/apache-maven-3.5.4
    export MAVEN_HOME
    export PATH=$PATH:$MAVEN_HOME/bin
     
    source /etc/profile
    
    
    1. 验证是否成功命令:
    
    mvn -v
    
    
    
    Maven安装成功

    2.2 下载RocketMQ

    1. 下载命令:
    
    wget http://mirrors.hust.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-source-release.zip
    
    
    1. 解压命令:
    unzip rocketmq-all-4.4.0-source-release.zip
    

    [图片上传失败...(image-792c35-1557466617990)]

    1. 构建二进制文件命令

    进入解压后的文件目录。

    mvn -Prelease-all -DskipTests clean install -U
    
    
    构建二进制成功

    2.3 修改JVM配置

    同Windows环境一样,修改JVM配置。
    移动到目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/bin 中。编辑bin目录下runserver.shrunbroker.sh文件。

    根据个人虚拟机大小进行修改

    
    vim runserver.sh 
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
    vim runbroker.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
    
    
    修改JVM配置

    2.4 配置RocketMQ环境变量

    分别执行如下命令:

    #修改环境变量
    vim /etc/profile
    
    export ROCKETMQ=/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq
    export PATH=$PATH:$ROCKETMQ/bin
    
    #更新配置
    source /etc/profile
    

    [图片上传失败...(image-18d4c3-1557466617991)]

    2.5 启动NAMESERVER

    依然在之前的目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq

    • 执行命令:
    ##启动命令
    nohup sh bin/mqnamesrv  >/dev/null 2>&1 &
    
    ##查看日志
    tail -f ~/logs/rocketmqlogs/namesrv.log
    
    

    [图片上传失败...(image-88d435-1557466617991)]

    可以看图已经成功了!

    2.6 启动BROKER

    • 执行命令:
    ##启动命令
    nohup sh bin/mqbroker -n localhost:9876 &
    
    ##查看日志
    tail -f ~/logs/rocketmqlogs/broker.log
    
    

    [图片上传失败...(image-62e759-1557466617991)]

    注意防火墙,如果端口连接失败,注意开通。

    2.7 关闭命令

    sh bin/mqshutdown broker    //停止 broker
     
    sh bin/mqshutdown namesrv   //停止 nameserver
    
    

    2.8 配置Console监控平台

    同Windows平台搭建

    2.8.1 启动Console

    我这里直接将Windows平台打包好的jar包直接丢到了Linux系统中

    • 启动命令:
    java -jar rocketmq-console-ng-1.0.1.jar
    
    

    [图片上传失败...(image-435d8d-1557466617991)]

    2.8.2 访问Console管理界面

    访问地址:http://192.168.220.72:8082

    [图片上传失败...(image-635ed8-1557466617991)]


    3. Console监控平台说明

    这里不做过多介绍,可以参考以下文章

    官网地址:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

    其他博客地址:https://guozh.net/rocketmqzhiconsolejiankongpingtaishiyongxiangjiesan/

    3. 案例测试

    案例整合环境:SpringBoot环境
    案例来源于网络

    3.1 pom.xml文件

    
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.coderprogramming.rocketmq</groupId>
        <artifactId>rocketmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rocketmq</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.0.2</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    

    3.2 Producer生产者

    
    **
     * @Description: 生产者
     * @author Coder编程
     * @date 2019/5/8 17:08
     */
    
    @Component
    public class Producer {
        /**
         * 生产者的组名
         */
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;
    
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
    
        public void orderedProducer() throws MQClientException, InterruptedException {
            /**
             * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
             * 注意:ProducerGroupName需要由应用来保证唯一
             * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
             * 因为服务器会回查这个Group下的任意一个Producer
             */
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(namesrvAddr);
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();
    
    
            /**
             * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
             * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
             * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
             * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
             */
            try {
                for (int i = 0; i < 10; i++) {
                        Message msg = new Message("Topic1",// topic
                                "TagA",// tag
                                "001",// key
                                ("Send Msg:Hello MetaQ1").getBytes());// body
                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
    
                        Message msg2 = new Message("Topic2",// topic
                                "TagB",// tag
                                "002",// key
                                ("Send Msg:Hello MetaQ2").getBytes());// body
                        SendResult sendResult2 = producer.send(msg2);
                        System.out.println(sendResult2);
    
    
                        Message msg3 = new Message("Topic3",// topic
                                "TagC",// tag
                                "003",// key
                                ("Send Msg:Hello MetaQ3").getBytes());// body
                        SendResult sendResult3 = producer.send(msg3);
                        System.out.println(sendResult3);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            /**
             * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
             * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
             */
            producer.shutdown();
        }
    
    }
    
    
    

    3.3 Consumer消费者

    
    
    /**
     * @Description: 消费者
     * @author Coder编程
     * @date 2019/5/8 17:08
     */
    
    @Component
    public class Consumer {
    
        /**
         * 生产者的组名
         */
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;
    
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
    
        /**
         * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
         * 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
         */
        public void orderedConsumer() throws InterruptedException,MQClientException {
            /**
             * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
             * 注意:ConsumerGroupName需要由应用来保证唯一
             */
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
            // consumer.setNamesrvAddr("10.10.0.102:9876");
            consumer.setNamesrvAddr(namesrvAddr);
            /**
             * 订阅指定topic下tags分别等于TagA或TagC或TagD
             */
            consumer.subscribe("Topic1", "TagA || TagC || TagD");
            /**
             * 订阅指定topic下所有消息<br>
             * 注意:一个consumer对象可以订阅多个topic
             */
            consumer.subscribe("Topic2", "*");
            consumer.subscribe("Topic3", "*");
    
    
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                /**
                 * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
    
                    MessageExt msg = msgs.get(0);
                    if (msg.getTopic().equals("Topic1")) {
                        if (null != msg.getTags()) {
                            // 执行Topic1的消费逻辑
                            if (msg.getTags().equals("TagA")) {
                                // 执行TagA的消费
                                System.out.println("TagA开始。");
                            } else if (msg.getTags().equals("TagC")) {
                                System.out.println("TagC开始。");
                                // 执行TagC的消费
                            } else if (msg.getTags().equals("TagD")) {
                                // 执行TagD的消费
                                System.out.println("TagD开始。");
                            }
                        }
                    } else if (msg.getTopic().equals("Topic2")) {
                        // 执行Topic2的消费逻辑
                        System.out.println("Topic2");
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
             */
            consumer.start();
            System.out.println("Consumer Started.");
        }
    
    }
    
    

    3.3 properties配置文件

    
    # 消费者的组名
    apache.rocketmq.consumer.PushConsumer=PushConsumer
    # 生产者的组名
    apache.rocketmq.producer.producerGroup=Producer
    # NameServer地址
    apache.rocketmq.namesrvAddr=192.168.220.72:9876
    # 设置应用端口
    server.port=8089
    
    

    3.4 测试代码

    
    /**
     * @author Coder编程
     * @Title: HelloWord
     * @ProjectName rocketmq
     * @Description: Hello World
     * @date 2019/5/814:14
     */
    
    @RestController
    public class Test {
    
        @Autowired
        private Producer producer;
    
        @Autowired
        private Consumer consumer;
    
        @RequestMapping("/test")
        public String testMQ2() {
            try {
                System.out.println("-----------------开始生产-----------------");
                producer.orderedProducer();
                System.out.println("-----------------开始消费-----------------");
                consumer.orderedConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "success";
        }
    
    }
    
    
    

    4.奉上源码

    以上安装jar包和案例测试源码已经上传至GitHub/Gitee

    源码图

    源码地址:

    Github地址

    Gitee地址

    文末

    欢迎关注公众号:Coder编程
    获取最新原创技术文章和相关免费学习资料,随时随地学习技术知识!

    微信公众号
    求关注

    相关文章

      网友评论

        本文标题:消息中间件——RocketMQ(一) 环境搭建(完整版)

        本文链接:https://www.haomeiwen.com/subject/oxqaoqtx.html