美文网首页
Kafka源码分析-Consumer(5)-ConsumerCo

Kafka源码分析-Consumer(5)-ConsumerCo

作者: 陈阳001 | 来源:发表于2018-11-24 16:56 被阅读0次

一. ConsumerCoordinator:

前面讲了Rebalance操作的方案和原理。在KafkaConsumer中通过ConsumerCoordinator组件实现与服务端GroupCoordinator的交互,ConsumerCoordinator继承了AbstractCoordinator抽象类。

AbstractCoordinator介绍:

image.png
image.png
  • heartbeat: 心跳任务的辅助类,其中记录了两次发送心跳的间隔(interval字段),最新发送心跳的时间(lastHeartbeatSend字段),最后收到心跳响应的时间(lastHeartbeatReceive字段),过期时间(timeout字段),心跳任务重置时间(lastSessionReset字段),同时还提供了计算下次发送心跳的时间(timeToNextHeartbeat()方法),检测是否过期的方法(sessionTimeoutExpired()方法)。
  • heartbeatTask:这是一个定时任务,负责定时发送心跳请求和心跳响应的处理,会被添加到ConsumerNetworkClient.delayedTasks定时任务队列中。
  • groupId:当前消费者所属的Consumer Group的Id。
  • client:ConsumerNetworkClient对象,负责网络通信和执行定时任务。
  • needsJoinPrepare: 标记是否需要执行发送JoinGroupRequest请求钱前的准备操作。
  • rejoinNeeded:此字段是否重新发送JoinGroupRequest请求的条件之一。


    rejoinNeeded的调用.jpg

    上图1处是收到正常的JoinGroupResponse响应,将rejoinNeeded设置为False,防止重复发送JoinGroupRequest请求。2,3,4分别是收到异常的SyncGroupResponse或HeartbeatResponse或消费离开Consumer Group时执行的操作,这些情况会将rejoinNeeded设置为true,表示可以重新发送JoinGroupRequest。

  • coordinator:Node类型,记录服务端GroupCoordinator所在的Node节点。
  • memberId: 服务端GroupCoordinator返回的分配给消费者的唯一Id。
  • generation:服务端GroupCoordinator返回的年代信息,用来区分两次Rebalance操作。由于网络延迟等问题,在执行Rebalance操作时可能受到上次Rebalance过程的请求,避免这种干扰,每次Rebalance操作都会递增generation的值。

ConsumerCoordinator介绍:

  • assignors: PartitionAssignor列表。在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息,GroupCoordinator从所有消费者都支持的分配策略中选择一个,通知Leader使用此分配策略进行分配分配。此字段的值通过partition.assignment.strategy参数配置,可以配置多个。
  • metadata: Kafka集群元数据。
  • subscriptions: SubscriptionState对象。
  • autoCommitEnabled:是否开启了自动提交offset。
  • autoCommitTask:自动提交offset的定时任务。
  • interceptors: ConsumerInterceptors集合。
  • excludeInternalTopics:标识是否排除内部Topic。
  • metadataSnapshot:用来存储Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。在ConsumerCoordinator的构造方法中,会为Metadata添加一个监听器,当Metadata更新时会做下面的操作:
    1) 如果是AUTO_PATTERN模式,则使用用户自定义的正则表达式过滤Topic,得到需要订阅的Topic集合后,设置到SubscriptionState的subscriptions集合和groupSubscriptions集合中。
    2)如果是AUTO_PATTERN或AUTO_TOPICS模式,为当前Metadata做一个快照,这个快照的底层是使用HashMap记录每个Topic中Partition的个数。将新旧快照进行比较,发生变化的话,则表示消费者订阅的Topic发生分区数量变化,则将SubscriptionState的needsPartitionAssignment字段置为true,需要重新进行分区分配。
    3)使用metadataSnapshot记录变化后的新快照。
    分析下Metadata的Listener的具体实现:
