一.KafkaConsumer的使用例子:
消费者从Kafka服务端拉取消息,并交给业务逻辑进行处理。KafkaConsumer的API很友好,开发人员不用关心与Kafka服务端之间网络连接管理,心跳检测,请求超时,重试等底层的操作,也不用关心订阅Topic的分区数量,分区Leader副本的网络拓扑以及Consumer Group的Rebalance等Kafka的具体细节,KafkaConsume中还提供了自动提交offset的功能,让开发人员更加关注业务逻辑,提供开发效率。
Kafka的使用例子:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
Properties props=new Properties();
props.put("bootstrap.servers","localhost:9092");//broker地址
props.put("group.id","test");//所属的Consumer Group的Id
props.put("enable.auto.commit","true");
//自动提交offset的时间间隔
props.put("auto.commit.interval.ms","1000");
props.put("session.timeout.ms","30000");
//key使用的Deserializer
props.put("key.deserializer","org.apache.kafka.common.serialization.IntegerSerializer");
// value 使用的Deserializer
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
//订阅test1和test2两个topic
consumer.subscribe(Arrays.asList("test1","test2"));
try{
while (true){
//从服务端拉取消息,每次poll()可以拉取多个消息
ConsumerRecords<String,String> records=consumer.poll(100);
//消费消息,这里仅仅是将消息的offset,key,value输出
for(ConsumerRecord<String,String> record:records){
System.out.printf("offset= %d,key= %s,value= %s\n",record.offset(),record.key(),record.value());
}
}
}finally {
consumer.close();
}
}
}
KafkaConsumer的核心方法是poll()方法,它负责从Kafka服务端拉取消息。
二.传递保证语义:
提交offset
Kafka服务端不会记录消费者的消费位置,而是有消费者自己决定如何记录其消费的offset。旧版本的消费者会将其消费位置记录到ZooKeeper中,在新版本的消费者为了缓解ZooKeeper集群的压力,在服务端添加一个名为"__consumer_offsets"的内部Topic。Offsets Topic用来保存消费者提交的offset,当出现消费者上/下线时会触发Consumer Group进行Rebalance操作,对分区进行重新分配,待Rebalance操作完成后,消费者就可以读取Offsets Topic中记录的offset,并从此offset位置继续消费。使用Offsets Topic记录消费者的offset是默认选项,开发者可以根据业务需求将offset记录在别的存储中。
在消费者消费消息的过程中,提交offset的实际很重要,因为决定了消费者故障重启后的消费位置。
自动提交
在实例中我们将"enable.auto.commit"设置为"true"可以起到自动提交offset的功能,"auto.commit.interval.ms"选项设置了自动提交的时间间隔。每次在调用KafkaConsumer.poll()方法时都会检测是否需要自动提交,并提交上一次poll()方法返回的最后一个消息的offset。为了避免消息丢失,建议poll()方法之前要处理完上传poll方法拉取的全部消息。
手动提交
KafkaConsumer中还提供了两个手动提交offset的方法:
- commitSync():可以指定提交的offset值,同步提交。
- commitSync():可以指定提交的offset值,异步提交。
传递保证语义
传递保证语义的三个级别
- At most once: 消息可能会丢,但绝不会重复传递。
- At least once: 消息绝不会丢,但可能重复传递。
- Exactly once: 每条消息只会传递一次。
很少会有“At most once”的需求。
如果通过Kafka传递的消息是幂等性的(一条消息被反复消费多次也不会对计算结果产生影响),使用At least once语义也是没有什么问题的。
Exactly once
在一般场景中我们需要Exactly once语义才是我们需要的。Exactly once语义由生产者和消费者两部分共同决定:
1.生产者保证不会产生重复的消息。
当生产者向Kafka发送消息,且正常得到响应的时候,可以确保生产者不会产生重复的消息。但是生产者向Kafka发送消息后,遇到网络问题,无法获得响应,生产者就无法判断消息是否成功提交给了Kafka。在我们分析生产者的时候,当出现异常时,会进行消息重传,这就可能出现“At least once”的语义。为了在响应异常情况下实现Exactly once语义,有两个方案:
- 每个分区只有一个生产者写入消息(分区和生产者是多对一的关系),当出现异常或超时的情况时,生产者查询此分区最后一个消息,用来决定要消息重传还是继续发送后面的消息。
- 为每个消息添加一个全局唯一主键,生产者不做处理,让消费者对消息去重,实现“Exactly once”语义。
如果业务数据产生的消息可以找到合适的字段作为主键,或有一个全局的ID,应该优先考虑第二个方案。
2.消费者不能重复拉取相同的消息。
问题提出
消费者处理消息与提交offset的顺序,决定了是哪种语义。
下图表示业务逻辑先对消息进行处理,再提交offset。这种模式如果消费者在处理完消息后,提交offset前出现宕机,待消费者再上线时,还会处理未提交的那部分消息(2-7这部分消息),但是这部分已经被消费者处理过了,这就对应了“At least once”语义。
at least once.jpg
另外一个场景,消费者拉取消息后,先提交offset后再处理消息。在提交offset之后,业务逻辑处理消息之前出现宕机,待消费者重新上线时,就无法读到刚刚已经提交而未处理的这部分消息(5-8这部分消息),这就对应了“At least once”语义。还有Consumer Group Rebalance也可以导致“At least once”语义。
at most once.jpg
为了实现消费者的“Exactly once”语义,可以借鉴的方案:
消费者将关闭自动提交offset的功能且不在手动提交offset,这样就不使用Offsets Topic这个内部Topic记录其offset,而是消费者自己保存offset。这里利用事务的原子性来实现“Exactly once”语义,我们将offset和消息处理放在一个事务中,事务执行成功则认为此消息被消费,否则事务回滚需要重新消费。当出现消费者宕机重启或Rebalance操作时,消费者可以从关系数据库中找到对应的offset,然后调用KafkaConsumer.seek()方法手动设置消费位置,从此offset处开始继续消费。
消费者并不知道Consumer Group什么时候做Rebalance的操作,哪个分区分配给了哪个消费者消费。我们可以通过向KafkaConsumer添加ConsumerRebalanceListener接口来解决这个问题。ConsumerRebalanceListener有两个回调方法。
- onPartitionRevoked()方法:调用时机是Consumer停止拉取数据之后,Rebalance开始之前,我们可以在此方法中实现手动提交offset,这就避免了Rebalance导致的重复消费的情况。
- onPartitionAssigned()方法:调用时机是Rebalance完成后,Consumer开始拉取数据之前,我们可以在此方法中调整或定义offset的值。
ConsumerRebalanceListener和seek()方法,我们就可以实现从关系数据库获取offset并手动设置的功能了。
网友评论