美文网首页编程杂货铺SpringBoot精选Spring Cloud
Spring Cloud Stream 消息驱动 RabbitM

Spring Cloud Stream 消息驱动 RabbitM

作者: 会动的木头疙瘩儿 | 来源:发表于2018-11-30 11:49 被阅读57次

    项目的快速搭建参照官方 Creating a Sample Application by Using Spring Initializr

    RabbitMQ环境使用

    RabbitMQ部署在DockerSwarm集群

    加入依赖

          <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
    

    再选择Kafka或RabbitMQ

    • Kafka
    • RabbitMQ
      比如我选择RabbitMQ,那么我项目的pom
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
    

    消息处理

    修改启动类

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class RabbitmqStreamExampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RabbitmqStreamExampleApplication.class, args);
        }
    
        @StreamListener(Sink.INPUT)
        public void handle(Person person) {
            System.out.println("Received: " + person);
        }
    
        public static class Person {
            private String name;
            public String getName() {
                return name;
            }
            public void setName(String name) {
                this.name = name;
            }
            public String toString() {
                return this.name;
            }
        }
    }
    
    • @EnableBinding(Sink.class) 是绑定一个输入通道,Sink是提供的开箱即用的输入通道
    • @StreamListener(Sink.INPUT) 监听输入进来的消息

    Sink的源码

    public interface Sink {
        String INPUT = "input";
    
        @Input("input")
        SubscribableChannel input();
    }
    

    试试从RabbitMQ手动发消息

    先启动项目,启动前配置一下rabbitmq连接

    spring:
      application:
        name: rabbitmq-stream-example
      rabbitmq:
        host: 172.16.10.172
        port: 5672
        username: guest
        password: guest
    server:
      port: 8080
    

    启动项目
    启动日志中有rabbitmq的连接及注册通道的信息

    Initializing ExecutorService 'taskScheduler'
    Registering MessageChannel input
    Registering MessageChannel nullChannel
    Registering MessageChannel errorChannel
    Registering MessageHandler errorLogger
    Channel 'rabbitmq-stream-example.input' has 1 subscriber(s).
    Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
    Channel 'rabbitmq-stream-example.errorChannel' has 1 subscriber(s).
    started _org.springframework.integration.errorLogger
    declaring queue for inbound: input.anonymous._RE-Zx6tQKWHDKGfc0NV9g, bound to: input
    Attempting to connect to: [172.16.10.172:5672]
    Created new connection: rabbitConnectionFactory#e72dba7:0/SimpleConnection@5f303ecd [delegate=amqp://guest@172.16.10.172:5672/, localPort= 55249]
    Registering MessageChannel input.anonymous._RE-Zx6tQKWHDKGfc0NV9g.errors
    Channel 'rabbitmq-stream-example.input.anonymous._RE-Zx6tQKWHDKGfc0NV9g.errors' has 1 subscriber(s).
    Channel 'rabbitmq-stream-example.input.anonymous._RE-Zx6tQKWHDKGfc0NV9g.errors' has 2 subscriber(s).
    started inbound.input.anonymous._RE-Zx6tQKWHDKGfc0NV9g
    Started RabbitmqStreamExampleApplication in 2.376 seconds (JVM running for 3.764)
    

    查看Rabbitmq的queue

    input.anonymous._RE-Zx6tQKWHDKGfc0NV9g
    手动发消息
    {"name":"Sam Spade"}
    
    发送消息

    查看控制台,已接收到消息


    控制台结果

    应用模型

    SCSt-with-binder.png
    应用程序能过Spring Cloud Stream注入的input和output与外界的连通是通过Binder实现,Spring Cloud Stream 提供了KafkaRabbitMQ的Binder实现。

    给消费者分组 spring.cloud.stream.bindings.<channelName>.group

    举个例子,假如只有一个消息生产者和一个消费者,消息能正常处理,在微服中可能一个消费者会有多个实例,一个消息会被多个实例处理,这样就出现了消息重复的问题,给消费者分组之后,一个消费者的多个实例中只会有一个实例处理消息

    spring:
      application:
        name: rabbitmq-stream-example
      rabbitmq:
        host: 172.16.10.172
        port: 5672
        username: guest
        password: guest
      cloud:
        stream:
          bindings:
            input:
              destination: mqtestDefault # 指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 mqTestDefault
              group: user-channel
            output:
              destination: mqtestDefault
              contentType: text/plain
    
    server:
      port: 8080
    

    项目改造
    启动类

    @SpringBootApplication
    @EnableBinding(Source.class)
    public class RabbitmqStreamExampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RabbitmqStreamExampleApplication.class, args);
        }
    
    }
    

    创建一个消息监听 SinkMsgRecvicer

    @EnableBinding(Sink.class)
    public class SinkMsgRecvicer {
    
        private static Logger logger = LoggerFactory.getLogger(SinkMsgRecvicer.class);
    
        @StreamListener(Sink.INPUT)
        public void msg(String value) {
            logger.info("Recvicer : {}", value);
        }
    }
    

    写一个测试的TestController
    需要增加web依赖

    @RestController
    public class TestController {
    
        @Autowired
        Source source;
    
        @RequestMapping("/send")
        public String send(String name) {
            source.output().send(MessageBuilder.withPayload("send to : " + name).build());
    
            return "发送成功 " + name;
        }
    }
    

    启动项目 访问 http://localhost:8080/send?name=liangwang
    控制台会有输出

     Recvicer : send to : liangwang
    
    RabbitMq Exchanges

    待续。。。

    源码 rabbitmq-stream-example
    官方文档 Elmhurst.RELEASE

    相关文章

      网友评论

        本文标题:Spring Cloud Stream 消息驱动 RabbitM

        本文链接:https://www.haomeiwen.com/subject/dqkzqqtx.html