美文网首页KafkaJava 开发Spring 开发
Spring Kafka:Retry Topic、DLT 的使用

Spring Kafka:Retry Topic、DLT 的使用

作者: 殷天文 | 来源:发表于2022-08-17 22:26 被阅读0次

    1. 背景

    原生 Kafka 是不支持 Retry Topic 和 DLT (Dead Letter Topic,死信队列)。但是 Spring Kafka 在客户端实现了这两个功能。

    2. 版本

    spring-kafka 2.7.14(2.7.x 以下版本不支持 Retry Topic)

    3. 默认重试策略

    默认情况下,spring-kafka 在消费逻辑抛出异常时,会快速重试 10 次(无间隔时间),如果重试完成后,依旧消费失败,spring-kafka 会 commit 这条记录。

    默认重试的实现原理是:重置当前 consumer offset,感兴趣的同学可以在 SeekUtils#doSeeks debug 一下

    可以通过自定义 SeekToCurrentErrorHandler 来控制消费失败后的处理逻辑。例如:添加重试间隔,重试完成后依旧失败的消息发送到 DLT

    3.1. 自定义 SeekToCurrentErrorHandler

        @Bean
        public ErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
            ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
            // 设置重试间隔 10秒 次数为 3次
            BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
            // 创建 SeekToCurrentErrorHandler 对象
            return new SeekToCurrentErrorHandler(recoverer, backOff);
        }
    

    添加上述代码后,消费逻辑抛出异常后,会间隔 10s 重试 3 次,重试后依旧失败,会将消息发送到 DLT

    关于默认重试策略,Kafka 的 TopicPartition 只会分配给一个消费者,而消费者对于某条消息的重试,会占用消费线程,影响整个 TopicPartition 的消费速度。如果使用 Retry Topic 功能,不会占用消费线程,会有专门的 retry 线程订阅 Retry Topic 执行重试消费。

    4. Retry Topic + DLT 使用

    可以通过注解和全局配置的方式开启 Retry Topic 功能

    4.1. @RetryableTopic

    使用注解的方式启用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可

    @Slf4j
    @Component
    public class SimpleConsumer {
    
        @RetryableTopic()
        @KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
        public void onMessage(MessageWrapper message) {
            log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
            throw new RuntimeException("test kafka exception");
        }
    
    }
    

    此时 Retry Topic 功能已经启用了。当消费逻辑抛出异常时,spring-kafka 会先将消息发送到 Retry Topic,随后在 Main Topic(对应上文的test_topic)中 commit 这条消息。会有专门的线程订阅 Retry Topic,不会影响正常消费

    默认重试 3 次,间隔为 1s,如果在重试结束后,还没有成功被消费,该消息会被发送到 DLT 中

    默认情况,消息被发送到死信队列后,会输出一条日志。

    2022-08-09 16:05:03.920  INFO 4048 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer    : Received message in dlt listener: test_topic-dlt-0@233
    

    上述的日志输出是默认的死信订阅逻辑,用户可以在类中添加 @DltHandler 方法自定义死信消费逻辑

        @DltHandler
        public void processMessage(MessageWrapper message) {
            log.info("dlt {}", message);
        }
    

    至此,你的 Kafka 就拥有了类似 RocketMQ 的消息重试能力,但是配置方面还需要调整一下。

    4.2. 定制 @RetryableTopic

    可以自定义重试次数,延迟时间,死信策略等等,同时大部分参数还支持使用 Spring EL 表达式读取配置,这里简单列举下,更多的配置读者可以自行探索

        @RetryableTopic(
                attempts = "${kafka.retry.attempts}",
                backoff = @Backoff(delayExpression = "${kafka.retry.delay}", multiplierExpression = "${kafka.retry.multiplier}"),
                fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
        )
        @KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
        public void onMessage(MessageWrapper message) {
            log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
            throw new RuntimeException("test kafka exception");
        }
    

    解释一下上述的配置

    • attempts:重试次数
    • @Backoff delayExpression:消费延迟时间
    • @Backoff multiplierExpression:乘数。举个例子,第一次delay = 10s,如果 multiplier = 2,则下次 delay = 20s,以此类推,但是会有一个 maxDelay 作为延迟时间上限
    • fixedDelayTopicStrategy:可选策略包括:每次重试发送到单独的 Topic、只使用一个重试 Topic

    fixedDelayTopicStrategy 这个参数还是挺重要的,具体应该怎么选呢,我们稍后再说

    4.3. RetryTopicConfiguration

        @Bean
        public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
            return RetryTopicConfigurationBuilder
                    .newInstance()
                    .maxAttempts(4)
                    .fixedBackOff(15000)
                    .includeTopic("test_topic")
                    .create(template);
        }
    

    使用这个方式配置项基本和注解一样,如果你有多个需要配置重试的消费者,使用 RetryTopicConfiguration 的方式要比注解方式更简单

    5. 源码解析

    5.1. 延迟重试怎么实现的

    延迟重试这个功能应该分为两步

    1. 将需要重试的消息发送到 Retry Topic
    2. Retry Topic 的订阅者延迟消费

    非常遗憾的是,Kafka 并没有延迟消息这样的功能,所以这个延迟消费也是 spring-kafka 自己实现的,不得不说这个组件真的下了很多功夫

    接下来聊聊延迟重试的实现原理

    5.1.1. 延迟消息标识

    消息发送到 Retry Topic 这个步骤,感兴趣的同学可以 debug 一下 SeekToCurrentErrorHandler#handle 这里就不详细说了

    每个需要被重试的消息,都会被添加 retry_topic-backoff-timestamp 这个 header,这个值代表这个消息的期望执行时间

    开启了重试功能的 KafkaListener,在执行消费逻辑前,会先执行KafkaBackoffAwareMessageListenerAdapter#onMessage,该方法会先对消息进行检查

    KafkaBackoffAwareMessageListenerAdapter#onMessage

    这部分逻辑是:

    1. 首先检查 consumerRecord 是否包含 retry_topic-backoff-timestamp,如果有则进入步骤2
    2. 现在时间是否达到了期望执行时间,if ( nowTime > executeTime ) 该方法什么也不做,程序会立刻执行消费逻辑
    3. 未达到期望执行时间,准备暂停消费者对当前 TopicPartition 的消费,但是并不是在这里完成的,这个方法内部只是记录了一下需要暂停的 TopicPartition(这个数据存储在 KafkaMessageListenerContainer 的 pauseRequestedPartitions 中),并在 PartitionPausingBackoffManager 中存储了 BackOffContext,随后抛出一个异常打断消费流程

    5.1.2. 暂停分区

    只要 Kafka 消费线程还在运行,就会无限调用 KafkaMessageListenerContainer#pollAndInvoke

    KafkaMessageListenerContainer#pollAndInvoke

    pollAndInvoke 中 pausePartitionsIfNecessary 方法会根据 KafkaMessageListenerContainer 中存储的 pauseRequestedPartitions 暂停 partition,使用的方法是 Kafka Client 的 consumer.pause

    调用 consumer.pause 之后,之后调用 consumer.poll 不会返回任何数据,直到调用 resume 恢复消费。该方法不会造成 Rebalance

    5.1.3. 恢复分区

    有了上面暂停消费的逻辑,还得有对应的恢复消费才能实现“延迟消费”,下面来看下恢复消费的逻辑

    KafkaMessageListenerContainer#checkIdlePartition

    KafkaMessageListenerContainer#checkIdlePartition 方法会不断地检查 partition 是否空闲(长时间未拉取到消息)。如果符合了空闲 partition 的标准,则发送事件 ListenerContainerPartitionIdleEvent

    PartitionPausingBackoffManager

    PartitionPausingBackoffManager 监听该事件,并尝试查找该 TopicPartition 是否存在 BackOffContext。存在则代表该分区被暂停,如果时间条件满足,从 KafkaMessageListenerContainer 的 pauseRequestedPartitions 删除该分区

    KafkaMessageListenerContainer#resumePartitionsIfNecessary

    最后 KafkaMessageListenerContainer#resumePartitionsIfNecessary 会将“已被 Kafka Consumer 暂停”但是“不存在于 KafkaMessageListenerContainer 的 pauseRequestedPartitions 的分区”恢复消费(通过 consumer.resume

    5.1.4. 小结

    画一张图来总结一下 Retry Topic 的执行流程

    这里补充说明一下

    • 其实 MAIN_TOPIC 和 RETRY TOPIC 执行的代码是完全相同的,上图只是为了更好的让大家理解 Retry Topic 的流程
    • 本身 Kafka 消费流程是一个无限循环

    5.2. 关于 Retry Topic 策略

    下面详细说说 Topic 策略这个事

    5.2.1. FixedDelayStrategy.MULTIPLE_TOPICS

    test_topic 为例,此时我 attempts = 3, delay=10, multiplier=2,会额外创建以下三个 Topic

    • test_topic-retry-0
    • test_topic-retry-1
    • test_topic-dlt

    第一次消费失败,会发送到 test_topic-retry-0,消息延迟为 10s
    第二次消费失败,会发送到 test_topic-retry-1,消息延迟为 20s
    第三次消费失败,会发送到 test_topic-dlt

    此时每个 Retry Topic 中的消息延迟时间是相同的,在消费时间可控的情况下,消息延迟的时间不会有过大的偏差

    该策略的缺点就是,使用了过多的 Topic,但是可以实现重试时间指数级上升

    5.2.2. FixedDelayStrategy.SINGLE_TOPIC

    延迟时间固定的情况适合使用 SINGLE_TOPIC 策略,该策略下只有一个 Retry Topic。如果 SINGLE_TOPIC 延迟时间指数级增长的话,很可能出现的问题是,第一条消息第三次重试延迟时间为 30s,第二条消息第一次重试延迟时间为 10s,两条消息被分配到同一分区,这二条消息被迫在 40s 之后才能重试

    补充:如何使用多个 retry 线程

    默认情况下,Main Topic,每个 Retry Topic,DLT 分别有 1 个消费线程,默认情况下 Retry 和 DLT 会使用 KafkaListener 提供的 ContainerFactory 初始化。

    例如我把 KafkaListener concurrency 设置为 4。此时 Retry Topic,每个 Retry Topic,DLT 分别有 4 个消费线程

    也可以自定义 Retry Topic 消费者使用的 ContainerFactory

    spring-kafka 相关 demo

    https://github.com/TavenYin/taven-springboot-learning/tree/master/springboot-kafka

    参考

    相关文章

      网友评论

        本文标题:Spring Kafka:Retry Topic、DLT 的使用

        本文链接:https://www.haomeiwen.com/subject/yqmpwrtx.html