前言
软件的高可用一直是软件建设的难点,接下来探讨一下如何借助 Spring Cloud Stream
让我们的 rabbitmq
变得更加高可用。
消息消费失败
消息的消费,说到底其实就是:根据接收到的消息(携带了某种信号)执行一系列业务逻辑。而执行过程中,由于种种异常情况,或多或少都会出现执行失败的情况,那么问题来了,当消息消费失败后,该怎么处理呢?
对于那种因为突发的异常情况导致消息消费失败的,可以简单的分为:
- 短暂性异常
- 持久性异常
短暂性异常比如有:网络抖动导致远程调用失败无法继续执行导致消费失败,这种短暂性异常一般在短时间内就能恢复正常,所以如果能让消费失败后的消息等待一小段时间后重新被投递并消费,那岂不是能大大减少因为异常导致消费失败的消息数量,因为异常恢复了,消息也就能正常消费了。
持久性异常比如有:某个服务因为一个未在测试阶段发现的bug导致整个远程服务不可用,远程服务不可用,消息也就注定消费失败了,这种情况下,肯定没办法短时间内就解决并重新部署服务,因此,就算消息被重新投递多少次,也不可能被正常消费,所以简单的重复投递消费失败的消息是无法让消息被正常消费的。这样反而只会无谓的浪费系统资源,说不定还会因此影响到其他服务。
失败重试
上面说到,失败重试可以解决短暂性导致的消费失败的情况。那么,Spring Cloud Stream
支不支持呢?答案是肯定的,而且还非常简单,只需加入几个配置即可。
首先,配置 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempts
是用来决定:消息最大可以被尝试消费的次数,包含第一次投递。举个例子,假设为默认值 3,在第一次投递后,消费失败了,那么该消息还可以再被重复投递2次。如果设为1,也就代表不重试。另外,该配置的值必须大于0,当配置了 0 或 负数,直接无法启动成功,并报如下错误:

其次,既然有了失败重试机制,那么肯定得有重试策略,所以还需另外3个参数的配合,分别为(以下参数的前缀与maxAttempts
一样,均为 spring.cloud.stream.bindings.<channelName>.consumer
):
- backOffInitialInterval: 消息消费失败后重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
- backOffMultiplier: 相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍
- backOffMaxInterval: 下一次尝试重试的最大时间间隔,默认为10000ms,即10s。
那么怎么结合起来理解呢?举个例子:假设这几个配置均使用默认值,重试第一次1s,第二次2秒,因为默认最大重试次数为3,所以也就不会进行第三次重试;而如果最大重试次数配置了大于3的值,比如10,那么第三次4秒,第四次为8秒,而在第五次重试的时候,若没有最大重试时间间隔的限制,重试时间为 2^4^ = 16
,但是因为有了不超过10秒的限制,第五次重试的时间间隔为10秒,而不是刚刚算出的16秒;而接下来剩余的重试次数,其重试时间间隔均为10秒。
示例
以下代码可在 源码 查看。
配置
spring:
application:
name: scas-data-collection
profiles:
active:
default
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
packetUplinkOutput:
destination: packetUplinkTopic
content-type: application/json
binder: rabbit
packetUplinkInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}
binder: rabbit
consumer:
maxAttempts: 3 # 当消息消费失败时,尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
backOffInitialInterval: 1000 # 消息消费失败后重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍
backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s。
上面的配置均使用默认配置。
消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
/**
* 设备 eui
*/
private String devEui;
/**
* 数据
*/
private String data;
// 省略其他字段
}
测试用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxAttempt")
@EnableBinding({ScasMaxAttemptTest.MessageSink.class, ScasMaxAttemptTest.MessageSource.class})
public class ScasMaxAttemptTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
/**
*
*/
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 1; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(1000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("发布上行数据包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("packetUplinkInput")
public void handle(PacketModel model) {
log.info("消费上行数据包消息. model: [{}].", model);
throw new RuntimeException();
}
}
public interface MessageSink {
@Input("packetUplinkInput")
SubscribableChannel packetUplinkInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
}@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxAttempt")
@EnableBinding({ScasMaxAttemptTest.MessageSink.class, ScasMaxAttemptTest.MessageSource.class})
public class ScasMaxAttemptTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
/**
*
*/
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 1; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(1000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("发布上行数据包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("packetUplinkInput")
public void handle(PacketModel model) {
log.info("消费上行数据包消息. model: [{}].", model);
throw new RuntimeException();
}
}
public interface MessageSink {
@Input("packetUplinkInput")
SubscribableChannel packetUplinkInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
}
运行测试用例
使用默认配置
运行测试用例后,你会看到控制台打印类似如下的日志:

可以看到,打印的日志与上文分析的一致,第一次消费失败后,会再重试2次,一共尝试消费3次,最后一次也失败后,直接抛出异常,不再继续重试。
增加最大重试次数
配置 maxAttempts = 10
,再次启动测试用例,日志打印如下:

可以看到,从第五次重试开始,剩下的重试次数,重试时间间隔均为10s。
如何配置更合适
其实 Spring Cloud Stream
的默认配置基本就够了,因为如果是因为短暂性异常导致消息消费失败,重试2次基本就差不多了,重试太多反而可能会导致出现其他问题。
但是考虑到有些短暂性异常可能无法在1、2秒内恢复正常,那我们可以稍微增大配置 backOffInitialInterval
或 backOffMultiplier
的值,比如:backOffInitialInterval = 5000
,backOffMultiplier = 5
,backOffMaxInterval =60000
,这种配置可能就比较适合实时性不高的情况。
总之,我们可以根据具体业务以及生产环境,调整这几个配置的值。
重试次数用完后消息会去哪?
你可能会好奇,当重试次数用完后,消息会跑去哪呢?这时如果访问 Rabbitmq可视化页面,你会看到:

可以看到,
Ready
Unacked
Total
均为0,也就是说,消息被丢弃了?
事实上,消息确实被丢弃了,但是这样不好吧,这样会存在丢失部分消息的隐患,于是不得不引入另一个概念——死信队列。死信队列有什么用呢?死信队列是用来接收因为种种原因导致消息无法正常消费后的消息,当然这里的原因不止消息重试次数用完后的消息。
因为死信队列超出本文的范畴,这里就不详细说明,会在以后的文章详讲。
持久性异常的消费失败
当异常情况为持久性异常,在异常情况恢复正常之前,那么无论重试多少次,消息都无法被正常消费,所以只能在重试次数用完之后,要么丢弃该消息或进入死信队列。所以重试次数不能设置过大,避免浪费系统资源。
推荐阅读
Spring Cloud Stream 进阶配置——高吞吐量(一)——多消费者
Spring Cloud Stream 进阶配置——高吞吐量(二)——弹性消费者数量
Spring Cloud Stream 进阶配置——高吞吐量(三)——批量预取消息(prefetch)
网友评论