上一篇文章我们介绍了kafka基本概念和使用配置,本文将讲解一些kafka在使用过程当中的一些问题。
kafka重复消费,丢失数据
消费者在消费partition的同时会将当前消费的偏移量记录提交到kafka当中,这里消费者当在消费的时候会把主题对应分区和偏移量提交到一个特殊的topic:_consumer_offset 当中, 以便出现再均衡,partition分到另外一个消费者继续之前的偏移量继续消费。
这里提到上一篇的两个消费者配置属性
enable.auto.commit:是否自动提交偏移量
auto.commit.interval.ms:偏移量的提交频率
这里看到这个属性大多数人会认为把自动提交偏移量开启,偏移量频率调高就万事大吉了。
下面举两个例子
(图一)(图一)消费者在数据没有消费完成之前,并且这个时候也没有提交获取到的偏移量,此时发生了再均衡。这里提交的偏移量小于消费的偏移量,那么中间那段就会被重复消费
(图二)(图二)消费者在数据没有消费完成之前,但是这个时候已经提交了偏移量,此时发生再均衡,从之前消费的偏移量到以提交的偏移量那一段就会丢失
在使用自动提交的过程当中上述的问题不可避免。下面介绍几种提交偏移量的方式,看是否能解决呢?
偏移量提交方式
1,同步提交
while(true)
{
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
for(ConsumerRecord<String, String> consumerRecord : consumerRecords){
System.out.println(consumerRecord);
}
try{
kafkaConsumer.commitSync();
}catch (Exception e){
e.printStackTrace();
}
}
同步提交在处理完一条消息就提交一次消息,同步提交会有重试机制,一直到提交成功消息,除非发生不可避免的错误。提交失败这里我就可以记录到日志(记录错误或者重写kafka)。
同步提交可以做到每条消息消费完成之后都提交到kafka,但是这里的同步等待kafka写入是否成功的响应,降低了一定的吞吐量。
2,异步提交
while(true)
{
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e == null) {
System.out.println(map);
} else {
e.printStackTrace();
}
}
});
}
异步提交可见,提交最后一个偏移量之后,不用等待,可以开始下一轮消费,并且这里支持回调,在回调的时候记录错误提交信息,此方法相对比与同步提交吞吐量增大了许多,但是异步提交没有提供自动重试机制,之所以没有提供这个机制是因为假设当你提交偏移量2000的时候,出现错误(一系列网络问题)没有提交成功,(网络好了)但是这个时候消费者已经把偏移量2100提交上去了并且成功了,这个时候如果开始重试2000的偏移量则会覆盖掉最新的偏移量,导致2000 - 2100 之间的数据重复消费。
这里如果你一定要进行重试的话,就需要考虑到这个提交顺序的问题,我们可以维护一个偏移量的序列号,在提交成功回调之后记录一次,在重试之前去判断这个序列号是否比当前要重试的偏移量大,大则取消重试,否之。
上述的提交偏移量的方式当然各有优势,但是还是不能一定保证消息的唯一消费和不丢失,仍然会在过程中出现提交失败,出现问题。
如果再每次再均衡执行之前能提交偏移量那么是不是就不会出现这种问题了呢?
再均衡监听器
由此推出了再均衡监听器的这么一个东西看一下有什么用?
监听器实现接口: ConsumerRebalanceListener
先看下里面的方法
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
onPartitionsRevoked:
在消费者执行再均衡之前,消费者停止消费数据之后执行
onPartitionsAssigned
在重新分配分区之后,和消费者开始消费数据之前
先来看第一个方法onPartitionsRevoked,如果在消费者群组发生再均衡的时候,在此方法内提交本次所消费到的偏移量,那么此时提交的偏移量就是最准确的,并且可以供下一个获取到此分区消费权的消费者继续消费
class HandlerListener implements ConsumerRebalanceListener{
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
kafkaConsumer.commitSync(getCommitDBoffset()); //自定方法获取当前数据库记录的offset
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
}
kafkaConsumer.subscribe(Collections.singletonList("test"), new HandlerListener());
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
commitToDB(offset); // 记录到DB offset
}
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e == null) {
System.out.println(map);
} else {
e.printStackTrace();
}
}
});
}
上面记录的数据库不一定要使用DB来实现,做个示例,也可以使用任何记录值的方式来实现,每次在再均衡执行之前把当前消费完的数据提交到kafka。
这么一看好像是解决了上面的问题了,但是如果在提交kafka之前应用宕机了呢?
这里就可以用到再均衡监听器的另外一个函数onPartitionsAssigned,如果在我们每次消费之前从我们自行记录的偏移量开始消费,那么这个问题就可以完美解决了。
class HandlerListener implements ConsumerRebalanceListener{
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
kafkaConsumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition topicPartition : partitions ){
kafkaConsumer.seek(topicPartition,getPartitionOffset(topicPartition));
}
}
}
这里改写一下onPartitionsAssigned方法, 在每次重新分配分区之后,和在消费之前执行,指定到上次记录offset也防止上次提交kafka失败或者因为应用程序宕机而导致的未提交。
总结一下。
为解决kafka消费者重复消费和丢失数据的问题,介绍了几个偏移量的提交方式,和再均衡监听器等解决现有问题,当然看具体的业务需求,有些场景只是为了提高kafka的吞吐量,允许少部分的容错。如果是强业务数据可以结合本文的方式去与实际结合使用。
下一章将讲解kakfa的集群,集群控制者,以及分区首领和副本的复制
网友评论