一. ConsumerCoordinator:
前面讲了Rebalance操作的方案和原理。在KafkaConsumer中通过ConsumerCoordinator组件实现与服务端GroupCoordinator的交互,ConsumerCoordinator继承了AbstractCoordinator抽象类。
AbstractCoordinator介绍:
image.pngimage.png
- heartbeat: 心跳任务的辅助类,其中记录了两次发送心跳的间隔(interval字段),最新发送心跳的时间(lastHeartbeatSend字段),最后收到心跳响应的时间(lastHeartbeatReceive字段),过期时间(timeout字段),心跳任务重置时间(lastSessionReset字段),同时还提供了计算下次发送心跳的时间(timeToNextHeartbeat()方法),检测是否过期的方法(sessionTimeoutExpired()方法)。
- heartbeatTask:这是一个定时任务,负责定时发送心跳请求和心跳响应的处理,会被添加到ConsumerNetworkClient.delayedTasks定时任务队列中。
- groupId:当前消费者所属的Consumer Group的Id。
- client:ConsumerNetworkClient对象,负责网络通信和执行定时任务。
- needsJoinPrepare: 标记是否需要执行发送JoinGroupRequest请求钱前的准备操作。
-
rejoinNeeded:此字段是否重新发送JoinGroupRequest请求的条件之一。
rejoinNeeded的调用.jpg
上图1处是收到正常的JoinGroupResponse响应,将rejoinNeeded设置为False,防止重复发送JoinGroupRequest请求。2,3,4分别是收到异常的SyncGroupResponse或HeartbeatResponse或消费离开Consumer Group时执行的操作,这些情况会将rejoinNeeded设置为true,表示可以重新发送JoinGroupRequest。
- coordinator:Node类型,记录服务端GroupCoordinator所在的Node节点。
- memberId: 服务端GroupCoordinator返回的分配给消费者的唯一Id。
- generation:服务端GroupCoordinator返回的年代信息,用来区分两次Rebalance操作。由于网络延迟等问题,在执行Rebalance操作时可能受到上次Rebalance过程的请求,避免这种干扰,每次Rebalance操作都会递增generation的值。
ConsumerCoordinator介绍:
- assignors: PartitionAssignor列表。在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息,GroupCoordinator从所有消费者都支持的分配策略中选择一个,通知Leader使用此分配策略进行分配分配。此字段的值通过partition.assignment.strategy参数配置,可以配置多个。
- metadata: Kafka集群元数据。
- subscriptions: SubscriptionState对象。
- autoCommitEnabled:是否开启了自动提交offset。
- autoCommitTask:自动提交offset的定时任务。
- interceptors: ConsumerInterceptors集合。
- excludeInternalTopics:标识是否排除内部Topic。
- metadataSnapshot:用来存储Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。在ConsumerCoordinator的构造方法中,会为Metadata添加一个监听器,当Metadata更新时会做下面的操作:
1) 如果是AUTO_PATTERN模式,则使用用户自定义的正则表达式过滤Topic,得到需要订阅的Topic集合后,设置到SubscriptionState的subscriptions集合和groupSubscriptions集合中。
2)如果是AUTO_PATTERN或AUTO_TOPICS模式,为当前Metadata做一个快照,这个快照的底层是使用HashMap记录每个Topic中Partition的个数。将新旧快照进行比较,发生变化的话,则表示消费者订阅的Topic发生分区数量变化,则将SubscriptionState的needsPartitionAssignment字段置为true,需要重新进行分区分配。
3)使用metadataSnapshot记录变化后的新快照。
分析下Metadata的Listener的具体实现:
private void addMetadataListener() {
this.metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
//AUTO_PATTERN模式的处理
if (subscriptions.hasPatternSubscription()) {
Set<String> unauthorizedTopics = new HashSet<String>();
for (String topic : cluster.unauthorizedTopics()) {
if (filterTopic(topic))
unauthorizedTopics.add(topic);
}
if (!unauthorizedTopics.isEmpty())
throw new TopicAuthorizationException(unauthorizedTopics);
final List<String> topicsToSubscribe = new ArrayList<>();
for (String topic : cluster.topics())
//通过subscribedPattern匹配Topic
if (filterTopic(topic))
topicsToSubscribe.add(topic);
//更新subscriptions集合,groupSubscriptions集合,assignment集合
subscriptions.changeSubscription(topicsToSubscribe);
//更新Metadata需要记录元数据的Topic集合
metadata.setTopics(subscriptions.groupSubscription());
} else if (!cluster.unauthorizedTopics().isEmpty()) {
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
}
//检测是否为AUTO_PATTERN或AUTO_TOPICS模式。
// check if there are any changes to the metadata which should trigger a rebalance
if (subscriptions.partitionsAutoAssigned()) {
MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);//创建快照
if (!snapshot.equals(metadataSnapshot)) {//比较快照
metadataSnapshot = snapshot;//记录快照
//更新needPartitionAssignment字段为true,表示需要重新进行分区分配
subscriptions.needReassignment();
}
}
}
});
}
- assignmentSnapshot:也是用来存储Metadata的快照信息,不过是用来检测Partition分配的过程中有没有发生分区数量变化。具体是在Leader消费者开始分区分配操作前,使用此字段记录Metadata快照;收到SyncGroupResponse后,会比较此字段记录的快照和当前Metadata是否发生变化。如果发生变化,要重新进行分区分配。
PartitionAssignor分析:
Leader消费者在收到JoinGroupResponse后,会按照指定的分区分配策略进行分区分配,每个分区分配策略就是一个PartitionAssignor接口的实现。
image.png image.pngPartitionAssignor接口
PartitionAssignor接口中定义了Assignment和Subscription两个内部类。进行分区分配需要两个方面的数据:Metadata中记录的集群元数据和每个Member的订阅信息。为了给用户增强对分配结果的控制,就将用户订阅信息和一些影响分配的用户自定义信息封装成Subscription,例如,"用户自定义数据"可以是每个消费者的权重。其中,topics集合标识某Member订阅的Topic集合,userData表示用户自定义的数据。PartitionAssignor接口提供了subscription()方法,用于添加用户自定义的数据,在创建JoinGroupRequest的时候回用到subscription()方法。
Assignment中保存了分区的分配结果,partitions表示分配给某消费者的TopicPartition集合,userData是用户自定义的数据。
再看下PartitionAssignor的其他方法,assign()是完成Partition分配的抽象方法。onAssignment()方法是在每个消费者受到Leader分配结果时的回调函数,此调用发生在SyncGroupResponse之后。
AbstractPartitionAssignor
AbstractPartitionAssignor是PartitionAssignor接口的实现,会将Subscription中的userData去除掉后,再进行分区分配。
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, List<String>> topicSubscriptions = new HashMap<>();
//解析Subscriptions集合,去除userData信息
for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
List<String> topics = subscriptionEntry.getValue().topics();
allSubscribedTopics.addAll(topics);
topicSubscriptions.put(subscriptionEntry.getKey(), topics);
}
//统计每个Topic的分区个数
Map<String, Integer> partitionsPerTopic = new HashMap<>();
for (String topic : allSubscribedTopics) {
Integer numPartitions = metadata.partitionCountForTopic(topic);
if (numPartitions != null && numPartitions > 0)
partitionsPerTopic.put(topic, numPartitions);
else
log.debug("Skipping assignment for topic {} since no metadata is available", topic);
}
//由子类实现
Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
//整理分区分配结果
// this class has maintains no user data, so just wrap the results
Map<String, Assignment> assignments = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
return assignments;
}
如果在自定义PartitionAssignor时需要使用userData控制分区分配结果,就不能直接继承AbstractPartitionAssignor,而需要直接实现PartitionAssignor接口。
RangeAssignor和RoundRobinAssignor都是Kafka提供的PartitionAssignor接口的默认实现:
- RangeAssignor实现原理是:针对每个Topic, n=分期数/消费数量,m=分区数%消费者数量,前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。
- RoundRobinAssignor实现原理是:将所有Topic的Partition按照字典排列,对每个Consumer进行轮询分配。
网友评论