美文网首页
SpringBoot整合rocketmq

SpringBoot整合rocketmq

作者: 任未然 | 来源:发表于2022-01-06 09:00 被阅读0次

    一. 概述

    参考开源项目https://github.com/xkcoding/spring-boot-demo
    本Demo简单介绍springboot继承rocketmq, rocketmq采用docker单机部署

    二. 安装rocketmq

    由于官方没有提供docker镜像,所以用星星较多的foxiswho/rocketmq:4.8.0安装

    1. 拉取镜像
    docker pull foxiswho/rocketmq:4.8.0
    
    1. 安装nameserver
    docker run -d \
    --name rmqnamesrv \
    -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
    -p 9876:9876 \
    foxiswho/rocketmq:4.8.0 \
    sh mqnamesrv
    
    1. broker

    新建配置文件:~/conf/broker.conf

    #所属集群名字
    brokerClusterName=DefaultCluster
    
    #broker名字,注意此处不同的配置文件填写的不一样,如果在broker-a.properties使用:broker-a,
    #在broker-b.properties使用:broker-b
    brokerName=broker-a
    
    #0 表示Master,>0 表示Slave
    brokerId=0
    
    #nameServer地址,分号分割
    #namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    namesrvAddr=xx.xx.xx.xx:9876
    
    #启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
    # 解决方式1 加上一句producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
    brokerIP1=xx.xx.xx.xx
    
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 !!!这里仔细看是false,false,false
    autoCreateTopicEnable=true
    
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    
    #Broker 对外服务的监听端口
    listenPort=10911
    
    #删除文件时间点,默认凌晨4点
    deleteWhen=04
    
    #文件保留时间,默认48小时
    fileReservedTime=120
    
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    #storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
    #commitLog 存储路径
    #storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
    #消费队列存储
    #storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
    #消息索引存储路径
    #storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
    #checkpoint 文件存储路径
    #storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
    #abort 文件存储路径
    #abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
    #限制的消息大小
    maxMessageSize=65536
    
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=ASYNC_MASTER
    
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    运行脚本

    docker run -d \
    -v ~/conf:/home/rocketmq/conf \
    --name rmqbroker \
    # 这里填nameserver的宿主机IP
    -e "NAMESRV_ADDR=xx.xx.xx.xx:9876" \
    -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
    -p 10911:10911 -p 10912:10912 -p 10909:10909 \
    foxiswho/rocketmq:4.8.0 \
    sh mqbroker -c /home/rocketmq/conf/broker.conf
    
    1. 安装控制台
    docker run -d \
    --name rmqconsole \
    # 这里填nameserver的宿主机IP
    -e "JAVA_OPTS=-Drocketmq.namesrv.addr=xx.xx.xx.xx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
    -p 8180:8080 -t styletang/rocketmq-console-ng
    
    1. 打开控制台
    localhost:8180
    
    image.png

    三. 搭建spring项目

    3.1 依赖

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
              <groupId>org.apache.rocketmq</groupId>
              <artifactId>rocketmq-spring-boot-starter</artifactId>
              <version>${rocketmq-spring-boot-starter-version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <optional>true</optional>
            </dependency>
        </dependencies>
    

    3.2 application.yml

    server:
      port: 8080
      servlet:
        context-path: /demo
    rocketmq:
      # 多个用;隔开
      name-server: 127.0.0.1:9876
      producer:
        # 生产组
        group: demo-group
        # 发送消息超时时间,默认 3000
        sendMessageTimeout: 3000
        # 发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2
        # 发送异步消息失败重试次数,默认2
        retryTimesWhenSendAsyncFailed: 2
    

    3.3 启动类

    @SpringBootApplication
    public class SpringBootDemoMqRocketmqApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringBootDemoMqRocketmqApplication.class, args);
        }
    
    }
    

    3.4 消息发送

    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringBootDemoMqRocketmqApplicationTests {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @Test
        public void contextLoads() {
            rocketMQTemplate.syncSend("topic_wpr" + ":" + "tag_1","hello springboot rocketmq");
        }
    
    }
    

    3.5 消息消费

    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    /**
     * 监听
     *
     * @author wangpr
     * @date 2022/1/5
     */
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "topic_wpr", selectorExpression = "tag_1", consumerGroup = "QueueHandler")
    public class QueueListener implements RocketMQListener<String>  {
    
        @Override
        public void onMessage(String message) {
            log.info("接收到消息:"+message);
        }
    }
    
    image.png

    相关文章

      网友评论

          本文标题:SpringBoot整合rocketmq

          本文链接:https://www.haomeiwen.com/subject/moehcrtx.html