1. 背景
原生 Kafka 是不支持 Retry Topic 和 DLT (Dead Letter Topic,死信队列)。但是 Spring Kafka 在客户端实现了这两个功能。
2. 版本
spring-kafka 2.7.14(2.7.x 以下版本不支持 Retry Topic)
3. 默认重试策略
默认情况下,spring-kafka 在消费逻辑抛出异常时,会快速重试 10 次(无间隔时间),如果重试完成后,依旧消费失败,spring-kafka 会 commit 这条记录。
默认重试的实现原理是:重置当前 consumer offset,感兴趣的同学可以在
SeekUtils#doSeeks
debug 一下
可以通过自定义 SeekToCurrentErrorHandler 来控制消费失败后的处理逻辑。例如:添加重试间隔,重试完成后依旧失败的消息发送到 DLT
3.1. 自定义 SeekToCurrentErrorHandler
@Bean
public ErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
// 设置重试间隔 10秒 次数为 3次
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// 创建 SeekToCurrentErrorHandler 对象
return new SeekToCurrentErrorHandler(recoverer, backOff);
}
添加上述代码后,消费逻辑抛出异常后,会间隔 10s 重试 3 次,重试后依旧失败,会将消息发送到 DLT
关于默认重试策略,Kafka 的 TopicPartition 只会分配给一个消费者,而消费者对于某条消息的重试,会占用消费线程,影响整个 TopicPartition 的消费速度。如果使用 Retry Topic 功能,不会占用消费线程,会有专门的 retry 线程订阅 Retry Topic 执行重试消费。
4. Retry Topic + DLT 使用
可以通过注解和全局配置的方式开启 Retry Topic 功能
4.1. @RetryableTopic
使用注解的方式启用 Retry Topic,在 @KafkaListener
方法上添加 @RetryableTopic
即可
@Slf4j
@Component
public class SimpleConsumer {
@RetryableTopic()
@KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
public void onMessage(MessageWrapper message) {
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
throw new RuntimeException("test kafka exception");
}
}
此时 Retry Topic 功能已经启用了。当消费逻辑抛出异常时,spring-kafka 会先将消息发送到 Retry Topic,随后在 Main Topic(对应上文的test_topic
)中 commit 这条消息。会有专门的线程订阅 Retry Topic,不会影响正常消费
默认重试 3 次,间隔为 1s,如果在重试结束后,还没有成功被消费,该消息会被发送到 DLT 中
默认情况,消息被发送到死信队列后,会输出一条日志。
2022-08-09 16:05:03.920 INFO 4048 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer : Received message in dlt listener: test_topic-dlt-0@233
上述的日志输出是默认的死信订阅逻辑,用户可以在类中添加 @DltHandler 方法自定义死信消费逻辑
@DltHandler
public void processMessage(MessageWrapper message) {
log.info("dlt {}", message);
}
至此,你的 Kafka 就拥有了类似 RocketMQ 的消息重试能力,但是配置方面还需要调整一下。
4.2. 定制 @RetryableTopic
可以自定义重试次数,延迟时间,死信策略等等,同时大部分参数还支持使用 Spring EL 表达式读取配置,这里简单列举下,更多的配置读者可以自行探索
@RetryableTopic(
attempts = "${kafka.retry.attempts}",
backoff = @Backoff(delayExpression = "${kafka.retry.delay}", multiplierExpression = "${kafka.retry.multiplier}"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
)
@KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
public void onMessage(MessageWrapper message) {
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
throw new RuntimeException("test kafka exception");
}
解释一下上述的配置
- attempts:重试次数
- @Backoff delayExpression:消费延迟时间
- @Backoff multiplierExpression:乘数。举个例子,第一次delay = 10s,如果
multiplier = 2
,则下次 delay = 20s,以此类推,但是会有一个 maxDelay 作为延迟时间上限 - fixedDelayTopicStrategy:可选策略包括:每次重试发送到单独的 Topic、只使用一个重试 Topic
fixedDelayTopicStrategy 这个参数还是挺重要的,具体应该怎么选呢,我们稍后再说
4.3. RetryTopicConfiguration
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.maxAttempts(4)
.fixedBackOff(15000)
.includeTopic("test_topic")
.create(template);
}
使用这个方式配置项基本和注解一样,如果你有多个需要配置重试的消费者,使用 RetryTopicConfiguration 的方式要比注解方式更简单
5. 源码解析
5.1. 延迟重试怎么实现的
延迟重试这个功能应该分为两步
- 将需要重试的消息发送到 Retry Topic
- Retry Topic 的订阅者延迟消费
非常遗憾的是,Kafka 并没有延迟消息这样的功能,所以这个延迟消费也是 spring-kafka 自己实现的,不得不说这个组件真的下了很多功夫
接下来聊聊延迟重试的实现原理
5.1.1. 延迟消息标识
消息发送到 Retry Topic 这个步骤,感兴趣的同学可以 debug 一下 SeekToCurrentErrorHandler#handle
这里就不详细说了
每个需要被重试的消息,都会被添加 retry_topic-backoff-timestamp
这个 header,这个值代表这个消息的期望执行时间
开启了重试功能的 KafkaListener,在执行消费逻辑前,会先执行KafkaBackoffAwareMessageListenerAdapter#onMessage
,该方法会先对消息进行检查
这部分逻辑是:
- 首先检查 consumerRecord 是否包含
retry_topic-backoff-timestamp
,如果有则进入步骤2 - 现在时间是否达到了期望执行时间,if ( nowTime > executeTime ) 该方法什么也不做,程序会立刻执行消费逻辑
- 未达到期望执行时间,准备暂停消费者对当前 TopicPartition 的消费,但是并不是在这里完成的,这个方法内部只是记录了一下需要暂停的 TopicPartition(这个数据存储在 KafkaMessageListenerContainer 的
pauseRequestedPartitions
中),并在 PartitionPausingBackoffManager 中存储了BackOffContext
,随后抛出一个异常打断消费流程
5.1.2. 暂停分区
只要 Kafka 消费线程还在运行,就会无限调用 KafkaMessageListenerContainer#pollAndInvoke
pollAndInvoke 中 pausePartitionsIfNecessary 方法会根据 KafkaMessageListenerContainer 中存储的 pauseRequestedPartitions
暂停 partition,使用的方法是 Kafka Client 的 consumer.pause
调用
consumer.pause
之后,之后调用consumer.poll
不会返回任何数据,直到调用resume
恢复消费。该方法不会造成 Rebalance
5.1.3. 恢复分区
有了上面暂停消费的逻辑,还得有对应的恢复消费才能实现“延迟消费”,下面来看下恢复消费的逻辑
KafkaMessageListenerContainer#checkIdlePartitionKafkaMessageListenerContainer#checkIdlePartition
方法会不断地检查 partition 是否空闲(长时间未拉取到消息)。如果符合了空闲 partition 的标准,则发送事件 ListenerContainerPartitionIdleEvent
。
PartitionPausingBackoffManager
监听该事件,并尝试查找该 TopicPartition 是否存在 BackOffContext。存在则代表该分区被暂停,如果时间条件满足,从 KafkaMessageListenerContainer 的 pauseRequestedPartitions
删除该分区
最后 KafkaMessageListenerContainer#resumePartitionsIfNecessary
会将“已被 Kafka Consumer 暂停”但是“不存在于 KafkaMessageListenerContainer 的 pauseRequestedPartitions
的分区”恢复消费(通过 consumer.resume
)
5.1.4. 小结
画一张图来总结一下 Retry Topic 的执行流程
这里补充说明一下
- 其实 MAIN_TOPIC 和 RETRY TOPIC 执行的代码是完全相同的,上图只是为了更好的让大家理解 Retry Topic 的流程
- 本身 Kafka 消费流程是一个无限循环
5.2. 关于 Retry Topic 策略
下面详细说说 Topic 策略这个事
5.2.1. FixedDelayStrategy.MULTIPLE_TOPICS
以 test_topic
为例,此时我 attempts = 3, delay=10, multiplier=2
,会额外创建以下三个 Topic
- test_topic-retry-0
- test_topic-retry-1
- test_topic-dlt
第一次消费失败,会发送到 test_topic-retry-0,消息延迟为 10s
第二次消费失败,会发送到 test_topic-retry-1,消息延迟为 20s
第三次消费失败,会发送到 test_topic-dlt
此时每个 Retry Topic 中的消息延迟时间是相同的,在消费时间可控的情况下,消息延迟的时间不会有过大的偏差
该策略的缺点就是,使用了过多的 Topic,但是可以实现重试时间指数级上升
5.2.2. FixedDelayStrategy.SINGLE_TOPIC
延迟时间固定的情况适合使用 SINGLE_TOPIC 策略,该策略下只有一个 Retry Topic。如果 SINGLE_TOPIC 延迟时间指数级增长的话,很可能出现的问题是,第一条消息第三次重试延迟时间为 30s,第二条消息第一次重试延迟时间为 10s,两条消息被分配到同一分区,这二条消息被迫在 40s 之后才能重试
补充:如何使用多个 retry 线程
默认情况下,Main Topic,每个 Retry Topic,DLT 分别有 1 个消费线程,默认情况下 Retry 和 DLT 会使用 KafkaListener 提供的 ContainerFactory 初始化。
例如我把 KafkaListener concurrency 设置为 4。此时 Retry Topic,每个 Retry Topic,DLT 分别有 4 个消费线程
也可以自定义 Retry Topic 消费者使用的 ContainerFactory
spring-kafka 相关 demo
https://github.com/TavenYin/taven-springboot-learning/tree/master/springboot-kafka
参考
- spring kafka 最新版本 => https://docs.spring.io/spring-kafka/docs/current/reference/html/#retry-config
- spring kafka 2.7.14 => https://docs.spring.io/spring-kafka/docs/2.7.14/reference/html/#retry-topic
网友评论