美文网首页springcloudRocketMQalready
SpringCloud Stream整合RocketMQ

SpringCloud Stream整合RocketMQ

作者: 无我_无他_有你 | 来源:发表于2022-09-14 11:31 被阅读0次

    前言

    1.rocketmq 安装可参考:https://www.jianshu.com/p/f3713adfa3dd
    2.启动好nameserv 和 broker
    3.官方RocketMQ+springcloud stream 例子 https://github.com/alibaba/spring-cloud-alibaba/blob/2021.x/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md

    1. 本文将说明普通消息发送/消费、广播消息发送/消费、延时消息发送消费三种模式

    项目环境/依赖:

        <properties>
            <spring-boot-version>2.3.12.RELEASE</spring-boot-version>
            <spring-cloud-version>Hoxton.SR12</spring-cloud-version>
            <spring-cloud-alibaba-version>2.2.7.RELEASE</spring-cloud-alibaba-version>
            <rocketmq.version>2021.1</rocketmq.version>
        </properties>
        !-- Environment START-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot-version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring-cloud-alibaba-version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud-version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-stream-rocketmq -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>${rocketmq.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.9.4</version>
        </dependency>
    

    依赖说明:spring-cloud-starter-stream-rocketmq 排除了rocketmq-client、rocketmq-acl依赖是因为我想换成新一点的依赖,不排除也是可以的。

    1.普通消息发送

    新建模块A用于消息发送
    创建一个controller用户测试消息发送

    @RestController
    public class RocketMqSendMsgController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @PostMapping(value = "/cluster")
        public void sendClusterMsg(@RequestParam("message") String message) {
            Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(CLUSTER_MESSAGE_OUTPUT,"",message));
            boolean result = streamBridge.send(CLUSTER_MESSAGE_OUTPUT, msg);
            System.out.println(Thread.currentThread().getName() + " 消息集群发送: " + msg.getPayload().getData());
        }
    }
    

    yml配置

    server:
      port: 10004
    spring:
      application:
        name: search-server
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: localhost:9876
          bindings:
            cluster-out-0:
              destination: cluster
    

    配置说明
    1.配置name-server服务地址,必须要配置
    2.cluster-out-0 :channel 通道名称 默认的一个规则吧 发送消息就是 -out- 这样子

    1. destination: cluster :topic为cluster

    附上代码中用到的常量类

    package com.ly.tuliy.commons.base.mq;
    
    /**
     * 类说明: mq 常量类
     *
     * @author wqf
     * @date 2022/9/7 9:30
     */
    public class MessageConstant {
    
        //生产者-集群消息主题
        public static String CLUSTER_MESSAGE_OUTPUT="cluster-out-0";
        //生产者-广播消息主题
        public static String BROADCAST_MESSAGE_OUTPUT="broadcast-out-0";
        //生产者-延时消息主题
        public static String DELAYED_MESSAGE_OUTPUT="delayed-out-0";
    
    
        //消费者-集群消息主题
        public static String CLUSTER_MESSAGE_INPUT="cluster-in-0";
        //消费者-广播消息主题
        public static String BROADCAST_MESSAGE_INPUT="broadcast-in-0";
        //消费者-延时消息主题
        public static String DELAYED_MESSAGE_INPUT="delayed-in-0";
    
    }
    
    
    import java.io.Serializable;
    import java.util.Map;
    
    /**
     * @Author: wqf
     * @Date: 2022/09/09
     * @Description: mq 发送消息的内容体基础内容
     */
    @ToString
    public class BaseMessage<T> implements Serializable {
        /**
         * 消息主题
         */
        private String topic;
        /**
         * 消息标签
         */
        private String tag;
        /**
         * 消息内容
         */
        private T data;
        /**
         *
         */
        private Map<String, Object> header;
    
    
        public String getTopic() {
            return topic;
        }
    
        public void setTopic(String topic) {
            this.topic = topic;
        }
    
        public String getTag() {
            return tag;
        }
    
        public void setTag(String tag) {
            this.tag = tag;
        }
    
        public T getData() {
            return data;
        }
    
        public void setData(T data) {
            this.data = data;
        }
    
        public Map<String, Object> getHeader() {
            return header;
        }
    
        public void setHeader(Map<String, Object> header) {
            this.header = header;
        }
    
        public BaseMessage(String topic, String tag, T data, Map<String, Object> header) {
            this.topic = topic;
            this.tag = tag;
            this.data = data;
            this.header = header;
        }
    
        public BaseMessage(String topic, String tag, T data) {
            this.topic = topic;
            this.tag = tag;
            this.data = data;
        }
    
        public BaseMessage(String topic,  T data) {
            this.topic = topic;
            this.data = data;
        }
    
        public BaseMessage() {
        }
    }
    

    新建模块B用于消息消费
    创建一个类接收消息

    /**
     * @Author: wqf
     * @Date: 2022/09/09
     * @Description:
     */
    @RestController
    public class RocketMqReceiveMsgController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        /**
         * 函数式编辑接收消息
         */
        @Bean
        public Consumer<String> cluster() {
            return message -> {
                System.out.println("接收的集群消息为:" + message);
            };
        }
    

    yml配置

    server:
      port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
    spring:
      application:
        name: seckill-server
      cloud:
        stream:
          function:
            #消费者端配置
            definition: cluster
          rocketmq:
            binder:
              name-server: localhost:9876
          bindings:
            cluster-in-0:
              destination: cluster
              group: cluster-group
    

    配置说明:
    1.definition: cluster 消费者端配置,这里配置的cluster 必须和我们接收消息类中的方法名称一致


    image.png

    2.cluster-in-0:也是默认的规则 -in- 标识接收消息
    3.group:消费组名称配置 ,这个一定要配,名称命名没有要求

    测试:
    用postman在生产者端(A)发送消息,消费端(B)能正常接收到消息。将消费端B多启动几个端口,创建多消费者环境,此时我们发送消息可以观测到消息将随即被几个消费者消费,一个消息只会被消费一次

    出现的问题: 消息接收不到或者是报错,请先检查下主题是否创建(rocketmq 控制台看看),或者启动broker时修改配置为自动创建主题。

    2.广播消息发送

    生产者(A)controller添加测试接口

        @PostMapping(value = "/broadcast")
        public void sendBroadcastMsg(@RequestParam("message") String message) {
            Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(BROADCAST_MESSAGE_OUTPUT,"",message));
            boolean result = streamBridge.send(BROADCAST_MESSAGE_OUTPUT, msg);
            System.out.println(Thread.currentThread().getName() + " 消息广播发送: " + msg.getPayload().getData());
        }
    

    消费者端(B)添加以下配置

        /**
         * 函数式编辑接收消息
         */
        @Bean
        public Consumer<String> broadcast() {
            return message -> {
                System.out.println("接收的广播消息为:" + message);
            };
        }
    
    server:
      port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
    spring:
      application:
        name: seckill-server
      cloud:
        stream:
          function:
            #消费者端配置
            definition: cluster;broadcast
          rocketmq:
            binder:
              name-server: localhost:9876
            bindings:
              broadcast-in-0:
                consumer:
                  #配置是否开启广播消息 默认为false
                  broadcasting: true
          bindings:
            cluster-in-0:
              destination: cluster
              group: cluster-group
            broadcast-in-0:
              destination: broadcast
              group: broadcast-group
    

    配置说明:
    1.consumer.broadcasting: true 该配置默认是false,true表示开启广播消费

    测试:
    启动多个消费者,发送消息时,每个消费者都能接收到每条生产者的消息

    3.延时消息发送

    生产者(A)controller添加测试接口

        @PostMapping(value = "/delayed")
        public void sendDelayedMsg(@RequestParam("message") String message) {
            String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
            for (int i = 0; i < 100; i++) {
                String key = "KEY" + i;
                Map<String, Object> headers = new HashMap<>();
                headers.put(MessageConst.PROPERTY_KEYS, key);
                headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
                // 设置延时等级1~10
                headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 4);
                BaseMessage<String> baseMessage = new BaseMessage<>(MessageConstant.DELAYED_MESSAGE_OUTPUT, message);
                baseMessage.setHeader(headers);
                Message<BaseMessage<String>> msg = new GenericMessage<>(baseMessage, headers);
                streamBridge.send(MessageConstant.DELAYED_MESSAGE_OUTPUT, msg);
                System.out.println(Thread.currentThread().getName() + " 延时消息: " + msg.getPayload().getData());
            }
        }
    

    参数说明:
    messageDelayLevel :延时有18个等级(我试了前4个等级),每个等级延时时间如代码

    yml添加配置

    server:
      port: 10004
    spring:
      application:
        name: search-server
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: localhost:9876
            bindings:
              delayed-out-0:
                producer:
                  group: delayed-group
                  sync: true
          bindings:
            cluster-out-0:
              destination: cluster
            broadcast-out-0:
              destination: broadcast
            delayed-out-0:
              destination: delayed
    

    配置说明:
    bindings.delayed-out-0.producer.sync=true 该项配置只在生产端配置,表示消息发送通道delayed-out-0开启消息异步发送,一定要有,不然延时消息没效果

    消费者端(B)添加以下配置

        /**
         * 函数式编辑接收消息
         */
        @Bean
        public Consumer<String> delayed() {
            return message -> {
                System.out.println("接收的延时消息为:" + message);
            };
        }
    
    server:
      port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
    spring:
      application:
        name: seckill-server
      cloud:
        stream:
          function:
            #消费者端配置
            definition: cluster;broadcast;delayed
          rocketmq:
            binder:
              name-server: localhost:9876
            bindings:
              broadcast-in-0:
                consumer:
                  #配置是否开启广播消息 默认为false
                  broadcasting: true
          bindings:
            cluster-in-0:
              destination: cluster
              group: cluster-group
            broadcast-in-0:
              destination: broadcast
              group: broadcast-group
            delayed-in-0:
              destination: delayed
              group: delayed-group
    

    相关文章

      网友评论

        本文标题:SpringCloud Stream整合RocketMQ

        本文链接:https://www.haomeiwen.com/subject/iqbkortx.html