美文网首页
Spring Cloud Stream 集成 RocketMQ

Spring Cloud Stream 集成 RocketMQ

作者: 树木有朋 | 来源:发表于2020-04-28 22:19 被阅读0次

    Spring Cloud Stream 是什么?

    它是什么

    Spring Cloud Stream 是一个构建高度可扩展的事件驱动微服务的框架,与共享消息系统相连。

    该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的 Spring 用法和最佳实践之上,包括支持持久化的 pub/sub 语义、消费者组和有状态分区。

    绑定的一些实现

    Spring Cloud Stream支持多种绑定实现,下表包括了GitHub项目的链接。

    Spring Cloud Stream的核心构件是:

    • Destination Binders: 负责提供与外部消息系统集成的组件。
    • Destination Bindings: 作为消息中间件与应用程序的提供者和消费者之间的桥梁。
    • Message: 生产者和消费者用于与目的地装订器沟通的典型数据结构(从而通过外部消息系统与其他应用程序进行通信的典型数据结构)。

    为什么用 Cloud Stream?

    1. 解耦。使用了 SCS 之后,我们只需要在配置文件中配置下对应的中间件服务器地址等信息,然后就可使用,使得业务中不需要出现具体的消息中间件。
    2. 便于迁移。例如项目中一开始使用的是 rabbitmq,后期要想迁移成 kafka 的话,如果使用传统方式,在使用的地方使用具体消息中间件的话,那么迁移的成本会很高,而使用 SCS 的话,只需要更改配置文件即可。

    如何使用?(集成 rocket mq )

    配置 JAVA_HOME 路径(以下为 mac 环境下的配置)

    1. chmod 777 /etc/profile

    2. sudo vim /etc/profile

    3. export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home

      export PATH=$JAVA_HOME/bin:$PATH

    4. source /etc/profile

    安装启动 Rocket MQ

    1. 从官网 下载二进制文件
    2. 启动 nameserver nohup sh bin/mqnamesrv &
    3. 启动 broker nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
    4. 设置 nameserver 地址 export NAMESRV_ADDR=localhost:9876
    5. 生产者发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    6. 消费者消费消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    7. 关闭 broker sh bin/mqshutdown broker
    8. 关闭 nameserver sh bin/mqshutdown namesrv

    提示:

    如果没有执行 export NAMESRV_ADDR=localhost:9876

    会导致 java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed

    集成 SCS

    1. 通过 https://start.spring.io/ 创建一个初始化项目

    2. 这里贴出 pom 文件配置

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <parent>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-parent</artifactId>
              <version>2.2.6.RELEASE</version>
              <relativePath/> <!-- lookup parent from repository -->
          </parent>
          <groupId>org.example</groupId>
          <artifactId>mq</artifactId>
          <version>1.0-SNAPSHOT</version>
      
          <properties>
              <java.version>1.8</java.version>
              <spring-cloud.version>Hoxton.SR4</spring-cloud.version>
          </properties>
      
          <dependencies>
              <dependency>
                  <groupId>com.alibaba.cloud</groupId>
                  <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
                  <version>2.2.1.RELEASE</version>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-stream-test-support</artifactId>
                  <scope>test</scope>
              </dependency>
              <dependency>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-stream</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter</artifactId>
              </dependency>
      
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-test</artifactId>
                  <scope>test</scope>
                  <exclusions>
                      <exclusion>
                          <groupId>org.junit.vintage</groupId>
                          <artifactId>junit-vintage-engine</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency>
              <dependency>
                  <groupId>junit</groupId>
                  <artifactId>junit</artifactId>
                  <scope>test</scope>
              </dependency>
          </dependencies>
      
          <dependencyManagement>
              <dependencies>
                  <dependency>
                      <groupId>org.springframework.cloud</groupId>
                      <artifactId>spring-cloud-dependencies</artifactId>
                      <version>${spring-cloud.version}</version>
                      <type>pom</type>
                      <scope>import</scope>
                  </dependency>
              </dependencies>
          </dependencyManagement>
      
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-maven-plugin</artifactId>
                  </plugin>
              </plugins>
          </build>
      </project>
      
    3. 创建 CustomerChannel

      public interface CustomerChannel {
      
          /**
           * 这里的名称对应了spring.cloud.stream.rocketmq.bindings.<channelName>
           */
          String OUTPUT = "my-output";
          String INPUT = "my-input";
      
          @Output(CustomerChannel.OUTPUT)
          MessageChannel output();
      
          @Input(CustomerChannel.INPUT)
          SubscribableChannel input();
      }
      
    4. 定义 TestController

      @RestController
      @EnableBinding({CustomerChannel.class})
      public class TestController {
      
          private final CustomerChannel customerChannel;
      
          public TestController(CustomerChannel customerChannel) {
              this.customerChannel = customerChannel;
          }
      
          /**
           * 使用一个controller断点模拟发送消息,可以在setHeader方法中设置header来实现消息过滤
           */
          @PostMapping("/message-send")
          public String testCustomInterfaceSendMsg() {
              Message<String> message = MessageBuilder.withPayload("send message")
                      .setHeader(RocketMQHeaders.TAGS, "tag2")
                      .setHeader("mytag", "my-tag")
                      .build();
      
              this.customerChannel.output().send(message);
      
              Message<String> message2 = MessageBuilder.withPayload("send message")
                      .setHeader(RocketMQHeaders.TAGS, "tag3")
                      .setHeader("mytag", "your-tag")
                      .build();
      
              this.customerChannel.output().send(message2);
      
              return "success";
          }
      
          /**
           * 使用@StreamListener来监听消息
           */
          @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='my-tag'")
          public void testCustomListener(Message message) {
              System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString());
          }
      
          /**
           * 使用@StreamListener来监听消息
           */
          @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='your-tag'")
          public void testCustomListenerFilter(Message message) {
              System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString());
          }
      }
      
    5. 配置 application.yml

      spring:
        cloud:
          stream:
            rocketmq:
              binder:
                name-server: localhost:9876
                enable-msg-trace: true
              bindings:
                my-input:
                  consumer:
                    tags: tag2 || tag1 || tag3 || tag4 # tag 为 tag1/tag2/tag3/tag4
            bindings:
              my-input:
                destination: my-stream-topic # 相当于 rocketmq 的 topic
                group: my-stream-group
                binder: rocketmq #
                consumer:
                  instanceCount: 1 # 指定实例数量
              my-output:
                destination: my-stream-topic # 相当于 rocketmq 的 topic
      
    6. 运行 MQApplication,使用 POST 方法请求 localhost:8989/message-send

    核心原理

    消息发送和消费的流程:

    1. 消息通过 MessageChannel(output) 进行发送,AbstractMessageChannel 实现了 MessageChannelAbstractSubscribableChannel 继承了 AbstractMessageChannel 并且实现了 SubscribableChannel,重写了其中的 subscribe 方法,subscribe() 指定了 MessageHandler,最终会调用 RocketMQMessageHandler 发送消息

    2. 消息发出之后,对应的消息中间件内部会有通道适配器,将中间件特有的消息格式转换为 SpringMessage,然后发送到 MessageChannel(input)

    3. StreamListener 订阅了对应的 input ,根据一定的条件,就能收到消费者发出的消息。

    原理图

    本文参考

    https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en

    http://rocketmq.apache.org/docs/quick-start/

    RocketMQ 和 Spring Cloud Stream

    相关文章

      网友评论

          本文标题:Spring Cloud Stream 集成 RocketMQ

          本文链接:https://www.haomeiwen.com/subject/reuiwhtx.html