美文网首页
Spring cloud stream 消息驱动

Spring cloud stream 消息驱动

作者: 刘小刀tina | 来源:发表于2020-03-11 22:19 被阅读0次

产生的背景:

定义

重复消费的问题




消息持久化的问题

spring cloud 集成 Spring cloud stream 代码如下

消息生成者
pom.xml
 <dependencies>

        <!--监控依赖的包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <!--   引入eureka客户端     -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!--引入spring cloud stream 依赖包-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <!--添加common 通用包-->
        <dependency>
            <artifactId>springcloud-api-common</artifactId>
            <groupId>com.tina.springcloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

    </dependencies>
============================================================
配置文件
server:
  port: 8801

#配置服务名
spring:
  application:
    name: springcloud-stream-rabbitmq-provider
  cloud:
      stream:
        binders:  #此处配置要绑定的rabbitmq的服务信息
          defaultRabbit: #表示定义的名称,用于binding整合
            type: rabbit #消息组件类型
            environment: # 设置rabbitmq的相关的环境配置
              spring:
                rabbitmq:
                  host: 152.136.27.48
                  port: 5672
                  username: tina
                  password: 123456
        bindings:  #服务的整合处理
          output:  #这个名字是一个通道的名称
            destination: studyExchange  # 表示要使用的Exchange名称定义
            content-type: text/plain #applicaiton/json  #设置消息类型,本次为json,文本则设置"text/plain"
            binder: defaultRabbit  #设置要绑定的消息服务的具体设置

#注册到eureka
eureka:
  client:
    service-url:
      defaultZone: http://springcloud-eureka-server7001:7001/eureka,http://springcloud-eureka-server7002:7002/eureka
  instance:
    instance-id: springcloud-stream-rabbitmq-provider
    prefer-ip-address: true  #访问路径可以显示IP地址
    lease-renewal-interval-in-seconds: 1  #向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
    lease-expiration-duration-in-seconds: 2 #收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除
============================================================
启动类
@SpringBootApplication
@EnableEurekaClient
public class StreamRabbitmqProvider8801 {

    public static void main(String[] args) {
        SpringApplication.run(StreamRabbitmqProvider8801.class,args);
    }
}
============================================================
service 包
接口 
public interface ImessageProvider {
     public  String send();
}
----------------------------------------------------------------------------------------------------
实现类
@Slf4j
@EnableBinding(Source.class) //指信道channel和exchange绑定到一起 或者说定义推送管道
public class ImessageProviderImpl  implements ImessageProvider {

    @Resource
    private MessageChannel output;  //消息发送管道

    @Override
    public String send() {
        String serial = IdUtil.simpleUUID();
        output.send(MessageBuilder.withPayload(serial).build());
        log.info("serial的值为:"+serial);
        return null;
    }
}
============================================================
控制类
@RestController
public class SendMessageProvider {

    @Resource
    private ImessageProvider imessageProvider ;

    @GetMapping(value = "/sendMessage")
    public String sendMessage(){

        return imessageProvider.send();
    }
}
============================================================
消息消费者
pom.xml
    <dependencies>


        <!--监控依赖的包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--   引入eureka客户端的依赖包     -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!--引入spring cloud stream 依赖包-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>


        <!--引入spring cloud config 客户端 的依赖包-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--添加common 通用包-->
        <dependency>
            <artifactId>springcloud-api-common</artifactId>
            <groupId>com.tina.springcloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

    </dependencies>
============================================================
配置文件
server:
  port: 8802

#配置服务名
spring:
  application:
    name: springcloud-stream-rabbitmq-consumer
  cloud:
    stream:
      binders:  #此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: #表示定义的名称,用于binding整合
          type: rabbit #消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 152.136.27.48
                port: 5672
                username: tina
                password: 123456
      bindings:  #服务的整合处理
        input:  #这个名字是一个通道的名称
          destination: studyExchange  # 表示要使用的Exchange名称定义
          content-type: text/plain #applicaiton/json  #设置消息类型,本次为json,文本则设置"text/plain"
          binder: defaultRabbit  #设置要绑定的消息服务的具体设置
          group: consumerA #设置分组

#注册到eureka
eureka:
  client:
    service-url:
      defaultZone: http://springcloud-eureka-server7001:7001/eureka,http://springcloud-eureka-server7002:7002/eureka
  instance:
    instance-id: springcloud-stream-rabbitmq-consumer8802
    prefer-ip-address: true  #访问路径可以显示IP地址
    lease-renewal-interval-in-seconds: 1  #向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
    lease-expiration-duration-in-seconds: 2 #收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除
============================================================
启动类
@EnableEurekaClient
@SpringBootApplication
public class StreamConsumerApplication8802 {

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication8802.class,args);
    }
}
============================================================
控制类
@RestController
@Slf4j
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private  String port;
    
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消费者1号, 接收到的信息为:"+message.getPayload() +",端口号为:"+port);
    }
    
}
============================================================

相关文章

网友评论

      本文标题:Spring cloud stream 消息驱动

      本文链接:https://www.haomeiwen.com/subject/vhvhjhtx.html