美文网首页
Kafka的三种消费模式

Kafka的三种消费模式

作者: cefa6a30d1c3 | 来源:发表于2019-07-24 23:13 被阅读0次

自动提交offset

以下实例代码展示了如何自动提交topic的offset:

public void autoOffsetCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

Properties的实例props中存放的key意义:
1)bootstrap.servers表示要连接的Kafka集群中的节点,其中9092表示端口号;
2)enable.auto.commit为true,表示在auto.commit.interval.ms时间后会自动提交topic的offset,其中auto.commit.interval.ms默认值为5000ms;
3)其中foo和bar为要消费的topic名称,由group.id为test作为consumer group统一进行管理;
4)key.deserializer和value.deserializer表示指定将字节序列化为对象。

手动提交offset

生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。

以下实例代码展示了如何手动提交topic的offset:

public void manualOffsetCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.Stirng.Deserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }

        if (buffer.size() >= minBatchSize) {
            // operation to handle data
            consumer.commitSync();
            buffer.clear();
        }
    }
}

本方案的缺点是必须保证所有数据被处理后,才提交topic的offset。为避免数据的重复消费,可以用第三种方案,根据每个partition的数据消费情况进行提交,称之为“at-least-once”。

手动提交partition的offset

以下实例代码展示了如何手动提交topic中每一partition的offset:

public void manualOffsetCommitOfPartition() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.Stirng.Deserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));

    boolean running = true;
    try {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + " : " + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }
}

引: https://blog.csdn.net/amazon2006/article/details/84896906

相关文章

  • Kafka_核心

    kafka集群 Kafka的设计都是为了实现kafak消息队列消费数据的语义Kafka消息队列中数据消费的三种语义...

  • Kafka的三种消费模式

    自动提交offset 以下实例代码展示了如何自动提交topic的offset: Properties的实例prop...

  • 2020-04-21springboot2.x rabbitmq

    之前使用kafka只有2种模式1.生产者消费者2.发布订阅 而ribbitmq却有三种模式 fanout,topi...

  • Flink kafka source源码解析(四)

    指定offset消费 消费模式 在flink的kafka source中有以下5种模式指定offset消费 默认为...

  • Kafka消费模式

    消息队列的两种模式 1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 消息生产者生产消息发送到Qu...

  • Kafka消费者API总结

    相对于Kafka的生产者API,消费者的API略显繁杂,本文总结了0.11.0版本的kafka消费者的几种消费模式...

  • 四 Kafka消费者 概念及Java API

    消费者 1. 消费方式 采取pull的模式从kafka集群获取消息。 push的模式很难适用于消费速率不同的消费者...

  • 探究kafka——概念篇

    kafka的基本概念 kafka特点1:是基于发布订阅模式,而非pear-pear模式,消费者可以有多个,实质是一...

  • kafka——消费者原理解析

    一、消费方式 kafka采用发布订阅模式:一对多。发布订阅模式又分两种: push(推)模式很难适应消费速率不同的...

  • kafka 基于Partition 和group 消费,kafk

    kafka消费模式 基于partition 指定offset 基于group auto.offset.reset ...

网友评论

      本文标题:Kafka的三种消费模式

      本文链接:https://www.haomeiwen.com/subject/zcyqrctx.html