产生的背景:
定义
重复消费的问题
消息持久化的问题
spring cloud 集成 Spring cloud stream 代码如下
消息生成者
pom.xml
<dependencies>
<!--监控依赖的包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- 引入eureka客户端 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--引入spring cloud stream 依赖包-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--添加common 通用包-->
<dependency>
<artifactId>springcloud-api-common</artifactId>
<groupId>com.tina.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
============================================================
配置文件
server:
port: 8801
#配置服务名
spring:
application:
name: springcloud-stream-rabbitmq-provider
cloud:
stream:
binders: #此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 152.136.27.48
port: 5672
username: tina
password: 123456
bindings: #服务的整合处理
output: #这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: text/plain #applicaiton/json #设置消息类型,本次为json,文本则设置"text/plain"
binder: defaultRabbit #设置要绑定的消息服务的具体设置
#注册到eureka
eureka:
client:
service-url:
defaultZone: http://springcloud-eureka-server7001:7001/eureka,http://springcloud-eureka-server7002:7002/eureka
instance:
instance-id: springcloud-stream-rabbitmq-provider
prefer-ip-address: true #访问路径可以显示IP地址
lease-renewal-interval-in-seconds: 1 #向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-expiration-duration-in-seconds: 2 #收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除
============================================================
启动类
@SpringBootApplication
@EnableEurekaClient
public class StreamRabbitmqProvider8801 {
public static void main(String[] args) {
SpringApplication.run(StreamRabbitmqProvider8801.class,args);
}
}
============================================================
service 包
接口
public interface ImessageProvider {
public String send();
}
----------------------------------------------------------------------------------------------------
实现类
@Slf4j
@EnableBinding(Source.class) //指信道channel和exchange绑定到一起 或者说定义推送管道
public class ImessageProviderImpl implements ImessageProvider {
@Resource
private MessageChannel output; //消息发送管道
@Override
public String send() {
String serial = IdUtil.simpleUUID();
output.send(MessageBuilder.withPayload(serial).build());
log.info("serial的值为:"+serial);
return null;
}
}
============================================================
控制类
@RestController
public class SendMessageProvider {
@Resource
private ImessageProvider imessageProvider ;
@GetMapping(value = "/sendMessage")
public String sendMessage(){
return imessageProvider.send();
}
}
============================================================
消息消费者
pom.xml
<dependencies>
<!--监控依赖的包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 引入eureka客户端的依赖包 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--引入spring cloud stream 依赖包-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--引入spring cloud config 客户端 的依赖包-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--添加common 通用包-->
<dependency>
<artifactId>springcloud-api-common</artifactId>
<groupId>com.tina.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
============================================================
配置文件
server:
port: 8802
#配置服务名
spring:
application:
name: springcloud-stream-rabbitmq-consumer
cloud:
stream:
binders: #此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 152.136.27.48
port: 5672
username: tina
password: 123456
bindings: #服务的整合处理
input: #这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: text/plain #applicaiton/json #设置消息类型,本次为json,文本则设置"text/plain"
binder: defaultRabbit #设置要绑定的消息服务的具体设置
group: consumerA #设置分组
#注册到eureka
eureka:
client:
service-url:
defaultZone: http://springcloud-eureka-server7001:7001/eureka,http://springcloud-eureka-server7002:7002/eureka
instance:
instance-id: springcloud-stream-rabbitmq-consumer8802
prefer-ip-address: true #访问路径可以显示IP地址
lease-renewal-interval-in-seconds: 1 #向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-expiration-duration-in-seconds: 2 #收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除
============================================================
启动类
@EnableEurekaClient
@SpringBootApplication
public class StreamConsumerApplication8802 {
public static void main(String[] args) {
SpringApplication.run(StreamConsumerApplication8802.class,args);
}
}
============================================================
控制类
@RestController
@Slf4j
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String port;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
log.info("消费者1号, 接收到的信息为:"+message.getPayload() +",端口号为:"+port);
}
}
============================================================
网友评论