stream是消息驱动框架,类似于jdbc驱动,它整合了kafka、rabitmq。使得开发人员只专注于底层的业务逻辑,不关注用的是什么消息中间件。
1.demo:
pom文件:
<!--rabbit驱动-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置文件:
server:
port: 8801
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka/
instance:
##标识你的服务名称
instance-id: spring-cloud-config-consume
##标识你的ip地址
prefer-ip-address: true
spring:
application:
name: springcloudstream8801
cloud:
stream:
##绑定rabbitmq信息
binders:
defaultRabbit:
type: rabbit
enviroment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
##服务的整合处理
bindings:
output:
##使用exchange名称定义
destination: studyExchange
content-tpe: application/json
binder: defaultRabbit
业务逻辑:
消息生产者:
//定义消息的推送通道
@EnableBinding(Source.class)
public class IMessegeOutImpl implements IMessegeOut {
// 消息发送管道
@Resource
private MessageChannel output;
public String outMessage() {
output.send(MessageBuilder.withPayload("一剪梅").build());
return null;
}
}
消息消费者:
消费者配置文件里的output要改成input
@Component
@EnableBinding(Sink.class)
public class StreamController {
@StreamListener(Sink.INPUT)
public void input(Message<String> message ){
System.out.println("我是消息消费者============="+message.getPayload());
}
}
2.消息重复消费问题、持久化问题:
由于消费者被绑定之后都会生成不同的流水号的组,这里在配置文件中设置group,设置同一个组名。生产的消息会被均分到同组下的所有服务中去,解决了消息城府消费问题。
如果某一个服务待机挂掉同组的服务会消费未被接受的消息。
bindings:
input:
##使用exchange名称定义
destination: studyExchange
content-tpe: application/json
binder: defaultRabbit
group: lishuaiA
网友评论