美文网首页
spring+kafka 异常处理

spring+kafka 异常处理

作者: NazgulSun | 来源:发表于2021-03-29 16:56 被阅读0次

至少一次消费

我们的场景是保证至少一次消费,不能丢失数据。
出现的问题:
我们在一个 kafkaListener 里面 batch 监听数据,调用 一个API 进行消费,手动提交offset。
由于流量爆发性的问题或者网络问题,api 有时候会超时。

假设有三个批次, A, B,C。
B批次处理的时候,超时抛出了异常。
这时候,如果没有指定error handler的话,B 批次的数据就会丢失。

kafka 再 poll C的时候,不会因为 B 的 offset 没有commit 就从 offset A来poll。

因为kafka的offset下标的记录实际会有两份,服务端会自己记录一份,本地的消费者客户端也会记录一份,提交的offset会告诉服务端已经消费到这了,但是本地的并不会因此而改变offset进行再次消费。

所以本地的offset,记录的是B 的offset。 poll C的时候会冲 B的offset 开始。也就是程序会从本地内存维护的这个offset作为基准。
而不是服务端。 只有当consumer 重启的时候,才会参照 服务器端的Offset。

这个主要是给予客户端更灵活的控制机制,实现自定义的offset 拉取。

那么在我们的场景中,需要做的就是 下游异常的时候,我们重试,从新消费。所以指定了一个 error handler。
在下游消费异常的时候,就 从 上一个 提交的Offset开始拉取,达到重试的效果。

    @Bean
    public ConsumerAwareListenerErrorHandler indicConsumerErrorHandler() {
        return (m, e, c) -> {
            log.error("Indic consumerError, retry in kafka", e);
            List<ConsumerRecord> recs = (List<ConsumerRecord>)m.getPayload();
            List<String> topics = recs.stream().map(r->r.topic()).collect(Collectors.toList());
            List<Integer> partitions = recs.stream().map(r->r.partition()).collect(Collectors.toList());
            List<Long> offsets = recs.stream().map(r->r.offset()).collect(Collectors.toList());
            Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
            for (int i = 0; i < topics.size(); i++) {
                int index = i;
                offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                        (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
            }
            offsetsToReset.forEach((k, v) -> c.seek(k, v));
            return null;
        };
    }

相关文章

  • spring+kafka 异常处理

    至少一次消费 我们的场景是保证至少一次消费,不能丢失数据。出现的问题:我们在一个 kafkaListener 里面...

  • 同步或异步异常处理

    同步或异步异常处理 同步读取异常处理 异步读取文件异常处理

  • Java 异常

    异常处理机制 异常处理模型:终止模型:当异常发生时,就进入异常处理程序,处理结束并不返回异常发生位置继续执行;恢复...

  • 第十二章:异常处理

    异常处理语法格式: try: ...处理语句 except 异常类型: ...异常处理语句 pytho...

  • Spring MVC 全局统一异常处理(注解方式)

    全局异常处理 对异常处理类增加@ControllerAdvice 增加异常处理方法,使用@ExceptionHan...

  • 异常处理设计文档

    一、异常处理流程说明 在ESB处理过程中,若发生异常,将进入异常流程的处理。一个异常处理流程大致如下: ESB处理...

  • Java异常处理机制

    什么是异常处理机制: 异常处理机制: 让程序发生异常时,按照代码预先设定的异常处理逻辑,针对性地处理异常,让程序尽...

  • ASP .NET Core Web API_05_异常处理

    默认异常处理 自定义异常处理

  • springboot 异常捕获和处理

    springboot 异常捕获和处理 异常捕获处理

  • NDK开发---C++学习(七):异常

    前言 C++的异常处理机制能够将异常检测与异常处理分离开来,当异常发生时,它能自动调用异常处理程序进行错误处理。把...

网友评论

      本文标题:spring+kafka 异常处理

      本文链接:https://www.haomeiwen.com/subject/jvbbhltx.html