1、设置异常处理器
@Bean
public ConsumerFactory<String, String> kafkaConsumerFactory(
@Value("${spring.kafka.bootstrap-servers}") List<String> bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(
props, new StringDeserializer(), new StringDeserializer());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf,
@Value("${spring.kafka.bootstrap-servers}") List<String> bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaTemplate<>(pf, props);
}
@Bean
public CommonErrorHandler kafkaErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
// 间隔3秒,重试2次,共三次
BackOff backOff = new FixedBackOff(3000, 2);
return new DefaultErrorHandler(recoverer, backOff);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> kafkaConsumerFactory, CommonErrorHandler kafkaErrorHandler) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setCommonErrorHandler(kafkaErrorHandler);
return factory;
}
2、抛出需要补偿的异常
@KafkaListener(
id = "demoListener",
topics = "${topic}",
containerFactory = "kafkaListenerContainerFactory")
public void listen(String message, Acknowledgment ack) {
try {
/**
业务代码
**/
ack.acknowledge();
} catch(FeignException | SocketException | KafkaException e) {
log.error("process error, could be retry: {}", message, e);
throw e;
} catch (Exception e) {
log.error("process error, discard: {}", message, e);
ack.acknowledge();
}
}
3、死信队列
超出重试次数后,消息会被发往死信队列(topicName + .DLT),可以根据业务需要做进一步处理。
b20f1fc5-f866-43a2-b4f9-5ff8786dbbeb.jpeg
网友评论