美文网首页Apache Kafka
Kafka消费者API总结

Kafka消费者API总结

作者: 森林和海洋 | 来源:发表于2020-04-16 19:18 被阅读0次

相对于Kafka的生产者API,消费者的API略显繁杂,本文总结了0.11.0版本的kafka消费者的几种消费模式,供大家参考,该版本的所有消费方法均在KafkaConsumer<K,V>中实现。

  1. 订阅主题(subscribe)消费, 自动提交offset
    最简单的消费模式, 参考代码如下 :
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

在这种情况下, kafka根据auto.commit.interval.ms参数周期性自动提交offset, 并将offset存储在自身的broker上.

  1. 订阅主题(subscribe)消费, 手动提交offset
    参考代码如下:
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }

自动提交offset的弊端在于, 系统是根据poll方法, 而不是根据你的业务逻辑来判断消费是否完成, 所以存在着漏数据的可能性. 所以我们需要通过手动提交offset来保证在数据都处理完成后再提交offset, 这样就不会存在漏数据的可能了. 但是有可能存在重复消费, 手动提交offset可以实现"至少一次"的消费语义.
需要注意的是, 这里的consumer.commitSync()是将当前每个分区的offset全部提交, 如果你想针对针对每个分区进行单独提交, 也可以用commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets), 参考代码如下:

         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }
  1. 订阅分区(assign)消费, 手动/自动提交offset
    在前两种消费模式中, 我们并没有指定分区, 只需要指定消费的主题即可, 因为此时kafka会动态地根据消费者的数量去为消费者匹配合适的分区, 这种动态调整模式一般被称为"再平衡"(rebalance), 一般发生在某个消费组中有新的消费者加入时.
    而在这种assign分区模式下, 我们可以为某个消费者指定特定的分区, 从而实现更灵活的消费模式.参考代码如下:
     String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

通过consumer.assign()指定分区后, 接下来同样循环调用consumer.poll()即可进行消费了.在这种模式下, kafka不会对该消费者进行动态管理, 因此为了避免offset提交冲突, 官方建议为每个消费者指定唯一的消费组.

  1. 自定义offset消费(seek)和外部数据源存储offset
    在以上三种消费模式中, 不论是指定主题(subscribe)和指定分区(assign), kafka都是根据自身存储的offset来确定消费的起始位置的, 而有的时候我们需要手动指定消费的起始offset, 这个时候可以通过consumer.seek(TopicPartition, Long)实现, 该方法会对offset进行更新, 并根据更新后的offset进行消费.
    外部数据源存储offset
    当kafka的broker自身存储offset时, 很难对消费数据处理和提交offset进行事务控制, 所以只能实现 "至多一次" 或者 "至少一次" 的消费语义. 而有的时候我们需要进行 "精准一次" 的消费语义, 这时候就可以将offset保存至外部数据源(例如mysql), 实现逻辑如下
    (1) 设置 enable.auto.commit=false
    (2) 将ConsumerRecord中的offset和你的数据处理结果一同存入到外部数据源, 并添加事务控制.
    (3) 在重启的时候读取offset, 并通过seek(TopicPartition,Long)重置offset
    需要注意的是, 在这种消费模式话, 最好采用指定分区(assign)的消费模式, 因为不存在分区的重新分配, 从而避免多个消费者修改同一分区offset导致的数据丢失. 而如果你采用了订阅主题(subscribe)模式, 你可以通过自定义ConsumerRebalanceListener进行控制, 避免冲突. 这种场景下, 我们在订阅时采用subscribe(Collection, ConsumerRebalanceListener), 此时当某个分区从消费者A被重新分配给消费者B时, 消费者A可以通过ConsumerRebalanceListener.onPartitionsRevoked(Collections)方法来提交offset, 而消费者B可以通过ConsumerRebalanceListener.onPartitionsAssigned(Collections)方法来查看offset
  2. 消费流控制
    在订阅主题模式下, 如果一个消费者被分配了多个分区, 那么该消费者对每个分区的消费优先级都是一致的. 如果我们想对某个分区的消费进行控制, 可以单独采用pause(Collection)resume(Collection)来停止/重启对某些分区的消费. 需要注意的是此方法只对订阅主题模式的消费起作用.

总结:

  1. kafka的消费API支持订阅主题(subscribe)和指定分区(assign), 同时还可以通过seek对offset进行重置, 进行灵活的消费控制.
  2. 提交offset的时候, 可以用consumer.commitSync()提交所有分区当前poll后的offset, 也可以用consumer.commitSync(offsets)来手动指定提交的offset, 当然也可以将offset存储在外部数据源, 配合seek实现 "精准一次" 的消费语义.
  3. 文章内容来源 https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

相关文章

网友评论

    本文标题:Kafka消费者API总结

    本文链接:https://www.haomeiwen.com/subject/banrvhtx.html