美文网首页
kafka Java API中commit offset与par

kafka Java API中commit offset与par

作者: 柚子过来 | 来源:发表于2018-01-23 14:53 被阅读0次

    offset commit是在Consumer端进行的操作,将下一次消费的位置(本次poll/准确的说是fetch?的最大record的后一位)commit到服务器。有两种commit方式:自动提交与手动提交。

    自动提交:

    设置参数 props.put("enable.auto.commit", "true");开启自动提交,这样在执行poll命令后会立即将下一个offset提交至服务器。

    自动提交存在消费数据的遗漏问题,具体的需要先解释下poll函数的过程:

    Consumer读取partition中的数据是通过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。但是这个poll方法只是可能会发起fetch请求。原因是:Consumer每次发起fetch请求时,读取到的数据是有限制的,通过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项个max.poll.records来限制一次最多poll多少个record。那么就可能出现这样的情况: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次poll方法才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。
    另外,在consumer中,还有一个配置项:max.poll.interval.ms ,它表示最大的poll数据间隔,如果超过这个间隔没有发起poll请求,就会将该consumer退出consumer group。所以为了不使Consumer 自己被退出,Consumer 应该不停的发起poll(timeout)操作。而这个动作 KafkaConsumer Client是不会帮我们做的,这就需要自己在程序中不停的调用poll方法。

    那么自动提交可能遇到两种情况:

    1、如果consumer因为某些原因被退出了group,但是它所fetch的数据还没有被它poll完消费掉。那么下一个Consumer接替他,执行fetch的时候应该从哪里开始fetch呢?如果是自动提交的话默认从上一个fetch的下一位开始,那数据就遗漏了。
    /*看了源码,从fetch数组拿数据的时候没有commit操作,所以自动提交时提交的应该就是fetch的offset/
    2、如果在poll到数据后需要进行处理(如持久化到DB),如果自动提交了,下次fetch数据就是从提交的offset之后获取,但是如果数据持久化过程失败了呢?那就丢了这部分数据。所以需要先将数据持久化到DB,成功后再将offset commit上去。

    手动提交:

    通过手动提交可以解决数据丢失的问题:

     ....
     props.put("enable.auto.commit", "false");  //将自动提交关闭
     ....
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);   //持久化到数据库
             consumer.commitSync();  //commit offset,没有参数默认所有数据都正常消费了,提交的是本次poll的最大offset+1
             buffer.clear();
         }
     }
    

    也可以提交指定的partition的最新offset值:

    while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
    



    另外,也可以通过seek函数手动控制Consumer的position(即设置poll时的起始offset),这样就可以跳过一些数据或者获取一些历史数据:(注意使用seek设置指定partition的offset时该Consumer必须要先assign订阅了该partition。)

    2018-01-23 16-24-12屏幕截图.png

    ----------------------------------------------我是分割线-------------------------------------------------

    partition assignment决定了Consumer从哪些partition获取数据。同样有自动和手动设置两种方式:

    1、 dynamic partition assignment
    这种方式我们通过 consumer.subscribe(Topic topic)来订阅整个topic,这样kafka会对consumer group里的consumer进行partitions的公平分配(https://www.jianshu.com/p/6233d5341dfe)

    这样就引出两个特殊情况:
    1、如果进程中需要维护一些与指定partition相关的状态,那只能从指定partition获取数据。
    2、如果进程本身是高可用的,进程失败时会自动重启恢复。但是如果没有手动指定partition的话,kafka检测到进程失败就会自动重新分配partition,这是多余的。

    2、Manual Partition Assignment
    这种方式我们通过 consumer.assign(Arrays.asList(partition0, partition1))来指定该consumer固定消费哪些partition。这种情况下,指定partition的Consumer即使failed掉了也不会触发partition的rebalance。该Consumer和其他Consumer相互独立。

    好文:
    Consumer API :http://blog.csdn.net/xianzhen376/article/details/51167742
    官网:http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

    相关文章

      网友评论

          本文标题:kafka Java API中commit offset与par

          本文链接:https://www.haomeiwen.com/subject/twssaxtx.html