private void addMetadataListener() {
        this.metadata.addListener(new Metadata.Listener() {
            @Override
            public void onMetadataUpdate(Cluster cluster) {
                //AUTO_PATTERN模式的处理
                if (subscriptions.hasPatternSubscription()) {

                    Set<String> unauthorizedTopics = new HashSet<String>();
                    for (String topic : cluster.unauthorizedTopics()) {
                        if (filterTopic(topic))
                            unauthorizedTopics.add(topic);
                    }
                    if (!unauthorizedTopics.isEmpty())
                        throw new TopicAuthorizationException(unauthorizedTopics);

                    final List<String> topicsToSubscribe = new ArrayList<>();

                    for (String topic : cluster.topics())
                        //通过subscribedPattern匹配Topic
                        if (filterTopic(topic))
                            topicsToSubscribe.add(topic);
                    //更新subscriptions集合,groupSubscriptions集合,assignment集合
                    subscriptions.changeSubscription(topicsToSubscribe);
                    //更新Metadata需要记录元数据的Topic集合
                    metadata.setTopics(subscriptions.groupSubscription());
                } else if (!cluster.unauthorizedTopics().isEmpty()) {
                    throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
                }
                //检测是否为AUTO_PATTERN或AUTO_TOPICS模式。
                // check if there are any changes to the metadata which should trigger a rebalance
                if (subscriptions.partitionsAutoAssigned()) {
                    MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);//创建快照
                    if (!snapshot.equals(metadataSnapshot)) {//比较快照
                        metadataSnapshot = snapshot;//记录快照
                        //更新needPartitionAssignment字段为true,表示需要重新进行分区分配
                        subscriptions.needReassignment();
                    }
                }

            }
        });
    }
  • assignmentSnapshot:也是用来存储Metadata的快照信息,不过是用来检测Partition分配的过程中有没有发生分区数量变化。具体是在Leader消费者开始分区分配操作前,使用此字段记录Metadata快照;收到SyncGroupResponse后,会比较此字段记录的快照和当前Metadata是否发生变化。如果发生变化,要重新进行分区分配。

PartitionAssignor分析:

Leader消费者在收到JoinGroupResponse后,会按照指定的分区分配策略进行分区分配,每个分区分配策略就是一个PartitionAssignor接口的实现。

image.png image.png

PartitionAssignor接口

PartitionAssignor接口中定义了Assignment和Subscription两个内部类。进行分区分配需要两个方面的数据:Metadata中记录的集群元数据和每个Member的订阅信息。为了给用户增强对分配结果的控制,就将用户订阅信息和一些影响分配的用户自定义信息封装成Subscription,例如,"用户自定义数据"可以是每个消费者的权重。其中,topics集合标识某Member订阅的Topic集合,userData表示用户自定义的数据。PartitionAssignor接口提供了subscription()方法,用于添加用户自定义的数据,在创建JoinGroupRequest的时候回用到subscription()方法。
Assignment中保存了分区的分配结果,partitions表示分配给某消费者的TopicPartition集合,userData是用户自定义的数据。
再看下PartitionAssignor的其他方法,assign()是完成Partition分配的抽象方法。onAssignment()方法是在每个消费者受到Leader分配结果时的回调函数,此调用发生在SyncGroupResponse之后。

AbstractPartitionAssignor

AbstractPartitionAssignor是PartitionAssignor接口的实现,会将Subscription中的userData去除掉后,再进行分区分配。

public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, List<String>> topicSubscriptions = new HashMap<>();
        //解析Subscriptions集合,去除userData信息
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            List<String> topics = subscriptionEntry.getValue().topics();
            allSubscribedTopics.addAll(topics);
            topicSubscriptions.put(subscriptionEntry.getKey(), topics);
        }
        //统计每个Topic的分区个数
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : allSubscribedTopics) {
            Integer numPartitions = metadata.partitionCountForTopic(topic);
            if (numPartitions != null && numPartitions > 0)
                partitionsPerTopic.put(topic, numPartitions);
            else
                log.debug("Skipping assignment for topic {} since no metadata is available", topic);
        }
        //由子类实现
        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
        //整理分区分配结果
        // this class has maintains no user data, so just wrap the results
        Map<String, Assignment> assignments = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
        return assignments;
    }

如果在自定义PartitionAssignor时需要使用userData控制分区分配结果,就不能直接继承AbstractPartitionAssignor,而需要直接实现PartitionAssignor接口。
RangeAssignor和RoundRobinAssignor都是Kafka提供的PartitionAssignor接口的默认实现:

  • RangeAssignor实现原理是:针对每个Topic, n=分期数/消费数量,m=分区数%消费者数量,前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。
  • RoundRobinAssignor实现原理是:将所有Topic的Partition按照字典排列,对每个Consumer进行轮询分配。

相关文章

网友评论

      本文标题:Kafka源码分析-Consumer(5)-ConsumerCo

      本文链接:https://www.haomeiwen.com/subject/lyjyqqtx.html