贴上springcloud-Stream的官方文档,大家可以的话就去多看看https://spring.io/projects/spring-cloud-stream
添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>stream-sample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>stream-sample</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>2020.0.3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
创建一个消息监听器
package com.example.streamsample;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Slf4j
@Component
public class StreamConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
讲解一下吧,在stream2.0之前是使用 @StreamListener(Sink.INPUT)这种方式来监听的,而现在这种方法官方已经不建议使用,如果用的话也行,不过会提示不建议,现在我写的这种方式是最新的推荐使用的。
image.png
可以看到不论消息生产者还是消费者都建议使用
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
这种方式,如果你是消费者你就用Consumer<String>,如果你是生产者你就使用Supplier<Date>。
配置文件
从上面的依赖可以看到我们必须得安装一个rabbitmq作为消息中间件来使用了,如何安装可以去查阅相关资料,比较简单,下面是默认安装之后的配置,安装成功之后可以访问地址来看rabbitmq(http://localhost:15672/):
image.pngspring.application.name=stream-sample
server.port=63000
#rabbitMQ连接字符串
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
测试
启动项目,然后我们在rabbitmq的页面上选择queues然后在下面找 sink-in-0*******以这个开头的,后面的星号可能是一串字符串,我们选择他,然后可以在页面下面输入一串自定义的消息,然后发布:
image.png我们再回到程序界面控制台,就可以看到打印出来了,就证明springcloud这块已经监听到消息,并消费了。
image.png
网友评论