美文网首页
spring-kafka异常消费补偿

spring-kafka异常消费补偿

作者: 伊丽莎白菜 | 来源:发表于2022-09-23 17:48 被阅读0次

1、设置异常处理器

  /**
   * Creates the consumer factory used by the Kafka listener container.
   *
   * <p>The configuration points at the given brokers, sets the offset reset policy to
   * {@code earliest} and turns auto-commit off. Keys and values are both read with a
   * {@link StringDeserializer}.
   *
   * @param bootstrapServers broker addresses injected from {@code spring.kafka.bootstrap-servers}
   * @return a consumer factory for {@code String} keys and {@code String} values
   */
  @Bean
  public ConsumerFactory<String, String> kafkaConsumerFactory(
    @Value("${spring.kafka.bootstrap-servers}") List<String> bootstrapServers) {
    StringDeserializer keyDeserializer = new StringDeserializer();
    StringDeserializer valueDeserializer = new StringDeserializer();

    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    return new DefaultKafkaConsumerFactory<>(consumerConfig, keyDeserializer, valueDeserializer);
  }

  /**
   * Creates the {@link KafkaTemplate} that is handed to the error handler bean.
   *
   * <p>The template is constructed from the injected producer factory together with a
   * property map that sets the bootstrap servers and uses {@link StringSerializer} for
   * both keys and values.
   *
   * @param pf producer factory supplied by the application context
   * @param bootstrapServers broker addresses injected from {@code spring.kafka.bootstrap-servers}
   * @return a template for {@code String} keys and {@code String} values
   */
  @Bean
  public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf,
    @Value("${spring.kafka.bootstrap-servers}") List<String> bootstrapServers) {
    Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, producerConfig);
    return template;
  }

  /**
   * Builds the error handler that retries a failed record and then dead-letters it.
   *
   * <p>A failed record is retried with a fixed back-off. Once the retries are used up, the
   * record is passed to a {@link DeadLetterPublishingRecoverer} that publishes it through
   * the given template.
   *
   * @param kafkaTemplate template used by the recoverer to publish dead-lettered records
   * @return the error handler to install on the listener container factory
   */
  @Bean
  public CommonErrorHandler kafkaErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    // 3-second interval, 2 retries: 3 delivery attempts in total.
    BackOff backOff = new FixedBackOff(3000, 2);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);
    // The listener container runs with AckMode.MANUAL_IMMEDIATE and the listener never
    // acknowledges a record it rethrows. Without this flag the offset of a dead-lettered
    // record is not committed by the handler, so a restart or rebalance before the next
    // successful ack would re-deliver (and re-dead-letter) the same record.
    errorHandler.setCommitRecovered(true);
    return errorHandler;
  }

  /**
   * Creates the listener container factory referenced by {@code @KafkaListener} methods.
   *
   * <p>The factory uses the supplied consumer factory, installs the supplied error handler
   * and sets the container ack mode to {@code MANUAL_IMMEDIATE}.
   *
   * @param kafkaConsumerFactory consumer factory for {@code String} keys and values
   * @param kafkaErrorHandler error handler applied to containers created by this factory
   * @return the configured container factory
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
      ConsumerFactory<String, String> kafkaConsumerFactory, CommonErrorHandler kafkaErrorHandler) {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();

    containerFactory.setConsumerFactory(kafkaConsumerFactory);
    containerFactory.setCommonErrorHandler(kafkaErrorHandler);

    ContainerProperties containerProperties = containerFactory.getContainerProperties();
    containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    return containerFactory;
  }

2、抛出需要补偿的异常

  /**
   * Consumes one message and acknowledges it manually.
   *
   * <p>Outcomes:
   * <ul>
   *   <li>Success: the message is acknowledged after the business logic completes.</li>
   *   <li>{@code FeignException}, {@code SocketException} or {@code KafkaException}: the
   *       error is logged and rethrown without acknowledging, so it propagates out of the
   *       listener to the container.</li>
   *   <li>Any other exception: the error is logged and the message is acknowledged,
   *       i.e. deliberately discarded.</li>
   * </ul>
   *
   * @param message the record value
   * @param ack handle used to acknowledge the record
   * @throws SocketException rethrown from the business logic; it is a checked exception and
   *     must be declared for the rethrow to compile
   */
  @KafkaListener(
      id = "demoListener",
      topics = "${topic}",
      containerFactory = "kafkaListenerContainerFactory")
  public void listen(String message, Acknowledgment ack) throws SocketException {
    try {
      // Business logic goes here.
      ack.acknowledge();
    } catch (FeignException | SocketException | KafkaException e) {
      // Do not acknowledge: let the exception leave the listener.
      log.error("process error, could be retry: {}", message, e);
      throw e;
    } catch (Exception e) {
      // Any other failure: acknowledge anyway so the message is dropped.
      log.error("process error, discard: {}", message, e);
      ack.acknowledge();
    }
  }

3、死信队列

超出重试次数后,消息会被发往死信队列(主题名为原主题名加上后缀 `.DLT`,默认发往与原消息相同的分区,因此死信主题的分区数不能少于原主题),可以根据业务需要做进一步处理。

b20f1fc5-f866-43a2-b4f9-5ff8786dbbeb.jpeg

相关文章

网友评论

      本文标题:spring-kafka异常消费补偿

      本文链接:https://www.haomeiwen.com/subject/nimnortx.html