美文网首页
kafka消费者丢消息的原因分析

kafka消费者丢消息的原因分析

作者: Stalary | 来源:发表于2019-06-27 10:36 被阅读0次

    场景:

    通过kafka消费作业,存储到数据库中,当遇到错误信息时停止提交offset,关闭消费者,发送报警短信

    项目运行了很久,但是突然发现在关闭消费者后,有时候重启会遇到消息丢失的情况,于是从代码入手,查找原因,消费者代码如下

            @Override
            public void run() {
                // 构建kafka监控
                Thread thread = Thread.currentThread();
                ThreadHolder threadHolder = new ThreadHolder(this.id, this.name, thread.getState());
                THREAD_HOLDER_MAP.put(this.id, threadHolder);
                log.info("consumer task start, id = " + id);
                while (runnable) {
                    int partition = 0;
                    long offset = 0;
                    String key = null;
                    String topic = null;
                    String value = null;
                    try {
                        ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                        for (ConsumerRecord<String, String> record : records) {
                            partition = record.partition();
                            offset = record.offset();
                            key = record.key();
                            topic = record.topic();
                            value = record.value();
                            log.info(String.format("topic:%s, partition:%d, offset:%d, key:%s, message:%s", topic, partition, offset, key, value));
                            consumer.process(record);
                            kafkaConsumer.commitAsync();
                        }
                    } catch (MyException e) {
                        // 如果是自定义的错误,就扔到redis上,保留以后处理
                        if (value != null) {
                            log.warn("error message:{}", value);
                            hashOperations.put(RedisKey.KAFKA_ERROR_KEY, CorrectUtils.join(topic, key, partition, offset), value);
                            kafkaConsumer.commitAsync();
                        }
                    } catch (Exception e) {
                        log.warn("process message failure!", e);
                        this.runnable = false;
                        kafkaConsumer.close();
                        THREAD_HOLDER_MAP.replace(this.id, new ThreadHolder(this.id, this.name, Thread.State.TERMINATED));
                        log.info("consumer task shutdown, id = " + id);
                    }
                }
            }
    

    下面开始对问题进行分析,根据经验,丢消息的原因一般是offset提交的不正确,所以从commitAsync()方法入手,根据源码进行分析

        /**
         * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition.
         * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
         */
        @Override
        public void commitAsync() {
            commitAsync(null);
        }
    

    可以发现这个方法的含义是提交当前拉取的offset,但是由于是在ConsumerRecords循环中调用的所以会导致一个问题

    一次拉取了十条消息,第一次commit就将最大的offset提交,这样在消费到第二条消息如果遇到报错关闭消费者后,下次重启也会从第十一条开始消费,所以丢掉了未消费的九条消息,该问题只会出现在一次拉取多条的情况下

    然后开始思考解决办法,有如下几个方案

    1. 将commit放置到循环内,但是自行控制offset,每次+1,这样可以保证消息的不丢失不重复,但是会造成多次请求
    2. 将commit放置到循环外,同时在消费失败后,关闭消费者之前进行commit,offset为当前offset+已经消费成功的消息数量

    相关文章

      网友评论

          本文标题:kafka消费者丢消息的原因分析

          本文链接:https://www.haomeiwen.com/subject/zidlcctx.html