美文网首页
KafkaController

KafkaController

作者: 是我_7b3f | 来源:发表于2018-05-21 16:08 被阅读0次

       KafkaController是管理leader的地方,在kafka controller启动时会调用它。主要就是在zookeeper的/controller选举路径上注册一个leader变更监听器,然后调用elect方法开启选举,

elect方法

      先读取ZK /controller路径下是否已经有注册,如果有则读出,看是不是自己的brokerId,如果返回true,否则返回false (这里定义的是一个Boolean型变量amILeader)。

      否则说明controller还没有选举,尝试在在zookeeper上创建临时节点。如果创建成功表明broker被选举为leader,调用onBecomeLeader;若节点以存在,再次获取controller ID,若不是-1说明已有其他broker被选举为leader;若是的话表明虽有leader被选出但其放弃了leader角色,还需要开启下一轮的leader选举.如果当前被选举为leader则:

onControllerFailover

//从/controller_epoch路径上读取controller的epoch的值,这个值用于控制controller的切换. 把读取到的值存储到controllerContext中的epoch与epochZkVersion属性中.

1,readControllerEpochFromZookeeper()

//更新controllerContext中的epoch的值(加1),并持久化到/controller_epoch路径上.

2,incrementControllerEpoch(zkUtils.zkClient)//每次选举更新epoch

//注册对/admin/reassign_partitions节点的监听处理程序,由PartitionsReassignedListener实现. 用于监听partition的重新分配的动作.主要用于监听节点的内容修改

3,registerReassignedPartitionsListener()

//注册对/isr_change_notification节点的监听处理程序,这个节点主要用于通知partitoin的isr的变化, 由IsrChangeNotificationListener实现.主要用于监听节点的内容修改,

registerIsrChangeNotificationListener()

//注册对/admin/preferred_replica_election节点的监听处理程序,这个节点用于对副本的首选节点进行处理,由PreferredReplicaElectionListener实现.主要用于监听节点的内容修改.

registerPreferredReplicaElectionListener()

//在partitionStateMachine中对/brokers/topics节点注册监听处理程序,用于监听topic的修改,由TopicChangeListener实现.主要用于监听这个节点的子节点的修改.  如果配置有deletetopic的启用时,通过配置delete.topic.enable,默认为false. 如果这个值配置为true时,对/admin/delete_topics节点注册一个DeleteTopicsListener监听处理程序, 用于监听这个节点下的子节点的修改.

  partitionStateMachine.registerListeners()

//对/brokers/ids节点注册一个BrokerChangeListener监听处理程序,用于监听这个节点的子节点的修改,主要用于监听broker的的改变.

replicaStateMachine.registerListeners()

//初始化controller的上下文.

initializeControllerContext()

//启动对broker状态的监听与partition的状态监听的实例.

replicaStateMachine.startup()

partitionStateMachine.startup()

//根据现在kafka中所有的topic,对/brokers/topics/topicname节点注册一个AddPartitionsListener监听处理程序,用于监听这个topic的修改.

controllerContext.allTopics.foreach(topic =>partitionStateMachine.registerPartitionChangeListener(topic))

/*

这里对未完成partition的副本重新分配的partitionsBeingReassigned集合进行迭代,执行如下的流程处理:

1,根据准备重新分配的partition的副本所在的节点集合,检查当前liveBrokers中是否都存在这些节点,如果要重新分配的节点集合中有在liveBrokers中不包含的节点,表示要分配的副本所在节点有没有启动的节点,throw exception,

2,根据需要重新分配的partition从partitionReplicaAssignment集合中找到对应的partition的信息,这个集合中存储了已经分配的partition的副本信息,如果在已经分配的partition的集合中找不到这个partition,throw exception.

3,如果准备重新分配的副本节点集合与现在partitionReplicaAssignment集合中parition对应的副本节点集合是相同的内容,表示重新分配是没有必要的,throw exception.

4,这种情况表示重新分配的副本节点集合对应的节点都已经启动,同时这个集合与现在此partition对应的分配副本节点集合不相同,执行如下的子流程:

4,1,在/brokers/topics/tpname/partitions/pid/state路径上生成注册一个用于监听isr的变化的ReassignedPartitionsIsrChangeListener监听程序.

4,2,在deleteTopicManager中检查这个topic是否是需要删除的topic,如果是,添加到准备删除的topic的集合中.

4,3,执行对partition中副本的重新分配,通过onPartitionReassignment函数.

*/

      maybeTriggerPartitionReassignment()

