美文网首页
11、详解Controller选举实现原理

11、详解Controller选举实现原理

作者: 技术灭霸 | 来源:发表于2020-07-26 22:42 被阅读0次

    Controller 选举,是指 Kafka 选择集群中一台 Broker 行使 Controller 职责。整个选举过 程分为两个步骤:触发选举和开始选举。

    触发选举

    我先用一张图展示下可能触发 Controller 选举的三个场景。


    1. 集群从零启动时;
    2. Broker 侦测 /controller 节点消失时;
    3. Broker 侦测到 /controller 节点数据发生变更时。

    场景一:集群从零启动

    集群首次启动时,Controller 尚未被选举出来。于是,Broker 启动后,首先将 Startup 这 个 ControllerEvent 写入到事件队列中,然后启动对应的事件处理线程和 ControllerChangeHandler ZooKeeper 监听器,最后依赖事件处理线程进行 Controller 的选举。

    在源码中,KafkaController 类的 startup 方法就是做这些事情的。当Broker 启动时,它 会调用这个方法启动 ControllerEventThread 线程。值得注意的是,每个 Broker 都需要 做这些事情,不是说只有 Controller 所在的 Broker 才需要执行这些逻辑。

    startup 方法的主体代码如下:

      def startup() = {
        // 第1步:注册Zookeeper状态变更监听器,它是用于监听Zookeeper会话过期
        zkClient.registerStateChangeHandler(new StateChangeHandler {
          override val name: String = StateChangeHandlers.ControllerHandler
          override def afterInitializingSession(): Unit = {
            eventManager.put(RegisterBrokerAndReelect)
          }
          override def beforeInitializingSession(): Unit = {
            val queuedEvent = eventManager.clearAndPut(Expire)
            queuedEvent.awaitProcessing()
          }
        })
        // 第2步:写入Startup事件到事件队列
        eventManager.put(Startup)
        // 第3步:启动ControllerEventThread线程,开始处理事件队列中的ControllerEvent
        eventManager.start()
      }
    

    首先,startup 方法会注册 ZooKeeper 状态变更监听器,用于监听 Broker 与 ZooKeeper 之间的会话是否过期。接着,写入 Startup 事件到事件队列,然后启动 ControllerEventThread 线程,开始处理事件队列中的 Startup 事件。

    接下来,我们来学习下 KafkaController 的 process 方法处理 Startup 事件的方法:

      override def process(event: ControllerEvent): Unit = {
        try {
          event match {
            case Startup =>
              processStartup()
          }
      }
    
     private def processStartup(): Unit = {
        // 注册ControllerChangeHandler ZooKeeper监听器
     zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    // 执行Controller选举
        elect()
      }
    

    这三种场景都要选举 Controller,因此,我们最后统一学习 elect 方法的代码实现。

    总体来说,集群启动时,Broker 通过向事件队列“塞入”Startup 事件的方式,来触发 Controller 的竞选。

    场景二:/controller 节点消失

    Broker 检测到 /controller 节点消失时,就意味着,此时整个集群中没有 Controller。因此,所有检测到 /controller 节点消失的 Broker,都会立即调用 elect 方法执行竞选逻辑。

    场景三:/controller 节点数据变更

    Broker 检测到 /controller 节点数据发生变化,通常表明,Controller“易主”了,这就分为两种情况:

    1. 如果 Broker 之前是 Controller,那么该 Broker 需要首先执行卸任操作,然后再尝试竞选;
    2. 如果 Broker 之前不是 Controller,那么,该 Broker 直接去竞选新 Controller。

    卸任逻辑是由 onControllerResignation 方法执行的,它主要是用于清空各种数据结构的值、取消 ZooKeeper 监听器、关闭各种状态机以及管理器,等等。我用注释的方式给出它的逻辑实现:

      private def onControllerResignation(): Unit = {
        debug("Resigning")
        // 取消ZooKeeper监听器的注册
        zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
        zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
        zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
        zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
        unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)
    
        // 关闭Kafka线程调度器,其实就是取消定期的Leader重选举
        kafkaScheduler.shutdown()
        // 将统计字段全部清0
        offlinePartitionCount = 0
        preferredReplicaImbalanceCount = 0
        globalTopicCount = 0
        globalPartitionCount = 0
        topicsToDeleteCount = 0
        replicasToDeleteCount = 0
        ineligibleTopicsToDeleteCount = 0
        ineligibleReplicasToDeleteCount = 0
    
        // 关闭Token过期检查调度器
        if (tokenCleanScheduler.isStarted)
          tokenCleanScheduler.shutdown()
    
        // 取消分区重分配监听器的注册
        unregisterPartitionReassignmentIsrChangeHandlers()
        // 关闭分区状态机
        partitionStateMachine.shutdown()
        // 取消主题变更监听器的注册
        zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
        // 取消分区变更监听器的注册
        unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
        // 取消主题删除监听器的注册
        zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
        // 关闭副本状态机
        replicaStateMachine.shutdown()
        // 取消Broker变更监听器的注册
        zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
        // 关闭Controller通道管理器
        controllerChannelManager.shutdown()
        // 清空集群元数据
        controllerContext.resetContext()
        info("Resigned")
      }
    

    选举 Controller

    这三种选举场景最后都会调用 elect 方法来执行选举逻辑。我们来看下它的实现:

    private def elect(): Unit = {
        // 第1步:获取当前Controller所在Broker的序列号,如果Controller不存在,显式标记为-1
        activeControllerId = zkClient.getControllerId.getOrElse(-1)
        // 第2步:如果当前Controller已经选出来了,直接返回即可
        if (activeControllerId != -1) {
          debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
          return
        }
    
        try {
          // 第2步:注册Controller相关信息
          val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
          // 主要是创建/controller节点
          controllerContext.epoch = epoch
          controllerContext.epochZkVersion = epochZkVersion
          activeControllerId = config.brokerId
    
          info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
            s"and epoch zk version is now ${controllerContext.epochZkVersion}")
          // 第4步:执行当选Controller的后续逻辑
          onControllerFailover()
        } catch {
          case e: ControllerMovedException =>
            maybeResign()
    
            if (activeControllerId != -1)
              debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
            else
              warn("A controller has been elected but just resigned, this will result in another round of election", e)
    
          case t: Throwable =>
            error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
              s"Trigger controller movement immediately", t)
            triggerControllerMove()
        }
      }
    

    该方法首先检查 Controller 是否已经选出来了。要知道,集群中的所有 Broker 都要执行 这些逻辑,因此,非常有可能出现某些 Broker 在执行 elect 方法时,Controller 已经被选 出来的情况。如果 Controller 已经选出来了,那么,自然也就不用再做什么了。相反地, 如果 Controller 尚未被选举出来,那么,代码会尝试创建 /controller 节点去抢注 Controller。

    一旦抢注成功,就调用 onControllerFailover 方法,执行选举成功后的动作。这些动作包 括注册各类 ZooKeeper 监听器、删除日志路径变更和 ISR 副本变更通知事件、启动 Controller 通道管理器,以及启动副本状态机和分区状态机。

    如果抢注失败了,代码会抛出 ControllerMovedException 异常。这通常表明 Controller 已经被其他 Broker 抢先占据了,那么,此时代码调用 maybeResign 方法去执行卸任逻 辑。

    总结

    1、Controller 依赖 ZooKeeper 实现 Controller 选举,主要是借助于 /controller 临时节 点和 ZooKeeper 的监听器机制。
    2、Controller 触发场景有 3 种:集群启动时;/controller 节点被删除时;/controller 节 点数据变更时。
    3、源码最终调用 elect 方法实现 Controller 选举。

    相关文章

      网友评论

          本文标题:11、详解Controller选举实现原理

          本文链接:https://www.haomeiwen.com/subject/ptjgcktx.html