美文网首页
kafka系列五:失败后重试机制

kafka系列五:失败后重试机制

作者: IT前沿技术分享 | 来源:发表于2024-07-11 15:59 被阅读0次

    MQ系列:
    kafka系列一: kafka简介
    kafka系列二: kafka部署
    kafka系列三: Spring kafka
    kafka系列四:动态添加监听器
    kafka系列五:失败后重试机制


    前言

    在 Spring Kafka 中,失败重试与死信队列的处理是关键功能,可以确保消息处理的可靠性和健壮性。当消费者处理消息失败时,可以配置重试机制,在重试多次后仍然失败时,将消息发送到死信队列进行处理。

    重试机制的用法

    springboot 中使用kafka消息失败重试机制非常便捷,关注 @RetryableTopic@DltHandler 两个注解即可。
    以下模拟处理失败的例子

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.RetryableTopic;
    import org.springframework.kafka.retrytopic.DltHandler;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
    import org.springframework.stereotype.Service;
    import org.springframework.retry.annotation.Backoff;
    
    @Service
    public class KafkaConsumerService {
    
        @RetryableTopic(
                attempts = "3",
                backoff = @Backoff(delay = 2000, multiplier = 2, maxDelay = 60000),
                dltStrategy = RetryableTopic.DltStrategy.FAIL_ON_ERROR,
                autoCreateTopics = "true"
        )
        @KafkaListener(topics = "my-topic", groupId = "${spring.kafka.consumer.group-id}")
        public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            try {
                System.out.println("Received message: " + record.value());
                // 模拟异常
                if (shouldFail()) {
                    throw new RuntimeException("Simulated failure");
                }
                acknowledgment.acknowledge();
            } catch (Exception e) {
                throw e;
            }
        }
    
        private boolean shouldFail() {
            // 模拟处理失败的条件
            return true;
        }
    
        @DltHandler
        public void dltListen(ConsumerRecord<String, String> record) {
            System.out.println("Received message in DLT: " + record.value());
            // 可以在这里添加对死信消息的处理逻辑
        }
    }
    

    @RetryableTopic 注解:

    这个注解会将失败后重试的监听注册为被标注的方法,如例子中,my-topic的监听器和my-topic处理失败后重试的监听都是方法 listen()

    • attempts:指定重试次数。
    • backoff:配置重试的间隔和倍数。
    • dltStrategy:配置死信队列策略(DltStrategy.FAIL_ON_ERROR 表示处理失败时将消息发送到死信队列)。
    • autoCreateTopics:配置是否自动创建重试和死信队列的主题。

    @DltHandler 注解:

    用于标记处理死信队列消息的方法。
    死信队列的topic name根据原Topic(my-topic)在超过指定失败次数后自动生成新的Topic(my-topic.dlt),并且被 @DltHandler 标注的方法监听。

    自定义死信队列监听器

    通过注解@RetryableTopic 和 @DltHandler,可以非常便捷地整合失败重试机制到你的app中,但是,不够灵活。
    正如上篇文章提到的,动态增加新的Topic监听器时,如何引入对应的失败重试机制呢。
    在上次分析的基础上,解析@RetryableTopic注解时,通过ConcurrentMessageListenerContainer的setCommonErrorHandler的方法设置异常重试配置的

    
        /**
         * Set the {@link CommonErrorHandler} which can handle errors for both record
         * and batch listeners.
         * @param commonErrorHandler the handler.
         * @since 2.8
         */
        public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler) {
            this.commonErrorHandler = commonErrorHandler;
        }
    

    因此,我们需要定义一个返回实现CommonErrorHandler(DefaultErrorHandler ) 的方法

        /***
         *
         * @return
         */
        public DefaultErrorHandler errorHandler() {
    
            DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    
            ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
            backOff.setInitialInterval(2000);
            backOff.setMultiplier(2);
            backOff.setMaxInterval(10000);
    
            return new DefaultErrorHandler(recoverer, backOff);
        }
    

    然后在动态添加新Topic监听器的方法中处理setCommonErrorHandler(errorHandler())

    
    /**
         * 添加 Topeic
         *
         * @param topic
         * @param groupId
         * @param isDeadLetter
         */
        public void addKafkaListener(String topic, String groupId, boolean isDeadLetter) {
            // kafka 消费者
            DefaultKafkaConsumerFactory<String, String> consumerFactory = consumerFactory(groupId);
    
            // 相关属性
            ContainerProperties props = new ContainerProperties(topic);
    
            // 设置监听器(区分死信队列与非死信队列)
            if (!isDeadLetter) {
                props.setMessageListener(new CustomerMsgErrorHandler());
            } else {
                props.setMessageListener(new DeadLetterHandler());
            }
    
            props.setGroupId(groupId);
            props.setAckMode(ContainerProperties.AckMode.MANUAL);
    
            ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, props);
    
            container.setCommonErrorHandler(errorHandler());
    
           // 非死信队列,则添加一个死信队列监听器
            if (!isDeadLetter) {
                addKafkaListener(topic + ".DLT", groupId, true);
            }
            container.start();
        }
    

    如果直接用 @DltHandler标注方法的方式,添加死信队列监听器,监听器无效,故,直接增加了一个监听"topic.dlt"的监听器,来处理死信队列。
    另外定义一个专门模拟异常的Hander和死信队列的Handkler

    @Slf4j
    public class CustomerMsgErrorHandler implements AcknowledgingMessageListener<String, String> {
    
        @Override
        public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
            // doSomething
            System.out.println("------------ onMessage topic " + data.topic() + ": " + data.value());
    
            // 模拟异常
            if (shouldFail()) {
                throw new RuntimeException("Simulated failure");
            }
    
            // 因为前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
            acknowledgment.acknowledge();
        }
    
        private boolean shouldFail() {
            // 模拟处理失败的条件
            return true;
        }
    }
    
    @Slf4j
    public class DeadLetterHandler implements AcknowledgingMessageListener<String, String> {
    
        @Override
        public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
            // doSomething
            System.out.println("------------ DLT onMessage topic " + data.topic() + ": " + data.value());
    
            // 因为前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
            acknowledgment.acknowledge();
        }
    }
    

    OK,这样就暂时实现了动态增加新Topic监听器的功能了,并且也用到了重试机制。

    总结

    上述配置展示了如何在 Spring Kafka 中实现失败重试与死信队列处理。通过配置 DefaultErrorHandler,可以设置重试机制和死信队列处理策略。在消费者监听器中处理消息时,如果出现异常,消息会根据配置的重试策略进行重试,多次重试失败后会被发送到死信队列。死信队列的监听器可以处理这些失败的消息,从而实现对异常消息的特殊处理。

    在实现动态增加新Topic监听器的功能时,虽然,已经按照配置去执行失败重试了,但是,并没有如意料中的那样,回调@DltHandler标注的死信队列监听器。估计是没有把相关对象托管到springboot容器中的原因,下次再仔细瞧瞧,解决这个问题。

    相关文章

      网友评论

          本文标题:kafka系列五:失败后重试机制

          本文链接:https://www.haomeiwen.com/subject/vxbwcjtx.html