美文网首页springcloudSpringCloudspring cloud
49 Spring Cloud Stream 入门案例

49 Spring Cloud Stream 入门案例

作者: 木子教程 | 来源:发表于2022-01-15 18:25 被阅读0次

    准备工作

    案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装

    消息生产者

    (1)创建工程引入依赖

       <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    

    (2)定义bingding

    发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:

    public interface Source {
        String OUTPUT = "output";
        
        @Output("output")
        MessageChannel output();
    }
    

    这就接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生 产者。

    (3)配置application.yml

    spring:
     cloud:
       stream:
         bindings:
           output:
             destination: muziwk-default
             contentType: text/plain
    
    
    • contentType:用于指定消息的类型。具体可以参考 spring cloud stream docs
    • destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 muziwk-default 的所有消息队列中。

    (4)测试发送消息

    @SpringBootApplication
    @EnableBinding(Source.class)
    public class Application implements CommandLineRunner {
        @Autowired
        @Qualifier("output")
        MessageChannel output;
        @Override
        public void run(String... strings) throws Exception {
            //发送MQ消息
            output.send(MessageBuilder.withPayload("hello world").build());
       }
        
        public static void main(String[] args) {
            SpringApplication.run(Application.class);
       }
    }
    
    

    消息消费者

    (1)创建工程引入依赖

    <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    

    (2)定义bingding

    同发送消息一致,在Spring Cloud Stream中接受消息,需要定义一个接口,如下是内置的一个接口。

    public interface Sink {
        String INPUT = "input";
        @Input("input")
        SubscribableChannel input();
    }
    

    注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。 这就接口声明了一个 binding 命名为 “input” 。

    (3)配置application.yml

    spring:
     cloud:
       stream:
         bindings:
           input:
             destination: muziwk-default 
    

    destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 muziwk-default

    (4) 测试

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class Application {
        // 监听 binding 为 Sink.INPUT 的消息
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message) {
            System.out.println("监听收到:" + message.getPayload());
       }
        public static void main(String[] args) {
            SpringApplication.run(Application.class);
       }
    }
    
    
    
    • 定义一个 class (这里直接在启动类),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 @StreamListener(Processor.INPUT),方法参数为 Message 。
    • 所有发送 exchange 为“muziwk-default ” 的MQ消息都会被投递到这个临时队列,并且触发上述的方 法。

    相关文章

      网友评论

        本文标题:49 Spring Cloud Stream 入门案例

        本文链接:https://www.haomeiwen.com/subject/rltxhrtx.html