至少一次消费
我们的场景是保证至少一次消费,不能丢失数据。
出现的问题:
我们在一个 kafkaListener 里面 batch 监听数据,调用 一个API 进行消费,手动提交offset。
由于流量爆发性的问题或者网络问题,api 有时候会超时。
假设有三个批次, A, B,C。
B批次处理的时候,超时抛出了异常。
这时候,如果没有指定error handler的话,B 批次的数据就会丢失。
kafka 再 poll C的时候,不会因为 B 的 offset 没有commit 就从 offset A来poll。
因为kafka的offset下标的记录实际会有两份,服务端会自己记录一份,本地的消费者客户端也会记录一份,提交的offset会告诉服务端已经消费到这了,但是本地的并不会因此而改变offset进行再次消费。
所以本地的offset,记录的是B 的offset。 poll C的时候会冲 B的offset 开始。也就是程序会从本地内存维护的这个offset作为基准。
而不是服务端。 只有当consumer 重启的时候,才会参照 服务器端的Offset。
这个主要是给予客户端更灵活的控制机制,实现自定义的offset 拉取。
那么在我们的场景中,需要做的就是 下游异常的时候,我们重试,从新消费。所以指定了一个 error handler。
在下游消费异常的时候,就 从 上一个 提交的Offset开始拉取,达到重试的效果。
@Bean
public ConsumerAwareListenerErrorHandler indicConsumerErrorHandler() {
return (m, e, c) -> {
log.error("Indic consumerError, retry in kafka", e);
List<ConsumerRecord> recs = (List<ConsumerRecord>)m.getPayload();
List<String> topics = recs.stream().map(r->r.topic()).collect(Collectors.toList());
List<Integer> partitions = recs.stream().map(r->r.partition()).collect(Collectors.toList());
List<Long> offsets = recs.stream().map(r->r.offset()).collect(Collectors.toList());
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
int index = i;
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
(k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
}
offsetsToReset.forEach((k, v) -> c.seek(k, v));
return null;
};
}
网友评论