「 从0到1学习微服务SpringCloud 」08 构建消息驱

作者: KenDoEverything | 来源:发表于2019-04-18 15:14 被阅读0次
    系列文章(更新ing):

    「 从0到1学习微服务SpringCloud 」01 一起来学呀! 「 从0到1学习微服务SpringCloud 」02 Eureka服务注册与发现
    「 从0到1学习微服务SpringCloud 」03 Eureka的自我保护机制
    「 从0到1学习微服务SpringCloud 」04服务消费者Ribbon+RestTemplate
    「 从0到1学习微服务SpringCloud 」05消费者Fegin
    「 从0到1学习微服务SpringCloud 」06 统一配置中心Spring Cloud Config
    「 从0到1学习微服务SpringCloud 」07 RabbitMq的基本使用

    简介

    官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

    简单来说,它就是用来与消息中间件进行交互的,我们不需要直接对消息中间件进行操作,而是通过Spring Cloud Stream,从而简化了对中间件的操作,并进行了解耦(想要更换消息中间件时,无需更改代码)。

    image

    应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,而 Spring Cloud Stream 的 binder 负责与中间件交互。

    所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。最大的好处莫过于对中间件的再次封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件。

    目前Stream只提供了RabiitMq和Kafka的binder,若要使用其他的消息中间件,需要自己自定义binder。

    基本使用

    消费者

    1.新建一个项目, micro-service1用于接收消息,作为eureka client,增加mq,stream-mq的maven,修改相关配置等不再累述,与之前一样

    2.定义一个接口,将input绑定名为"input"的消息通道

    public interface Receiver {
        //消息通道名称
        String INPUT = "input";
    
        //绑定可订阅的通道
        @Input(INPUT)
        SubscribableChannel input();
    }
    

    3.定义Stream接收类

    @Component
    //@EnableBinding注解可以接收一个或多个接口类作为对象
    // 声明绑定的消息通到,实现与消息代理的连接
    @EnableBinding(Receiver.class)
    @Log4j2
    public class StreamReceiver {
    
        //监听binding的input
        @StreamListener(Receiver.INPUT)
        //message为接收到信息消息
        public void input(Message<String> message){
            log.info("StreamReceiver: {}", message.getPayload());
        }
    }
    

    启动,默认是会创建一个临时队列,临时队列绑定的exchange为 “input”
    所有发送 exchange 为“input” 的MQ消息都会被投递到这个临时队列,并通过上述方法接收。

    image

    以上代码就完成了最基本的消费者部分。

    生产者

    1.新建一个项目, micro-service2用于发送消息,具体步骤步骤累述

    2.定义一个接口,,将output绑定名为"input"的消息通道

    public interface Sender {
        //消息通道名称
        String OUTPUT = "input";
    
        @Output(OUTPUT)
        MessageChannel output();
    }
    

    3.定义Stream发送类Controller

    @RestController
    @EnableBinding(Sender.class)
    @Log4j2
    public class SendController {
        @Autowired
        @Qualifier(Sender.OUTPUT)
        MessageChannel output;
    
        @GetMapping("send")
        public void send(){
            String message = "Hello! I am Stream Message!";
            log.info("发送Stream消息: {}",message);
            output.send(MessageBuilder.withPayload(message).build());
        }
    }
    

    以上代码就完成了最基本的消费者部分。

    启动后,调用/send接口,可看到收发消息成功的日志

    image
    消息分组

    当消费者集群部署时,它们当中应当只有一个能接受到消息。但按照现在的配置,每个消费者都能收到消息,我们来看看。

    1.启动两个micro-service1,设置不同接口

    2.调用/send接口,两个应用均能收到消息

    image image

    显然这是不合理,这里就需要用到消息分组

    3.在micro-service1应用中添加Stream分组配置

    cloud:
        stream:
          bindings:
            #为input消息通道添加分组
            input:
              group: testGroup
    

    4.启动两个micro-service1,调用/send接口。现在,发送一条信息,只能在其中一个应用中接收到消息,两个应用轮训接收。

    Spring Cloud Stream的简单使用讲解就到这里了,下期再见啦~

    如果觉得不错,分享给你的朋友!

    image image

    THANDKS

    • End -

    一个立志成大腿而每天努力奋斗的年轻人

    伴学习伴成长,成长之路你并不孤单!

    扫描二维码,关注公众号

    相关文章

      网友评论

        本文标题:「 从0到1学习微服务SpringCloud 」08 构建消息驱

        本文链接:https://www.haomeiwen.com/subject/xmhtgqtx.html