美文网首页
Sprin Cloud Stream rabbit实战

Sprin Cloud Stream rabbit实战

作者: Mr培 | 来源:发表于2022-10-12 14:22 被阅读0次
    架构图
    Binder
    • Destination Binder (目标绑定器) :与消息中间件通信的组件
    • Destination Bindings (目标绑定) : Binding是连接应用程序跟消息中间件的桥梁 ,用于消息的消费和生产,由binder创建
    • Message(消息)
    springboot整合stream之生产者
    1. 加依赖
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
    1. 写注解,在启动类上加
    @SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
    public class StreamProducerMain {
        public static void main(String[] args){
            SpringApplication.run(StreamProducerMain.class,args);
        }
    }
    
    1. 写配置
    server:
      port: 8081
    spring:
      application:
        name: cloud-stream-producer
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: 127.0.0.1
                    port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            output: # 这个名字是一个通道的名称
              destination: xypspExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,文本则设置“text/plain”
              binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    
    
    1. 测试方法
    /**
     * @author rp
     */
    @AllArgsConstructor
    @RestController
    @RequestMapping("/message")
    public class MessageProviderController {
    
    
        private final MessageProviderServer messageProviderServer;
    
    
        /**
         * 发送消息
         * */
        @GetMapping("/sendMessage")
        public Result sendMessage(){
            String uuid = messageProviderServer.send();
            return Result.success(uuid);
        }
    }
    
    
    package com.xypsp.springcloud.server;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    
    import javax.annotation.Resource;
    import java.util.UUID;
    
    /**
     * @author rp
     * 定义消息的推送管道
     */
    @EnableBinding(Source.class)
    public class MessageProviderServer {
    
        /**
         * 消息发送管道
         */
        @Resource
        private MessageChannel output;
    
        public String send() {
            String serial = UUID.randomUUID().toString();
            //发送延迟消息
    //        output.send(MessageBuilder.withPayload(serial).setHeader("x-delay",5000).build());
            output.send(MessageBuilder.withPayload(serial).build());
            return serial;
        }
    
    }
    
    
    springboot整合stream之消费者A
    1. 加依赖
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
    1. 写配置
    server:
      port: 8082
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: 127.0.0.1
                    port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 这个名字是一个通道的名称
              destination: xypspExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,文本则设置“text/plain”
              binder: defaultRabbit # 设置要绑定的消息服务的具体设置
              group: xypspGroup # 消息的持久化 Consumer断开连接,队列仍然存在,相同group的消费同一个queue的时候是轮询的方式,每个实例一条轮着消费,避免重复消费。
    
    
    1. 消费方法
    package com.xypsp.springcloud.server;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    /**
     * @author rp
     * 定义消息的接收管道
     */
    @Slf4j
    @Component
    @EnableBinding(Sink.class)
    public class MessageConsumerListener {
    
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message) {
            log.info("接收到消息: {}",message.getPayload());
        }
    }
    
    
    springboot整合stream之消费者B
    1. 加依赖
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
    1. 写配置
    server:
      port: 8083
    spring:
      application:
        name: cloud-stream-consumer-b
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: 127.0.0.1
                    port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 这个名字是一个通道的名称
              destination: xypspExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,文本则设置“text/plain”
              binder: defaultRabbit # 设置要绑定的消息服务的具体设置
              group: xypspGroup # 消息的持久化 Consumer断开连接,队列仍然存在,相同group的消费同一个queue的时候是轮询的方式,每个实例一条轮着消费,避免重复消费。
    
    
    1. 消费方法
    package com.xypsp.springcloud.server;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    /**
     * @author rp
     * 定义消息的接收管道
     */
    @Slf4j
    @Component
    @EnableBinding(Sink.class)
    public class MessageConsumerListener {
    
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message) {
            log.info("接收到消息: {}",message.getPayload());
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Sprin Cloud Stream rabbit实战

          本文链接:https://www.haomeiwen.com/subject/crdogrtx.html