1. 下载RabbitMQ
可以使用docker镜像:
https://python.iitter.com/other/170428.html
2. 添加依赖
生产者和消费者都需要添加以下依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3. 生产者
@Service
@Slf4j
public class Producer {
@Autowired
private StreamBridge streamBridge;
public void send(Object message) {
log.info("sent: {}", message);
streamBridge.send("testMessage-out-0", message);
}
}
4. 消费者
@Slf4j
@Service
public class Consumer {
@Autowired
private ConsumerService consumerService;
@Bean
public Consumer<Object> testMessage() {
return request -> {
log.info("received: {}", request);
consumerService.testConsumer(request);
};
}
}
在默认情况下,框架会使用消费者方法的 method name 作为当前消费者的标识,如果消费者标识和配置文件中的名称不一致,那么 Spring 应用就不知道该把当前的消费者绑定到哪一个 Stream 信道上去。
5. 配置文件
- Binder
如果程序中只使用了单一的中间件,比如只接入了 RabbitMQ,那么可以直接在 spring.rabbitmq 节点下配置连接串,不需要特别指定 binders 配置。
spring:
cloud:
stream:
# 如果你项目里只对接一个中间件,那么不用定义binders
# 当系统要定义多个不同消息中间件的时候,使用binders定义
binders:
my-rabbit:
type: rabbit # 消息中间件类型
environment: # 连接信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
- bindings
配置了生产者、消费者、binder 和 RabbitMQ 四方的关联关系。
spring:
cloud:
stream:
bindings:
# 添加 Producer
testMessage-out-0:
destination: testMessage-topic
content-type: application/json
binder: my-rabbit
# Consumer
testMessage-in-0:
destination: testMessage-topic
content-type: application/json
# 消费组,同一个组内只能被消费一次
group: add-coupon-group
binder: my-rabbit
function:
definition: testMessage
如上就可以生成和消费消息了。
6. 异常处理
- 重试
消息重试是一种简单高效的异常恢复手段,当 Consumer 端抛出异常的时候,Stream 会自动执行 2 次重试。
spring:
cloud:
stream:
bindings:
testMessage-in-0:
destination: testMessage-topic
content-type: application/json
# 消费组,同一个组内只能被消费一次
group: testMessage-group
binder: my-rabbit
consumer:
# 如果最大尝试次数为1,即不重试
# 默认是做3次尝试
max-attempts: 5
# 两次重试之间的初始间隔
backOffInitialInterval: 2000
# 重试最大间隔
backOffMaxInterval: 10000
# 每次重试后,间隔时间乘以的系数
backOffMultiplier: 2
# 如果某个异常你不想重试,写在这里
retryableExceptions:
java.lang.IllegalArgumentException: false
除了本地重试以外,还可以把这个失败的消息丢回到原始队列中,做一个 requeue 的操作。
spring:
cloud:
stream:
rabbit:
bindings:
# requeue重试
testMessage-in-0:
consumer:
requeue-rejected: true
- 降级
通过 spring-integration 的注解 @ServiceActivator 做了一个桥接,将指定 Channel 的异常错误转到本地方法里。
@ServiceActivator(inputChannel = "testMessage-topic.testMessage-group.errors")
public void requestCouponFallback(ErrorMessage errorMessage) throws Exception {
log.info("consumer error: {}", errorMessage);
// 实现自己的逻辑
}
- 死信队列
如果想要保留这条出错的 Message,可以选择将它发送到另一个 Queue 里。这个特殊的 Queue 就叫做死信队列。
开启
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
配置
spring:
cloud:
stream:
rabbit:
bindings:
testMessage-in-0:
consumer:
auto-bind-dlq: true
死信队列的名称和第一个队列几乎一样,唯一区别就是末尾多了一个.dlq,这个 dlq 就是死信队列的标志。
7.延迟消息
下载rabbitmq_delayed_message_exchange插件
https://blog.csdn.net/u010375456/article/details/106323962/
- 生产者
// 使用延迟消息发送
public void sendInDelay(Object message) {
log.info("sent: {}", coupon);
streamBridge.send("testMessageDelay-out-0"
MessageBuilder.withPayload(message)
.setHeader("x-delay", 10 * 1000)
.build());
}
- 消费者
@Bean
public Consumer<Object> testMessageDelay() {
return request -> {
log.info("received: {}", request);
service.consumer(request);
};
}
- 配置文件
spring:
cloud:
stream:
bindings:
# 延迟发券 - producer
testMessageDelay-out-0:
destination: testMessage-delayed-topic
content-type: application/json
binder: my-rabbit
# 延迟发券 - Consumer
testMessageDelay-in-0:
destination: testMessage-delayed-topic
content-type: application/json
# 消费组,同一个组内只能被消费一次
group: testMessage-group
binder: my-rabbit
consumer:
# 如果最大尝试次数为1,即不重试
# 默认是做3次尝试
max-attempts: 1
function:
definition: testMessageDelay
rabbit:
bindings:
testMessageDelay-out-0:
producer:
delayed-exchange: true
testMessageDelay-in-0:
consumer:
delayed-exchange: true
网友评论