美文网首页
kafka数据如何被重复消费

kafka数据如何被重复消费

作者: cartoony | 来源:发表于2020-09-13 23:01 被阅读0次

    近段时间学习极客时间李玥老师的后端存储实战课时,看到一个很多意思的东西:用kafka存储点击流的数据,并重复处理。在以往的使用中,kafka只是一个消息传输的载体,消息被消费后就不能再次消费。新知识与印象相冲突,于是就有了本篇文章:kafka数据如何被重复消费。

    前期理论了解

    首先我先去官网纠正了我对kafka的整体了解。

    image

    官网对kafka的描述是:一个分布式流平台。怪自己的学艺不精。

    其次,我重新看了一下kafka消费者的消费过程:kafka首先通过push/poll(默认为poll)获取消息,接收消息处理完成后手动/自动提交消费成功,kafka服务器则根据提交情况决定是否移动当前偏移量。

    方案确定

    kafka消费者读取数据的位置是通过偏移量判断,那如果我能将偏移量手动设置为起始位置,就能实现重复消费?这个有搞头。

    如何手动设置偏移量是关键。

    show me the code

    代码的关键主要在于偏移量设置 api 的调用,其余没什么特别。

    要注意的是,代码中我分别调用了作用不同的设置偏移量,仅作为展示,可按需取用。

    最后消费者消息消息时,我只使用默认的拉取条数设置消费一次,可按需进行修改。

    /**
     * repeat kafka message
     * @param host kafka host
     * @param groupId kafka consumer group id
     * @param autoCommit whether auto commit consume
     * @param topic consume topic
     * @param consumeTimeOut consume time out
    */
        private void textResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut){
            //form a properties to new consumer
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            //subscribe incoming topic
            consumer.subscribe(Collections.singletonList(topic));
            //get consumer consume partitions
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = new ArrayList<>();
            for(PartitionInfo partitionInfo : partitionInfos){
                TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                topicPartitions.add(topicPartition);
            }
            // poll data from kafka server to prevent lazy operation
            consumer.poll(Duration.ofSeconds(consumeTimeOut));
            //reset offset from beginning
            consumer.seekToBeginning(topicPartitions);
            //reset designated partition offset by designated spot
            int offset = 20;
            consumer.seek(topicPartitions.get(0), offset);
            //reset offset to end
            consumer.seekToEnd(topicPartitions);
            //consume message as usual
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()){
                ConsumerRecord<String, String> record = iterator.next();
                log.info("consume data: {}", record.value());
            }
        }
    
    运行结果
    image
    需注意的点

    在手动设置偏移量时,遇到了一个exception

    java.lang.IllegalStateException: No current assignment for partition test-0
    

    翻了一下stackoverflow以及官方文档后,才了解到设置偏移量是一个lazy operation,官网的解释如下。

    Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partition is provided, seek to the first offset for all of the currently assigned partitions.

    于是我先进行一次 poll 操作后再设置偏移量。

        本文首发于 cartoon的博客
        转载请注明出处:https://cartoonyu.github.io/cartoon-blog/post/message-queue/kafka数据如何被重复消费/

    相关文章

      网友评论

          本文标题:kafka数据如何被重复消费

          本文链接:https://www.haomeiwen.com/subject/smtaektx.html