美文网首页Kafka
Kafka Group Corridinator

Kafka Group Corridinator

作者: Alen_ab56 | 来源:发表于2022-03-17 20:16 被阅读0次

    GroupCoordinator介绍

    1.GroupCoordinator handles general group membership and offset management.
    group协调器处理一般的组成员和offset消费位移管理

    1. Each Kafka server instantiates a coordinator which is responsible for a set of groups. Groups are assigned to coordinators based on their group names.
      每个kafka server都实例化一个coordinator,负责其中一部分group
      group被分配给哪个coordinator是基于他们的名称来的

    它处理的请求类型主要有一下几种
    ApiKeys.OFFSET_COMMIT;
    ApiKeys.OFFSET_FETCH;
    ApiKeys.JOIN_GROUP;
    ApiKeys.LEAVE_GROUP;
    ApiKeys.SYNC_GROUP;
    ApiKeys.DESCRIBE_GROUPS;
    ApiKeys.LIST_GROUPS;
    ApiKeys.HEARTBEAT;

    GroupCoordinator启动

    启动在KafkaServer类的startup方法

    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
    groupCoordinator.startup()

    group如何选择coordinator

    要说这个,就必须介绍一下这个 __consumer_offsets topic 了,它是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 默认有三个副本,而具体的一个 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions 来计算的(其中,NumPartitions 是 __consumer_offsets 的 partition 数,默认是50个)。

    也即是说对于 consumer group 而言,是根据其 group.id 进行 hash 并计算得到其具对应的 partition 值,该 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator,GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息。

    GroupCoordinator的metadata

    对于consumer group而言,其对应的metadata信息主要包含以下内容:
    State metadata:

    1. group state
    2. generation id
    3. leader id
      @nonthreadsafe
      //NOTE: group 的 meta 信息,对 group 级别而言,每个 group 都会有一个实例对象
      private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {

    private var state: GroupState = initialState // group 的状态
    private val members = new mutable.HashMap[String, MemberMetadata] // group 的 member 信息
    private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata] //对应的 commit offset
    private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] // commit offset 成功后更新到上面的 map 中

    var protocolType: Option[String] = None
    var generationId = 0 // generation id
    var leaderId: String = null // leader consumer id
    var protocol: String = null
    }

    而对于每个consumer而言,其meta信息包括以下内容
    private[coordinator] class MemberMetadata(val memberId: String,
    val groupId: String,
    val clientId: String,
    val clientHost: String,
    val rebalanceTimeoutMs: Int,
    val sessionTimeoutMs: Int,
    val protocolType: String,
    var supportedProtocols: List[(String, Array[Byte])]) {}

    GroupCoordinator处理请求

    Offset 请求的处理

    关于Offset的请求处理有2个,查询Offset OFFSET_FETCH和提交OffsetOFFSET_COMMIT

    OFFSET_FETCH

    fetch Offset分2种情况,获取group所消费的所有TopicPartition的offset;获取指定TopicPartition的offset;
    2种情况都是调用coordinator的handleFetchOffsets方法
    并且在查询时对group加锁,严格保证获取的数据时最准确的

    OFFSET_COMMIT

    doCommitOffsets的

    相关文章

      网友评论

        本文标题:Kafka Group Corridinator

        本文链接:https://www.haomeiwen.com/subject/krfmdrtx.html