美文网首页
SpringCloud 进阶: 消息驱动(入门)Spring C

SpringCloud 进阶: 消息驱动(入门)Spring C

作者: _兰陵笑笑生 | 来源:发表于2020-02-05 22:36 被阅读0次

     我的博客:程序员笑笑生,欢迎浏览博客!

     上一章 SpringCloud 基础教程(十二)-Zipkin分布式链路追踪系统搭建当中,我们使用Zipkin搭建完整的实时数据追踪系统。本章开始我们将进入Spring Cloud的更高阶的内容部分,首先从消息驱动Spring Cloud Stream开始。

    前言

     消息驱动,顾明思议,在企业级应用中,消息中间件经常用于处理非同步场景、消息通知、应用解耦等。常用的有RabbitMq、kafka、Redis等消息队列等。Spring Cloud Stream是一个构建事件消息驱动的微服务框架,提供了一个灵活的编程模型。并基于Spring的基础之上,支持发布-订阅模型、消费者分组、数据分片等功能。

    一、Stream 应用模型

    file
    • Middleware: 消息中间件,如RabbitMq等
    • Binder:可以认为是适配器,用来将Stream与中间连接起来的,不同的Binder对应不同的中间件,需要我们配置
    • Application Core:由Stream封装的消息机制,很少情况下自定义开发
    • inputs:输入,可以自定义开发
    • outputs:输出,可以自定义开发

    接下来快速开始,主要就是针对以上几个组件进行不同的配置。

    二、快速开始

     接下来,我们以RabbitMQ为例(消息队列的环境搭建整这里不做过多的介绍,本章以Stream为主),新建2个Maven工程,分别当做消息生产者(server-receiver)、消息生产者(server-sender),在2个项目中引入Stream依赖和Stream对RabbitMq的依赖,在生产者单独的添加web的依赖,为了能够通过HTTP调用发送信息:

    <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
           <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    

    2.1 server-receiver消费者

     启动主类:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    /**
     * @EnableBinding 表示告诉当前应用,增加消息通道的监听功能
     * 监听Sink类中名为input的输入通道:
     */
    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class ReceiverApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ReceiverApplication.class, args);
        }
        /**
         * 监听rabbitmq的消息,具体什么队列,什么topic,通过配置信息application获取
         *
         * @param msg
         */
        @StreamListener(Sink.INPUT)
        public void reader(String msg) {
            System.out.print("receiver {}:" + msg);
        }
    }
    
    

    application.yml配置:

    spring:
      cloud:
        stream:
          bindings:
             input:
                destination: mytopic
                binder: defaultRabbit
          binders:
             defaultRabbit:
                 type: rabbit
                 environment:
                    spring:
                     rabbitmq:
                         host: localhost
                         port: 5672
    server:
      port: 8081
    

     具体配置详解,spring.cloud.stream为前缀:

    bindings配置:

    • input:表示channelName,这里为什么是input,是因为启动类中@EnableBinding(Sink.class)注解当中配置Sink接口,该接口中默认定义了channelName的名称,当然我们也可以自己写Sink接口
    • destination:消息中间件的Topic
    • binder:当前bingding绑定的对应的适配器,该实例表示适配rabbitmq,名称默认为defaultRabbit,可以自定义,接着需要配置该名称对应的类型,环境信息等

    binders配置:

    • defaultRabbit:binder配置的适配器的名称,和spring.cloud.stream.bindings.input.binder值一样
    • environment:表示当前binder对应的配置信息

    2.2 生产者server-sender

     SenderApplication启动类,添加@EnableBinding注解:

    import com.microservice.stream.controller.SenderSource;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    /**
     * @EnableBinding(SenderSource.class) 表示监听Stream通道功能
     * 
     * SenderSource为自定义的通道接口
     * 
     */
    @SpringBootApplication
    @EnableBinding(SenderSource.class)
    public class SenderApplication {
       
        public static void main(String[] args) {
            SpringApplication.run(SenderApplication.class,args);
    
        }
    }
    
    

     自定义SenderSource接口,参考org.springframework.cloud.stream.messaging.Source,将channel的名称改成和消费者的Sink的channel名称一样。

    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    public interface SenderSource {
        /**
         * Name of the output channel.
         */
        String OUTPUT = "input";
        /**
         * @return output channel
         */
        @Output(SenderSource.OUTPUT)
        MessageChannel output();
    }
    
    

     编写控制器,通过HTTP发送消息:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class SenderController {
        @Autowired
        SenderSource source;
        
        @RequestMapping("/send")
        public String sender(String msg) {
            source.output().send(MessageBuilder.withPayload(msg).build());
            return "ok";
        }
    
    }
    
    

     applicaiton.yml配置,配置和消费者的配置一样

    spring:
      cloud:
        stream:
          bindings:
             input:
                destination: mytopic
                binder: defaultRabbit
          binders:
             defaultRabbit:
                 type: rabbit
                 environment:
                    spring:
                     rabbitmq:
                         host: localhost
                         port: 5672
    server:
      port: 8081                     
    

    2.3 启动接受者和消费者,发送消息

     首先启动消费者,看启动日志,我们看到程序声明了一个名称为:mytopic.anonymous.88A97a5vQ9Ox07GnNBlKYQ的队列,并且绑定了mytopic 主题,创建了一个连上rabbit的连接:

    file

    我们看看rabbit的web页面队列列表中就有了新增了一个队列,并且绑定了mytopic主题:

    file

    然后再启动生产者server-sender,在启动日志中我们也看到了应用创建了到对应的消息队列的连接:

    file

    接下来我们通过HTTP发送信息:http://localhost:8081/send/?msg=test,在服务消费者的日志中,监听到了对应的消息:

    file

    通过以上的简单的实例,我们体验了Spring Cloud Stream在提供消息驱动服务方面非常的方便。

    三、代码分析

    3.1 @EnableBinding注解

     @EnableBinding表示告诉应用增加了通道监听功能,可以是一个或者多个,可以传入Sink和Source类,Sink和Souce可以自定义

    3.2 Sink和Soure

     我们首先看看Sink类和Source类,

    Sink

    /**
     * Bindable interface with one input channel.
     */
    public interface Sink {
    
        /**
         * Input channel name.
         */
        String INPUT = "input";
    
        /**
         * @return input channel.
         */
        @Input(Sink.INPUT)
        SubscribableChannel input();
    
    }
    
    

    Source

    package org.springframework.cloud.stream.messaging;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    public interface Source {
    
        /**
         * Name of the output channel.
         */
        String OUTPUT = "output";
    
        /**
         * @return output channel
         */
        @Output(Source.OUTPUT)
        MessageChannel output();
    
    }
    
    

     Sink和Source一样是一个接口类型, INPUT、OUTPUT表示channel名称,@Input、@Output表示注入参数为对应的channel名称,在我们的上文中我们自定义了SenderSource类型的接口,为了和Sink的channel名称一样。

    • Sink:单一的输入通道
    • Source:单一的输出通道

     Spring会为每一个标注了@Output,@Input的管道接口生成一个实现类

    3.3 Spring-messaging的抽象

     在Sink和source的接口中,我们注意到了MessageChannel、SubscribableChannel类,在spring框架中,spring-message模块对消息处理的抽象类:

    对象 说明
    Message 消息
    MessageHandler 处理消息的单元
    MessageChannel 发送/接受传输消息的信道,单项的
    SubscribableChannel 继承MessageChannel,传送消息到所有的订阅者
    ExecutorSubscribableChannel 继承SubscribableChannel,异步线程池传输消息

    四、配置文件

      配置文件的格式如下,<>表示我们可以自定义:

    spring:
      cloud:
        stream:
          bindings:
             <channel-name>:   #channel名称
                destination: mytopic  # 发布-订阅模型的消息主题topic
                binder: defaultRabbit  #binder(适配器的名称)
          binders:
             <binder-name>:  # 根据binder配置一样的名称
                 type: rabbit  # 中间件的类型
                 environment:  # 中间件实例的环境
                    spring:
                     rabbitmq:
                         host: localhost
                         port: 5672
    

     我们可以定义多个binding,分别为binding绑定相同或不同的Binder

    总结

     本章我们初步的介绍了Spring Cloud Stream,通过对Steam的应用模型一节通过消息生产者和消费者模型实现了简单的发布-订阅的模型,对Stream有了一些了解,Steram的功能远不止于此。在后期的介绍中,我还将继续深入的介绍Stream更多的内容。

    file file file

    SpringCloud基础教程(一)-微服务与SpringCloud

    SpringCloud基础教程(二)-服务发现 Eureka

    SpringCloud基础教程(三)-Eureka进阶

    SpringCloud 基础教程(四)-配置中心入门

    SpringCloud基础教程(五)-配置中心热生效和高可用

    SpringCloud 基础教程(六)-负载均衡Ribbon

    SpringCloud 基础教程(七)-Feign声明式服务调用

    SpringCloud 基础教程(八)-Hystrix熔断器(上)

    SpringCloud 基础教程(九)-Hystrix服务监控(下)

    SpringCloud 基础教程(十)-Zull服务网关

    SpringCloud 基础教程(十一)- Sleuth 调用链追踪简介

    SpringCloud 基础教程(十二)-Zipkin 分布式链路追踪系统搭建

    SpringCloud 进阶: 消息驱动(入门) Spring Cloud Stream【Greenwich.SR3】

    更多精彩内容,请期待...

    本文由博客一文多发平台 OpenWrite 发布!

    相关文章

      网友评论

          本文标题:SpringCloud 进阶: 消息驱动(入门)Spring C

          本文链接:https://www.haomeiwen.com/subject/vciexhtx.html