本篇介绍Kafka consumer开发相关的内容。
1. consumer基本概念
1.1 消费者
Kafka消费者就是各种读取Kafka集群消息的应用程序。值得注意的是consumer的版本和分类。旧版本的consumer是由Scala开发的,在Kafka 0.9.0.0版本后推出Java开发的新版本consumer。新旧版本的consumer在设计上、API上有很多不同,所以要注意区分。本文以新版本的consumer做介绍。
此外在具体的应用场景中,consumer还可以分类成:
-
消费者组(consumer group),多个消费者实例组成一个整体消费消息
-
独立消费者(standalone consumer),单独执行消费操作。
1.2 消费者组
consumer group的定义:
消费者使用group.id来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上。
可以看出三点:
-
一个consumer group有若干个consumer实例,consumer实例可以是一个线程,或者运行在其他机器的进程。
-
同一group内,topic某个分区的消息只能发送给group下的一个consumer实例
-
topic可以发送给多个group
通过consumer group就可以实现两种消息引擎模型:
-
基于队列模型:所有consumer实例属于相同group
-
基于发布/订阅模型:consumer实例属于不同group
consumer group用于实现高伸缩性,高容错性的consumer机制。在consumer group内,若某一个consumer实例“挂”掉,可以将该consumer负责的分区交给其他consumer负责,即所谓的rebalance,保障整个group可以继续工作。
1.3 offset提交
每个consumer实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条信息。Kafka中,consumer group保存offset信息在Kafka的一个内部topic,即__consumer_offsets中。
__consumer_offsets是Kafka内部topic,专门保存offset。
一般情况下,__consumer_offsets有50个分区。保存的每条消息格式为KV:<group.id+topic+分区号,offset>。
当consumer group提交offset时,对group.id做哈希求模,分散在50个分区中。每次更新同一个key的最新offset值时,该topic就会写入一条含有最新offset的信息,同时Kafka会定时对__consumer_offsets做compact操作,控制日志容量。
2. 构建consumer
2.1 代码实例
0.jpeg2.2 详细步骤
构造consumer应用程序的基本步骤为:
-
构造Properties对象,配置consumer参数。必须指定的参数有bootstrap.servers,group.id,key.deserializer,value.deserializer。
-
构造KafkaConsumer对象
-
订阅topic
-
循环调用KafkaConsumer.poll方法获取ConsumerRecord的消息
-
根据业务逻辑处理ConsumerRecord对象
-
关闭KafkaConsumer
2.3 consumer其他参数
-
session.timeout.ms:consumer group检测组内成员是否崩溃的时间间隔,默认10s
-
max.poll.interval.ms:consumer处理逻辑最大时间
-
auto.offset.rest:指定了无位移信息或位移越界时kafka的应对策略。分别是earliest、latest
其他参数可以参阅官网。
3. 消息轮询
Kafka在读取消息时,是要能够同时读取多个topic的多个分区消息。Kafka采用的是一个线程同时管理多个Socket连接,即同时与多个broker通信实现消息的并行读取。
Kafka通过重复性调用poll方法获取消息,每次poll方法返回的都是订阅分区上的一组消息。
4. 位移管理
4.1 consumer位移
常见的3种消息交付语义保证:
-
最多一次(at most once),消息可能丢失,不会重复。消息在消费之前提交位移就可以实现。若consumer在提交位移和消费消息之间崩溃,重启后从最新的offset开始消费,这样那条消息就丢失了。
-
最少一次(at least once),消息不会丢失,可能重复。消息在消费之后提交可以实现。但是Kafka无法保证提交位移和消费消息在一个事务中完成,所以可能重复消费。Kafka默认就是提供at least once的处理语义。
-
精准一次(exactly once),消息一定被处理且只处理一次。在0.11.0.0版本中正式支持事务以及精准处理的需求。
4.2 consumer位移管理
consumer会在Kafka集群所有broker中选择一个作为consumer group的coordinator,用于实现组员管理、消费分配方案制定、位移提交等。
consumer提交位移的机制是:通过向所属的coordinator发送位移提交请求实现,每个位移提交请求都会写进__consumer_offsets对应分区上追加一个新消息。消息的key是<group.id,topic,分区号>,value是offset。
Kafka可以选择手动提交和自动提交offset。手动提交offset可以实现用户自行确定消息何时被真正处理完并提交位移。在较强的精准一次处理语义时,需要用户自行实现手动位移提交。
手动提交API又分为同步手动提交(commitSync)和异步手动提交(commitAsync)。调用commitSync,程序会等待位移提交结束后才执行下一条命令;若调用commitAsync,则是一个异步非阻塞调用,程序会在poll方法中不断轮询这次异步提交的结果。
5. 重平衡(Rebalance)
5.1 rebalance概述
rebalance规定了consumer group如何达成一致分配订阅topic的所有分区。新版本中Kafka通过coordinator完成rebalance。
rebalance触发的三个条件有:
-
组成员发生更变,比如有consumer的加入、离开、崩溃
-
组订阅topic数有变更
-
组订阅topic的分区数发生变更
默认提供了三种策略:range策略、round-robin策略和sticky策略。
5.2 rebalance流程
consumer group在执行rebalance前必须确定coordinator所在broker,然后执行rebalance过程:
-
加入组。组内所有consumer向coordinator发送JoinGroup请求,coordinator从中选择一个consumer当leader,并把所有成员信息和订阅信息发送给leader。
-
同步更新分配方案。leader指定分配方案,根据分配策略决定consumer负责那些topic的哪些分区,分配完成后会把分配方案封装进SyncGroup请求发送给coordinator。coordinator再把分配方案中属于每个consumer的方案单独抽取出来response返回各自的consumer。
6. 多线程消费
一般情况有两种解决方法:第一是每一个线程维护一个KafkaConsumer;第二是单个KafkaConsumer实例+多Worker线程。具体实现方法还请参考原文,或者百度相关代码。
更多内容还请参考《Apache Kafka实战》第五章。
网友评论