KafkaController是管理leader的地方,在kafka controller启动时会调用它。主要就是在zookeeper的/controller选举路径上注册一个leader变更监听器,然后调用elect方法开启选举,
elect方法
先读取ZK /controller路径下是否已经有注册,如果有则读出,看是不是自己的brokerId,如果返回true,否则返回false (这里定义的是一个Boolean型变量amILeader)。
否则说明controller还没有选举,尝试在在zookeeper上创建临时节点。如果创建成功表明broker被选举为leader,调用onBecomeLeader;若节点以存在,再次获取controller ID,若不是-1说明已有其他broker被选举为leader;若是的话表明虽有leader被选出但其放弃了leader角色,还需要开启下一轮的leader选举.如果当前被选举为leader则:
onControllerFailover
//从/controller_epoch路径上读取controller的epoch的值,这个值用于控制controller的切换. 把读取到的值存储到controllerContext中的epoch与epochZkVersion属性中.
1,readControllerEpochFromZookeeper()
//更新controllerContext中的epoch的值(加1),并持久化到/controller_epoch路径上.
2,incrementControllerEpoch(zkUtils.zkClient)//每次选举更新epoch
//注册对/admin/reassign_partitions节点的监听处理程序,由PartitionsReassignedListener实现. 用于监听partition的重新分配的动作.主要用于监听节点的内容修改
3,registerReassignedPartitionsListener()
//注册对/isr_change_notification节点的监听处理程序,这个节点主要用于通知partitoin的isr的变化, 由IsrChangeNotificationListener实现.主要用于监听节点的内容修改,
registerIsrChangeNotificationListener()
//注册对/admin/preferred_replica_election节点的监听处理程序,这个节点用于对副本的首选节点进行处理,由PreferredReplicaElectionListener实现.主要用于监听节点的内容修改.
registerPreferredReplicaElectionListener()
//在partitionStateMachine中对/brokers/topics节点注册监听处理程序,用于监听topic的修改,由TopicChangeListener实现.主要用于监听这个节点的子节点的修改. 如果配置有deletetopic的启用时,通过配置delete.topic.enable,默认为false. 如果这个值配置为true时,对/admin/delete_topics节点注册一个DeleteTopicsListener监听处理程序, 用于监听这个节点下的子节点的修改.
partitionStateMachine.registerListeners()
//对/brokers/ids节点注册一个BrokerChangeListener监听处理程序,用于监听这个节点的子节点的修改,主要用于监听broker的的改变.
replicaStateMachine.registerListeners()
//初始化controller的上下文.
initializeControllerContext()
//启动对broker状态的监听与partition的状态监听的实例.
replicaStateMachine.startup()
partitionStateMachine.startup()
//根据现在kafka中所有的topic,对/brokers/topics/topicname节点注册一个AddPartitionsListener监听处理程序,用于监听这个topic的修改.
controllerContext.allTopics.foreach(topic =>partitionStateMachine.registerPartitionChangeListener(topic))
/*
这里对未完成partition的副本重新分配的partitionsBeingReassigned集合进行迭代,执行如下的流程处理:
1,根据准备重新分配的partition的副本所在的节点集合,检查当前liveBrokers中是否都存在这些节点,如果要重新分配的节点集合中有在liveBrokers中不包含的节点,表示要分配的副本所在节点有没有启动的节点,throw exception,
2,根据需要重新分配的partition从partitionReplicaAssignment集合中找到对应的partition的信息,这个集合中存储了已经分配的partition的副本信息,如果在已经分配的partition的集合中找不到这个partition,throw exception.
3,如果准备重新分配的副本节点集合与现在partitionReplicaAssignment集合中parition对应的副本节点集合是相同的内容,表示重新分配是没有必要的,throw exception.
4,这种情况表示重新分配的副本节点集合对应的节点都已经启动,同时这个集合与现在此partition对应的分配副本节点集合不相同,执行如下的子流程:
4,1,在/brokers/topics/tpname/partitions/pid/state路径上生成注册一个用于监听isr的变化的ReassignedPartitionsIsrChangeListener监听程序.
4,2,在deleteTopicManager中检查这个topic是否是需要删除的topic,如果是,添加到准备删除的topic的集合中.
4,3,执行对partition中副本的重新分配,通过onPartitionReassignment函数.
*/
maybeTriggerPartitionReassignment()
/*
这里对还未完成首选副本分配的partition进行首选副本分配的操作,这些未分配首选副本存储在partitionsUndergoingPreferredReplicaElection集合中.
1,首先检查对应的partitions的topic是否是已经被删除的topic,如果包含有要删除的topic时,把对应的partitions集合添加到待删除的topic partitions的集合中.
2,通过partitionStateMachine实例修改要进行首选副本分配的所有的partitions的状态为OnlinePartition.并通过preferredReplicaPartitionLeaderSelector实例进行partition的首选副本的选择操作,通过读取/brokers/topics/topicname/partitions/pid/state路径的isr的信息,如果这个路径还不存在时,根据当前partition的所有活着的副本集合,取第一个副本为leader,并把这个副本集合存储到这个路径中,根据读取这个路径的信息,通过leaderSelector来进行首选副本的分配.
3,更新partitionLeadershipInfo集合的内容,把这个partition对应的isr存储到这个集合中,并向对应的broker节点发送LeaderAndIsrRequest请求.
4,移出partitionsUndergoingPreferredReplicaElection集合中的内容,
并删除/admin/preferred_replica_election节点的数据.
*/
// 并删除/admin/preferred_replica_election节点的数据.
maybeTriggerPreferredReplicaElection()
//向所有的broker的节点发送全部topic的metadata更新的UpdateMetadataRequest请求.
/* send partition leadership info to all live brokers */
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
//如果auto.leader.rebalance.enable配置为true,默认值也是true,
// 根据leader.imbalance.check.interval.seconds配置的间隔时间,对partition进行balance操作.默认配置为300秒.定时执行的调度函数为
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
//启动删除topic的管理组件TopicDeletionManager,这个实例中生成一个DeleteTopicsThread线程,
// 前提是delete.topic.enable配置值为true.否则这个实例什么都不做.
deleteTopicManager.start();
onControllerResignation
如果从leader变为非leader 调用onControllerResignation
deregisterIsrChangeNotificationListener()//取消对/isr_change_notification节点的监听程序IsrChangeNotificationListener实例.
deregisterReassignedPartitionsListener()//取消对/admin/reassign_partitions节点的监听程序PartitionsReassignedListener实例.
deregisterPreferredReplicaElectionListener()//取消对/admin/preferred_replica_election节点的监听程序PreferredReplicaElectionListener实例.
// shutdown delete topic manager
if (deleteTopicManager !=null)//停止topicdelete的管理组件与各个broker进行通信的管理组件.
deleteTopicManager.shutdown()
// shutdown leader rebalance scheduler
if (config.autoLeaderRebalanceEnable)//关闭自动balance partitions的自动调度处理程序.
autoRebalanceScheduler.shutdown()
inLock(controllerContext.controllerLock) {
// de-register partition ISR listener for on-going partition reassignment task
deregisterReassignedPartitionsIsrChangeListeners()//取消对partitions/state节点的监听程序ReassignedPartitionsIsrChangeListener实例
// shutdown partition state machine
partitionStateMachine.shutdown()//关闭partition的状态控制器partitionStateMachine与replica的状态控制器replicaStateMachine实例.
// shutdown replica state machine
replicaStateMachine.shutdown()
// shutdown controller channel manager
if(controllerContext.controllerChannelManager !=null) {
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager =null
}
// reset controller context
controllerContext.epoch=0
controllerContext.epochZkVersion=0
brokerState.newState(RunningAsBroker)//,设置当前的broker的状态为RunningAsBroker.
相当于把原来注册的取消一遍
网友评论