美文网首页
spring-kafka异常消费补偿

spring-kafka异常消费补偿

作者: 伊丽莎白菜 | 来源:发表于2022-09-23 17:48 被阅读0次

    1、设置异常处理器

      /**
       * Builds the consumer factory used by the listener container.
       *
       * <p>Auto-commit is switched off and the offset reset policy is set to
       * {@code "earliest"}. Keys and values are both deserialized as strings.
       *
       * @param bootstrapServers broker addresses read from
       *     {@code spring.kafka.bootstrap-servers}
       * @return a consumer factory for String keys and String values
       */
      @Bean
      public ConsumerFactory<String, String> kafkaConsumerFactory(
        @Value("${spring.kafka.bootstrap-servers}") List<String> bootstrapServers) {
        Map<String, Object> consumerConfig = new HashMap<>();
        // Offsets are not auto-committed by the Kafka client.
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        StringDeserializer keyDeserializer = new StringDeserializer();
        StringDeserializer valueDeserializer = new StringDeserializer();
        return new DefaultKafkaConsumerFactory<>(consumerConfig, keyDeserializer, valueDeserializer);
      }
    
      /**
       * Creates the {@code KafkaTemplate} used to publish String records.
       *
       * <p>The template is built from the injected producer factory together with a
       * map holding the bootstrap servers and String key/value serializer classes.
       * NOTE(review): the second constructor argument is presumably applied as
       * configuration overrides on top of the factory's own settings; confirm
       * against the spring-kafka version in use.
       *
       * @param pf producer factory supplied by the application context
       * @param bootstrapServers broker addresses read from
       *     {@code spring.kafka.bootstrap-servers}
       * @return a template for String keys and String values
       */
      @Bean
      public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf,
        @Value("${spring.kafka.bootstrap-servers}") List<String> bootstrapServers) {
        Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, producerSettings);
        return template;
      }
    
      /**
       * Creates the error handler installed on the listener container.
       *
       * <p>The handler combines a fixed back-off with a
       * {@code DeadLetterPublishingRecoverer} that publishes through the given template.
       *
       * @param kafkaTemplate template handed to the dead-letter recoverer
       * @return the configured error handler
       */
      @Bean
      public CommonErrorHandler kafkaErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
        // Wait 3 seconds between attempts and retry twice: three deliveries in total.
        final long retryIntervalMillis = 3000;
        final long maxRetries = 2;
        return new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate),
            new FixedBackOff(retryIntervalMillis, maxRetries));
      }
    
      /**
       * Builds the listener container factory referenced by {@code @KafkaListener} methods.
       *
       * <p>The factory uses the supplied consumer factory and error handler, and sets the
       * container ack mode to {@code MANUAL_IMMEDIATE}.
       *
       * @param kafkaConsumerFactory consumer factory for String keys and values
       * @param kafkaErrorHandler error handler to install on created containers
       * @return the configured container factory
       */
      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
          ConsumerFactory<String, String> kafkaConsumerFactory, CommonErrorHandler kafkaErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setCommonErrorHandler(kafkaErrorHandler);
        containerFactory.setConsumerFactory(kafkaConsumerFactory);
        containerFactory
            .getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return containerFactory;
      }
    

    2、抛出需要补偿的异常

      /**
       * Consumes one message and decides whether a failure should be retried or discarded.
       *
       * <p>On success the record is acknowledged. A {@code FeignException},
       * {@code SocketException} or {@code KafkaException} is logged and rethrown without
       * acknowledging, so the failure propagates to the listener container. Any other
       * exception is logged and the record is acknowledged, which discards it.
       *
       * @param message the record value
       * @param ack handle used to acknowledge the record manually
       * @throws SocketException rethrown from the business logic; it is a checked
       *     exception and must be declared for the rethrow to compile
       */
      @KafkaListener(
          id = "demoListener",
          topics = "${topic}",
          containerFactory = "kafkaListenerContainerFactory")
      public void listen(String message, Acknowledgment ack) throws SocketException {
        try {
          // Business logic goes here; it is expected to be able to throw the
          // exception types handled below.
          ack.acknowledge();
        } catch (FeignException | SocketException | KafkaException e) {
          // Do not acknowledge: rethrow so the failure reaches the container.
          log.error("process error, could be retry: {}", message, e);
          throw e;
        } catch (Exception e) {
          // Not worth retrying: acknowledge so the record is not delivered again.
          log.error("process error, discard: {}", message, e);
          ack.acknowledge();
        }
      }
    

    3、死信队列

    超出重试次数后,消息会被发往死信队列(默认 topic 名为原 topic 名加上后缀 .DLT,并且发往与原消息相同的分区,因此死信 topic 的分区数不能少于原 topic),可以根据业务需要对死信消息做进一步处理。

    b20f1fc5-f866-43a2-b4f9-5ff8786dbbeb.jpeg

    相关文章

      网友评论

          本文标题:spring-kafka异常消费补偿

          本文链接:https://www.haomeiwen.com/subject/nimnortx.html