Kafka Consumer
Kafka的消费者只有一个接口,即Consumer接口。
可以将Kafka Consumer接口的API分为以下几类:
- Consumer状态接口
- Set<TopicPartition> assignment()
- Set<String> subscription()
- Map<String, List<PartitionInfo>> listTopics()
- List<PrtitionInfo> partitionsFor(String topic)
- Set<TopicPartition> paused()
- 订阅接口
- void subscribe(Collection<String> topics)
- void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
- void subscribe(Pattern pattern, ConsumerRebalanceListener callback)
- void subscribe(Pattern)
- void unsuscribe()
- 分区分配
- void assign(Collection<TopicPartition> partitions)
- 消息获取
- ConsumerRecords<K, V> poll(long timeout)
- 提交Offset
- void commitSync()
- void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
- void commitAsync()
- void commitAsync(OffsetCommitCallback callback)
- void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
- 设置Offset
- void seek(TopicPartition partition, long offset)
- void seekToBegining(Collection<TopicPartition> partitions)
- void seekToEnd(Collection<TopicPartition> partitions)
- 获取Offset
- long position(TopicPartition partition)
- 搜索Offset
- Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
- beginningOffsets(Collection<TopicPartition> partitions)
- endOffsets(Collection<TopicPartition> partitions)
- 获取Commit位置
- OffsetAndMetadata committed(TopicPartition partition)
- Metric
- Map<MetricName, ? extends Metric> metrics()
- 消费生命周期相关
- void pause(Collection<TopicPartition> partitions)
- void resume(Collection<TopicPartition> partitions)
- void close()
- void close(long timeout, TimeUnit unit)
- void wakeup()
Consumer接口只有一个实现类,即KafkaConsumer。
KafkaConsumer源码
Consumer透明的处理了Kafka Broker异常的情况,并且透明的处理分区的迁移。Consumer通过consumer group的形式完成负载均衡的消费。
Consumer保持和必要的Broker之间的TCP链接。关闭Consumer失败会导致这些链接泄露。Consumer是非线程安全的。
Offsets and Consumer Position
Kafka为分区中的每一个记录分配一个数字化的offset。这个offset作为分区中记录的唯一ID使用,并且offset指明了Consumer消费该分区的位置。
KafkaConsumer的构造方法
KafkaConsumer有多个构造方法,最终只会通过以下这个方法来构造实例:
KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
其中config提供Consumer的配置信息,包括group、是否自动提交offset等策略。keyDeserializer和valueDeserializer分别用于消费的记录的Key和Value的解码。
在这个构造方法中初始化了Consumer核心的组件:
- metrics:用于统计一些数据
- interceptors:Consumer端的拦截器,用于在消费消息、Commit、关闭时调用用户的方法
- client:Consumer的网络客户端
- assignors:分区的分配器
- coordinator:Consumer的协调器
- fetcher:fetching的处理器,执行和管理fetch请求和结果
Consumer状态接口
Set<TopicPartition> assignment()
此方法获取分配给当前Consumer的分区集合。
assignment.png这个方法非常简单,从订阅的SubscriptionState对象中取出分区信息。
在方法的第一行执行了acquireAndEnsureOpen方法,并且把获取分区操作放到了try finally中执行,finally方法执行了release方法。从名字和接口上可以看出,应该执行了一个获取锁的操作,然后在获取分区结束后释放锁。
consumer-not-thread-safe.png通过acquire和release的方法可以看出KafkaConsumer是非线程安全的,但是并没有采用synchronized关键字或者Lock的形式,而是通过CAS的方式实现一个线程安全的操作。
在多线程操作时,Kakfa采用直接抛出异常的形式,而不是等待获取锁(初衷就是不支持多线程的操作)。
(通过线程ID和CAS来避免多线程访问还是挺巧妙的)
Set<String> subscription()
subscription.png返回订阅的Topic。可能返回空的set,如果在调用之间没有执行subscribe方法(同样该方法也只能单线程调用)。
Map<String, List<PartitionInfo>> listTopics()
list-topics.png获取所有有权限的Topic的元数据(Topic信息)。这个方法会发起一个网络调用。
List<PrtitionInfo> partitionsFor(String topic)
partitions-for.png通过本地的元数据获取Topic的Partition信息,如果本地没有则发起一个网络请求获取。
Set<TopicPartition> paused()
paused-topic.png获取通过pause(TopicPartition)暂停消费的TopicPartition集合。
订阅接口
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
subscribe(Collection<String> topics)方法只是简化对subscribe(Collection<String> topics, ConsumerRebalanceListener callback)的调用,所以只看subscribe(Collection<String> topics, ConsumerRebalanceListener callback)的实现。
subscribe.png结合这个方法的注释和实现,可以得出下面几个结论:
- 如果topics为空,等价于取消订阅的操作
- 订阅的topics集合不能变更,即不能通过多次订阅来达到新增订阅topic的目的
另外这个方法只是将需要订阅的topic即Partition发生变更时回调的Listener设置到SubscriptionState中,并没有实际和服务端之间进行交互构建订阅关系,所以这块订阅及分区分配必然在后续的异步的流程中执行了。
void subscribe(Pattern pattern, ConsumerRebalanceListener callback)
[图片上传中...(unsubscribe.png-cc9223-1512117040850-0)]
subscribe0.png这个订阅方法区别于上面的subscribe方法在于接收了一个Pattern参数。Pattern参数用于topic的匹配,即用户提供一个正则表达式,Consumer订阅满足该表达式的所有topic。
所以在下面的代码中会将needMetadataForAllTopics设置为true。
void unsuscribe()
unsubscribe.pngunsubscribe方法则是取消subscribe方法设置的一些状态,包括清空订阅的topic、清楚分配的分区。
这里一个的地方调用了coordinator#maybeLeaveGroup方法,通过方法名能猜测到应该是将这个Consumer实例从group中移除,以将分配分配给其他Consumer消费。
到这里,关于subscribe相关的API产生了两个疑问:
- 订阅只是将topic添加到SubscriptionState中,没有通知到Broker,什么时候订阅真正生效?
- 在unsubscribe的时候执行了coordinator#maybeLeaveGroup,那Consumer是什么时候被添加到Group中的呢?
暂时放下这两个疑问,继续看代码。
分区分配接口
void assign(Collection<TopicPartition> partitions)
assign.png这个方法是留给用户进行手动分配分区的方法。
- 每一次执行分配操作都会覆盖之前分配的分区
- 这个分配接口是手动的,即不会考虑group内成员的情况
- 用户不能同时使用手动分配和自动分配分区,只能选择其中一种方式
通过内部调用的subscriptions#assignFromUser也可以明白这个是留给用户调用的接口。
注意中间调用了coordinator@maybeAutoCommitOffsetNow,通过注释大概了解含义是将不再订阅的分区的消费进度进行提交。这一操作是为了减少消费数据的重复,如果没有提交进度,另外的Consumer开始消费这个分区时将从上一次commit的进度开始,所以这里对不订阅的分区进行一次提交可以有效的减少重复的消息量。
消息获取接口
ConsumerRecords<K, V> poll(long timeout)
poll应该是Consumer的核心接口了,因为到这里才真正执行了和获取消息相关的逻辑。
poll.png首先是校验逻辑,在poll之前如果没有进行topic的订阅或分区的分配,poll操作将抛出异常。
接着是poll的核心逻辑:
- 在一个循环体中执行获取数据的逻辑,跳出循环的条件是超时或者获取到数据
从代码中可以看出pollOnce应该是真正的执行一次获取消息的操作。而代码中注释的部分是poll的核心:
- fetcher#sendFetches方法给有需要的Server节点发送获取消息的请求
- 这么做的目的是在用户下一次进行poll操作之前先将获取消息的请求发送出去
- 这样网络操作和就可以和用户处理消息的逻辑并行,降低延迟
- client#hasPendingRequests判断是否还有未从客户端发送出去的请求
- client#pollNoWakeup执行网络IO操作且是分阻塞的
从这段注释和代码中可以看出,poll时如果拿到数据了,会将剩余的请求发送出去来实现pipelining的目的。
所以对应的pollOnce内的逻辑必然有从缓存中(即上一次poll请求中获取的数据)获取数据的操作。
pollOnce.pngpollOnce对目标分区执行一路poll请求,大致流程如下:
- coordinator#poll确保Consumer在Coordinator的管理之中
- ensure coordinator
- ensure active group(将Consumer加入到group中)
- 发送heartbeat
- 更新positions
- 从fetcher中获取消息,如果已经拿到消息则返回结果,调用结束
- 对分区执行poll请求
- 阻塞等待至少一个fetch操作完成
- 判断是否操作期间元数据进行了变更,如果变更了,丢弃获取的数据
- 返回获取结果
这里就找到了上面两个问题的答案:
“订阅只是将topic添加到SubscriptionState中,没有通知到Broker,什么时候订阅真正生效?”
这里的订阅其实并没有需要通知到Broker,而是通过Coordinator的poll操作和join操作,当变更订阅的topic之后,从新进行动态的负载均衡,之后通过fetcher完成订阅topic的数据获取。
“在unsubscribe的时候执行了coordinator#maybeLeaveGroup,那Consumer是什么时候被添加到Group中的呢?”
Consumer是在真正执行poll请求前被添加到group中的。
提交Offset
同步提交
Consumer中提供了两个同步提交offset的方法:
- void commitSync()
- void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
这两个方法用于将offset保存到Kafka中,如果采用其他存储,则不需要使用这两个API。
这个方法是同步的,即会阻塞,直到提交成功或者遇到不可恢复的异常。
两个方法最终都只是调用了coordinator#commitOffsetsSync,所以大致看一下coordinator#commitOffsetsSync方法。
commitOffsetSync.png流程中主要是:
- 状态验证(确认Coordinator)
- 构建请求(按照自定义的协议)
- 阻塞等待请求返回
- 异常处理和重试
异步提交
异步提交提供了是三个方法:
- void commitAsync()
- void commitAsync(OffsetCommitCallback callback)
- void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
三个方法最终调用了coordinator#commitOffsetsAsync来提交进度:
commitOffsetAsync.png如上代码,如果coordinator可用则进行异步的offset提交;如果不可用,则异步查找coordinator,在结果回调中再执行异步的commit操作。
设置Offset
- void seek(TopicPartition partition, long offset)
- 直接将客户端Partition状态中的position设置为指定的offset
- void seekToBegining(Collection<TopicPartition> partitions)
- void seekToEnd(Collection<TopicPartition> partitions)
- seekToBegining和seekToEnd是两个lazily的方法,只是将Partition的OffsetResetStrategy状态标记为EARLIEST或者LATEST,只有在执行poll或者position(TopicPartition)方式时才会根据策略变更offset
获取Offset
long position(TopicPartition partition)方法用于返回Partition对应的Offset值。
[图片上传中...(search_offsets.png-54d221-1512117228661-0)]
结合上面设置offset的方法,这里如果能获取到设置的值则直接返回,否则进行offset更新操作,之后再返回。
在更新offset操作中,会根据设置的OffsetResetStrategy来获取分区对应的offset。
搜索Offset
- Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
- beginningOffsets(Collection<TopicPartition> partitions)
- endOffsets(Collection<TopicPartition> partitions)
KafkaConsumer提供了三个查找Offset,分别是:
- 按指定时间查找
- 查找起始的offset
- 查找末尾的offset
这三个方法最终都通过Fetcher#retrieveOffsetsByTimes方法实现。
search_offsets.png这个方法中,首先构造请求,之后通过网络客户端阻塞的等待这个请求处理完成。
方法中包含了一个循环体,在一些处理失败的情况下会重试,如果用户的超时时间允许的话。结合这个逻辑和网络客户端的阻塞操作,所以上述三个搜索offset的操作都是阻塞的操作。
获取Commit位置
- OffsetAndMetadata committed(TopicPartition partition)
committed方法最终调用了Fetcher#fetchCommittedOffsets,从实现上可以看出这是一个阻塞的方法,且有可能一直处于while循环中(没有具体看什么场景;但是觉得这样写会很奇怪,可能导致客户端永久阻塞)。
Metric和生命周期相关
- Map<MetricName, ? extends Metric> metrics()
Metrics对象提供了一些统计的能力,通过metrics将统计的数据返回给业务方。
- void pause(Collection<TopicPartition> partitions)
- void resume(Collection<TopicPartition> partitions)
- void close()
- void close(long timeout, TimeUnit unit)
- void wakeup()
以上几个方法是生命周期相关的,即暂停恢复之类的操作。
这部分代码非常简单,如pause只是将PartitionState状态设置为pause,这样在fetch时会忽略掉此分区,而resume就是移除这个状态。
wcode.png
网友评论