GroupCoordinator介绍
1.GroupCoordinator handles general group membership and offset management.
group协调器处理一般的组成员和offset消费位移管理
- Each Kafka server instantiates a coordinator which is responsible for a set of groups. Groups are assigned to coordinators based on their group names.
每个kafka server都实例化一个coordinator,负责其中一部分group
group被分配给哪个coordinator是基于他们的名称来的
它处理的请求类型主要有一下几种
ApiKeys.OFFSET_COMMIT;
ApiKeys.OFFSET_FETCH;
ApiKeys.JOIN_GROUP;
ApiKeys.LEAVE_GROUP;
ApiKeys.SYNC_GROUP;
ApiKeys.DESCRIBE_GROUPS;
ApiKeys.LIST_GROUPS;
ApiKeys.HEARTBEAT;
GroupCoordinator启动
启动在KafkaServer类的startup方法
groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
groupCoordinator.startup()
group如何选择coordinator
要说这个,就必须介绍一下这个 __consumer_offsets topic 了,它是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 默认有三个副本,而具体的一个 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions 来计算的(其中,NumPartitions 是 __consumer_offsets 的 partition 数,默认是50个)。
也即是说对于 consumer group 而言,是根据其 group.id 进行 hash 并计算得到其具对应的 partition 值,该 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator,GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息。
GroupCoordinator的metadata
对于consumer group而言,其对应的metadata信息主要包含以下内容:
State metadata:
- group state
- generation id
- leader id
@nonthreadsafe
//NOTE: group 的 meta 信息,对 group 级别而言,每个 group 都会有一个实例对象
private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {
private var state: GroupState = initialState // group 的状态
private val members = new mutable.HashMap[String, MemberMetadata] // group 的 member 信息
private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata] //对应的 commit offset
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] // commit offset 成功后更新到上面的 map 中
var protocolType: Option[String] = None
var generationId = 0 // generation id
var leaderId: String = null // leader consumer id
var protocol: String = null
}
而对于每个consumer而言,其meta信息包括以下内容
private[coordinator] class MemberMetadata(val memberId: String,
val groupId: String,
val clientId: String,
val clientHost: String,
val rebalanceTimeoutMs: Int,
val sessionTimeoutMs: Int,
val protocolType: String,
var supportedProtocols: List[(String, Array[Byte])]) {}
GroupCoordinator处理请求
Offset 请求的处理
关于Offset的请求处理有2个,查询Offset OFFSET_FETCH和提交OffsetOFFSET_COMMIT
OFFSET_FETCH
fetch Offset分2种情况,获取group所消费的所有TopicPartition的offset;获取指定TopicPartition的offset;
2种情况都是调用coordinator的handleFetchOffsets方法
并且在查询时对group加锁,严格保证获取的数据时最准确的
OFFSET_COMMIT
doCommitOffsets的
网友评论