offset commit是在Consumer端进行的操作,将下一次消费的位置(本次poll/准确的说是fetch?的最大record的后一位)commit到服务器。有两种commit方式:自动提交与手动提交。
自动提交:
设置参数 props.put("enable.auto.commit", "true");开启自动提交,这样在执行poll命令后会立即将下一个offset提交至服务器。
自动提交存在消费数据的遗漏问题,具体的需要先解释下poll函数的过程:
Consumer读取partition中的数据是通过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。但是这个poll方法只是可能会发起fetch请求。原因是:Consumer每次发起fetch请求时,读取到的数据是有限制的,通过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项个max.poll.records来限制一次最多poll多少个record。那么就可能出现这样的情况: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次poll方法才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。
另外,在consumer中,还有一个配置项:max.poll.interval.ms ,它表示最大的poll数据间隔,如果超过这个间隔没有发起poll请求,就会将该consumer退出consumer group。所以为了不使Consumer 自己被退出,Consumer 应该不停的发起poll(timeout)操作。而这个动作 KafkaConsumer Client是不会帮我们做的,这就需要自己在程序中不停的调用poll方法。
那么自动提交可能遇到两种情况:
1、如果consumer因为某些原因被退出了group,但是它所fetch的数据还没有被它poll完消费掉。那么下一个Consumer接替他,执行fetch的时候应该从哪里开始fetch呢?如果是自动提交的话默认从上一个fetch的下一位开始,那数据就遗漏了。
/*看了源码,从fetch数组拿数据的时候没有commit操作,所以自动提交时提交的应该就是fetch的offset/
2、如果在poll到数据后需要进行处理(如持久化到DB),如果自动提交了,下次fetch数据就是从提交的offset之后获取,但是如果数据持久化过程失败了呢?那就丢了这部分数据。所以需要先将数据持久化到DB,成功后再将offset commit上去。
手动提交:
通过手动提交可以解决数据丢失的问题:
....
props.put("enable.auto.commit", "false"); //将自动提交关闭
....
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer); //持久化到数据库
consumer.commitSync(); //commit offset,没有参数默认所有数据都正常消费了,提交的是本次poll的最大offset+1
buffer.clear();
}
}
也可以提交指定的partition的最新offset值:
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
另外,也可以通过seek函数手动控制Consumer的position(即设置poll时的起始offset),这样就可以跳过一些数据或者获取一些历史数据:(注意使用seek设置指定partition的offset时该Consumer必须要先assign订阅了该partition。)

----------------------------------------------我是分割线-------------------------------------------------
partition assignment决定了Consumer从哪些partition获取数据。同样有自动和手动设置两种方式:
1、 dynamic partition assignment
这种方式我们通过 consumer.subscribe(Topic topic)来订阅整个topic,这样kafka会对consumer group里的consumer进行partitions的公平分配(https://www.jianshu.com/p/6233d5341dfe)
这样就引出两个特殊情况:
1、如果进程中需要维护一些与指定partition相关的状态,那只能从指定partition获取数据。
2、如果进程本身是高可用的,进程失败时会自动重启恢复。但是如果没有手动指定partition的话,kafka检测到进程失败就会自动重新分配partition,这是多余的。
2、Manual Partition Assignment
这种方式我们通过 consumer.assign(Arrays.asList(partition0, partition1))来指定该consumer固定消费哪些partition。这种情况下,指定partition的Consumer即使failed掉了也不会触发partition的rebalance。该Consumer和其他Consumer相互独立。
好文:
Consumer API :http://blog.csdn.net/xianzhen376/article/details/51167742
官网:http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
网友评论