美文网首页
spring+kafka 异常处理

spring+kafka 异常处理

作者: NazgulSun | 来源:发表于2021-03-29 16:56 被阅读0次

    至少一次消费

    我们的场景是保证至少一次消费,不能丢失数据。
    出现的问题:
    我们在一个 kafkaListener 里面 batch 监听数据,调用 一个API 进行消费,手动提交offset。
    由于流量爆发性的问题或者网络问题,api 有时候会超时。

    假设有三个批次, A, B,C。
    B批次处理的时候,超时抛出了异常。
    这时候,如果没有指定error handler的话,B 批次的数据就会丢失。

    kafka 再 poll C的时候,不会因为 B 的 offset 没有commit 就从 offset A来poll。

    因为kafka的offset下标的记录实际会有两份,服务端会自己记录一份,本地的消费者客户端也会记录一份,提交的offset会告诉服务端已经消费到这了,但是本地的并不会因此而改变offset进行再次消费。

    所以本地的offset,记录的是B 的offset。 poll C的时候会冲 B的offset 开始。也就是程序会从本地内存维护的这个offset作为基准。
    而不是服务端。 只有当consumer 重启的时候,才会参照 服务器端的Offset。

    这个主要是给予客户端更灵活的控制机制,实现自定义的offset 拉取。

    那么在我们的场景中,需要做的就是 下游异常的时候,我们重试,从新消费。所以指定了一个 error handler。
    在下游消费异常的时候,就 从 上一个 提交的Offset开始拉取,达到重试的效果。

        @Bean
        public ConsumerAwareListenerErrorHandler indicConsumerErrorHandler() {
            return (m, e, c) -> {
                log.error("Indic consumerError, retry in kafka", e);
                List<ConsumerRecord> recs = (List<ConsumerRecord>)m.getPayload();
                List<String> topics = recs.stream().map(r->r.topic()).collect(Collectors.toList());
                List<Integer> partitions = recs.stream().map(r->r.partition()).collect(Collectors.toList());
                List<Long> offsets = recs.stream().map(r->r.offset()).collect(Collectors.toList());
                Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
                for (int i = 0; i < topics.size(); i++) {
                    int index = i;
                    offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                            (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
                }
                offsetsToReset.forEach((k, v) -> c.seek(k, v));
                return null;
            };
        }
    

    相关文章

      网友评论

          本文标题:spring+kafka 异常处理

          本文链接:https://www.haomeiwen.com/subject/jvbbhltx.html