/*

这里对还未完成首选副本分配的partition进行首选副本分配的操作,这些未分配首选副本存储在partitionsUndergoingPreferredReplicaElection集合中.

1,首先检查对应的partitions的topic是否是已经被删除的topic,如果包含有要删除的topic时,把对应的partitions集合添加到待删除的topic partitions的集合中.

2,通过partitionStateMachine实例修改要进行首选副本分配的所有的partitions的状态为OnlinePartition.并通过preferredReplicaPartitionLeaderSelector实例进行partition的首选副本的选择操作,通过读取/brokers/topics/topicname/partitions/pid/state路径的isr的信息,如果这个路径还不存在时,根据当前partition的所有活着的副本集合,取第一个副本为leader,并把这个副本集合存储到这个路径中,根据读取这个路径的信息,通过leaderSelector来进行首选副本的分配.

3,更新partitionLeadershipInfo集合的内容,把这个partition对应的isr存储到这个集合中,并向对应的broker节点发送LeaderAndIsrRequest请求.

4,移出partitionsUndergoingPreferredReplicaElection集合中的内容,

并删除/admin/preferred_replica_election节点的数据.

*/

// 并删除/admin/preferred_replica_election节点的数据.

      maybeTriggerPreferredReplicaElection()

//向所有的broker的节点发送全部topic的metadata更新的UpdateMetadataRequest请求.

/* send partition leadership info to all live brokers */

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

//如果auto.leader.rebalance.enable配置为true,默认值也是true,

// 根据leader.imbalance.check.interval.seconds配置的间隔时间,对partition进行balance操作.默认配置为300秒.定时执行的调度函数为

if (config.autoLeaderRebalanceEnable) {

info("starting the partition rebalance scheduler")

autoRebalanceScheduler.startup()

autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,

5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)

}

//启动删除topic的管理组件TopicDeletionManager,这个实例中生成一个DeleteTopicsThread线程,

// 前提是delete.topic.enable配置值为true.否则这个实例什么都不做.

deleteTopicManager.start();

onControllerResignation

如果从leader变为非leader 调用onControllerResignation

deregisterIsrChangeNotificationListener()//取消对/isr_change_notification节点的监听程序IsrChangeNotificationListener实例.

deregisterReassignedPartitionsListener()//取消对/admin/reassign_partitions节点的监听程序PartitionsReassignedListener实例.

deregisterPreferredReplicaElectionListener()//取消对/admin/preferred_replica_election节点的监听程序PreferredReplicaElectionListener实例.

// shutdown delete topic manager

if (deleteTopicManager !=null)//停止topicdelete的管理组件与各个broker进行通信的管理组件.

  deleteTopicManager.shutdown()

// shutdown leader rebalance scheduler

if (config.autoLeaderRebalanceEnable)//关闭自动balance partitions的自动调度处理程序.

  autoRebalanceScheduler.shutdown()

inLock(controllerContext.controllerLock) {

// de-register partition ISR listener for on-going partition reassignment task

  deregisterReassignedPartitionsIsrChangeListeners()//取消对partitions/state节点的监听程序ReassignedPartitionsIsrChangeListener实例

// shutdown partition state machine

  partitionStateMachine.shutdown()//关闭partition的状态控制器partitionStateMachine与replica的状态控制器replicaStateMachine实例.

// shutdown replica state machine

  replicaStateMachine.shutdown()

// shutdown controller channel manager

  if(controllerContext.controllerChannelManager !=null) {

controllerContext.controllerChannelManager.shutdown()

controllerContext.controllerChannelManager =null

  }

// reset controller context

  controllerContext.epoch=0

  controllerContext.epochZkVersion=0

  brokerState.newState(RunningAsBroker)//,设置当前的broker的状态为RunningAsBroker.

相当于把原来注册的取消一遍

相关文章

网友评论

      本文标题:KafkaController

      本文链接:https://www.haomeiwen.com/subject/wyfadftx.html