Kafka Server Source Code Analysis: ZooKeeper and the Controller

Author: tracy_668 | Published 2021-01-23 11:16

    [TOC]

    Zookeeper and Cluster Management

    We know that every broker connects to Zookeeper when it starts up. Concretely, what work does Zookeeper do for Kafka?

    The approach to cluster management

    The "birth" and "death" of brokers

    Whenever a new broker joins the cluster, or an existing broker dies, every other machine in the cluster needs to learn about it.

    This is implemented by watching the /brokers/ids node in Zookeeper: each of its children corresponds to one broker machine. When a broker is added, the child list grows; when a broker dies, its ephemeral child disappears and the list shrinks.
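    To make this concrete, here is a small, purely illustrative sketch (not Kafka's actual BrokerChangeListener) that watches the children of /brokers/ids using the I0Itec ZkClient that Kafka itself uses (introduced in the next section). The Zookeeper address and timeouts are placeholder values.

    import java.util.{List => JList}
    import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

    object BrokerLivenessWatch extends App {
      // Placeholder connection settings; a real broker takes these from its config.
      val zkClient = new ZkClient("localhost:2181", 6000, 6000)

      zkClient.subscribeChildChanges("/brokers/ids", new IZkChildListener {
        // Fires when a broker registers itself (startup) or when its ephemeral
        // child node disappears (crash or session loss).
        override def handleChildChange(parentPath: String, currentChilds: JList[String]): Unit =
          println(s"live brokers under $parentPath: $currentChilds")
      })

      Thread.sleep(Long.MaxValue) // keep the process alive so callbacks can fire
    }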

    Controller
    To reduce the load on Zookeeper, and also to lower the complexity of the whole distributed system, Kafka introduces a "central controller", i.e. the Controller.

    The basic idea: first elect one Controller among all brokers via Zookeeper, then let this Controller drive all the other brokers, instead of having Zookeeper control every machine directly.

    For example, the watch on /brokers/ids mentioned above is not registered by every broker; only the Controller watches that node. This turns a "distributed" problem into a "centralized" one: it both reduces the load on Zookeeper and makes the control logic easier to write.

    Adding/removing topics and partitions
    Likewise, since this is a distributed cluster, it is not feasible to notify every machine one by one whenever a topic or partition is added or removed.

    The implementation follows the same idea: the admin tooling (Admin/TopicCommand) writes the add/delete command to Zk, the Controller watches Zk and picks up the change, and the Controller then propagates it to the relevant brokers.
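    As a toy illustration of this flow (not the real TopicCommand), creating a topic boils down to writing a replica-assignment record under /brokers/topics/<topic> and letting the controller's watch pick it up. Note that Kafka plugs a string serializer into its ZkClient so the JSON is stored as plain UTF-8; the default serializer used below does not, so treat this only as a sketch of the idea.

    import org.I0Itec.zkclient.ZkClient

    object CreateTopicSketch extends App {
      val zkClient = new ZkClient("localhost:2181", 6000, 6000)

      // partition 0 -> replicas on brokers 1,2 ; partition 1 -> replicas on 2,3
      val assignmentJson = """{"version":1,"partitions":{"0":[1,2],"1":[2,3]}}"""
      zkClient.createPersistent("/brokers/topics/my-new-topic", assignmentJson)

      // The controller's child listener on /brokers/topics notices the new child
      // and fans the change out to the affected brokers.
    }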

    I0Itec ZkClient
    As for the Zookeeper client: the commonly used one is Apache Curator, but Kafka does not use it. It uses another client called I0Itec ZkClient; if I remember correctly, Alibaba's Dubbo framework uses it as well. Compared with Curator it is more lightweight.

    Specifically, it provides three main listener interfaces:

    // invoked on connection state changes and when a session is re-established after expiring
    public interface IZkStateListener {
        public void handleStateChanged(KeeperState state) throws Exception;
    
        public void handleNewSession() throws Exception;
    
        public void handleSessionEstablishmentError(final Throwable error) throws Exception;
    
    }
    
    // invoked after a node's data changes (the data changed, or the node itself was deleted)
    public interface IZkDataListener {
    
        public void handleDataChange(String dataPath, Object data) throws Exception;
    
        public void handleDataDeleted(String dataPath) throws Exception;
    }
    
    // invoked when a node's children change
    public interface IZkChildListener {
        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
    }
    

    Kafka implements all of its Zookeeper-related state watching with these three listeners; their concrete uses will be covered one by one in later posts.
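    As a small usage sketch (again illustrative, not Kafka's actual classes), the other two listener types can be wired up like this: a state listener that reacts to session re-creation, and a data listener on the /controller node, previewing the election logic discussed below. The connection settings are placeholders.

    import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
    import org.apache.zookeeper.Watcher.Event.KeeperState

    object ControllerNodeWatch extends App {
      val zkClient = new ZkClient("localhost:2181", 6000, 6000)

      zkClient.subscribeStateChanges(new IZkStateListener {
        override def handleStateChanged(state: KeeperState): Unit =
          () // zkclient reconnects on its own; nothing to do here
        override def handleNewSession(): Unit =
          println("session expired and was re-created: ephemeral nodes must be re-registered")
        override def handleSessionEstablishmentError(error: Throwable): Unit =
          println(s"failed to establish a session: $error")
      })

      zkClient.subscribeDataChanges("/controller", new IZkDataListener {
        override def handleDataChange(dataPath: String, data: Object): Unit =
          println(s"$dataPath changed, new controller record: $data")
        override def handleDataDeleted(dataPath: String): Unit =
          println(s"$dataPath was deleted: the controller is gone, a new election will start")
      })

      Thread.sleep(Long.MaxValue) // keep the process alive so callbacks can fire
    }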

    The KafkaController Election: Failover and Resignation

    The core components of the Kafka server

    Before diving into the source analysis proper, let's look at the server's core components, starting from its main function:

    //Kafka
      def main(args: Array[String]): Unit = {
        try {
          val serverProps = getPropsFromArgs(args)
          val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    
          // attach shutdown handler to catch control-c
          Runtime.getRuntime().addShutdownHook(new Thread() {
            override def run() = {
              kafkaServerStartable.shutdown   // run by the JVM shutdown hook registered here
            }
          })
    
          kafkaServerStartable.startup  // start the server
          kafkaServerStartable.awaitShutdown
        }
        catch {
          case e: Throwable =>
            fatal(e)
            System.exit(1)
        }
        System.exit(0)
      }
    
    //KafkaServerStartable
    class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
      private val server = new KafkaServer(serverConfig)
    
      def startup() {
        try {
          server.startup()
        }
        catch {
          case e: Throwable =>
            fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
            // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
            System.exit(1)
        }
      }
      ...
    }
    
    //KafkaServer
      def startup() {
        try {
          info("starting")
    
          if(isShuttingDown.get)
            throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
    
          if(startupComplete.get)
            return
    
          val canStartup = isStartingUp.compareAndSet(false, true)
          if (canStartup) {
            metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
    
            brokerState.newState(Starting)
    
            // core component 0: the KafkaScheduler (background task scheduling)
            kafkaScheduler.startup()
    
            zkUtils = initZk()
    
            logManager = createLogManager(zkUtils.zkClient, brokerState)
            logManager.startup()
    
            config.brokerId =  getBrokerId
            this.logIdent = "[Kafka Server " + config.brokerId + "], "
            // core component 1: the SocketServer (network layer)
            socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
            socketServer.startup()
    
            // core component 2: the ReplicaManager
            replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
              isShuttingDown)
            replicaManager.startup()
    
            // core component 3: the KafkaController
            kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
            kafkaController.startup()
    
            // core component 4: the GroupCoordinator (consumer coordinator)
            consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
            consumerCoordinator.startup()
    
            /* Get the authorizer and initialize it if one is specified.*/
            authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
              val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
              authZ.configure(config.originals())
              authZ
            }
    
            // core component 5: KafkaApis (request processing)
            apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
              kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
            requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
            brokerState.newState(RunningAsBroker)
    
            Mx4jLoader.maybeLoad()
    
            /* start dynamic config manager */
            dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
                                                               ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
    
            // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
            // TODO: Move this logic to DynamicConfigManager
            AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
              case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
            }
    
            // Create the config manager. start listening to notifications
            dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
            dynamicConfigManager.startup()
    
            /* tell everyone we are alive */
            val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
              if (endpoint.port == 0)
                (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
              else
                (protocol, endpoint)
            }
            kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
            kafkaHealthcheck.startup()
    
            /* register broker metrics */
            registerStats()
    
            shutdownLatch = new CountDownLatch(1)
            startupComplete.set(true)
            isStartingUp.set(false)
            AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
            info("started")
          }
        }
        catch {
          case e: Throwable =>
            fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
            isStartingUp.set(false)
            shutdown()
            throw e
        }
      }
    
    

    Besides the code, let's also look at the shell scripts that start and stop the server:

    bin/kafka-server-start.sh
    # runs the broker in the background via nohup; the script details are not listed here

    # bin/kafka-server-stop.sh: as you can see, shutting down just means sending SIGTERM with kill;
    # the JVM receives the signal and runs the shutdown hook registered above
    ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM   
    
    

    From the server startup function we can see the following core components:

    1. SocketServer + KafkaApis: the former accepts all network requests, the latter processes them
    2. KafkaController: responsible for the controller election
    3. ConsumerCoordinator (GroupCoordinator): load balancing within consumer groups
    4. ReplicaManager: manages the replicas on this broker
    5. KafkaScheduler: background task scheduling

    This post focuses on KafkaController; the other core components will each be covered later.

    The basic principle of the election

    The whole election is implemented through one ephemeral node in zk, /controller; its data essentially records the brokerId of the current controller:

    "version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp
    
    

    When the controller dies, all other brokers observe that the ephemeral node has disappeared and race to recreate it; whoever succeeds becomes the new Controller.

    Besides /controller there is an auxiliary node, /controller_epoch, which records the current controller's epoch (generation number).
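    The race can be illustrated with a simplified sketch. The real logic lives in ZookeeperLeaderElector.elect (shown later) and uses Kafka's Json helper plus createEphemeralPathExpectConflictHandleZKBug; here the JSON payload is assembled by hand and only the happy path plus the "node already exists" case are handled.

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkNodeExistsException

    object ControllerElectionSketch {
      def tryBecomeController(zkClient: ZkClient, brokerId: Int): Boolean = {
        val electString =
          s"""{"version":1,"brokerid":$brokerId,"timestamp":"${System.currentTimeMillis}"}"""
        try {
          // At most one of the concurrent creators of /controller succeeds.
          zkClient.createEphemeral("/controller", electString)
          println(s"broker $brokerId won the controller election")
          true
        } catch {
          case _: ZkNodeExistsException =>
            // Somebody else won; read the node to learn who the controller is.
            val current: String = zkClient.readData("/controller", true)
            println(s"lost the election, current controller record: $current")
            false
        }
      }
    }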

    KafkaController and ZookeeperLeaderElector

    The whole election is implemented by these two classes; ZookeeperLeaderElector is a member variable of KafkaController:

    // a member variable of KafkaController
      private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
        onControllerResignation, config.brokerId)
    
    

    The figure below shows the whole interaction during an election:
    (1) KafkaController and ZookeeperLeaderElector each hold one listener: one watches session re-establishment, the other watches the /controller node.
    (2) When the session is re-established, or the /controller node is deleted, elect() is called to start a new election. Before re-electing, the broker first checks whether it was the previous Controller; if so, it calls onResignation to step down first.


    (Figure: election interaction between KafkaController, ZookeeperLeaderElector, and Zookeeper)

    Now let's start from KafkaController's startup function:

      def startup() = {
        inLock(controllerContext.controllerLock) {
          info("Controller starting up")
          registerSessionExpirationListener()   // listener type 1: SessionExpirationListener
          isRunning = true
          controllerElector.startup   // listener type 2: LeaderChangeListener
          info("Controller startup complete")
        }
      }
    
      class SessionExpirationListener() extends IZkStateListener with Logging {
        ...
        @throws(classOf[Exception])
        def handleNewSession() {
          info("ZK expired; shut down all controller components and try to re-elect")
          inLock(controllerContext.controllerLock) {
            onControllerResignation()   // step down first
            controllerElector.elect   // then trigger a new election
          }
        }
        ...
      }
      
      class LeaderChangeListener extends IZkDataListener with Logging {
    
        @throws(classOf[Exception])
        def handleDataChange(dataPath: String, data: Object) {
          inLock(controllerContext.controllerLock) {
            val amILeaderBeforeDataChange = amILeader
            leaderId = KafkaController.parseControllerId(data.toString)
    
            if (amILeaderBeforeDataChange && !amILeader)
              onResigningAsLeader()  // this broker used to be the controller but no longer is: step down
          }
        }
    
    
        @throws(classOf[Exception])
        def handleDataDeleted(dataPath: String) {
          inLock(controllerContext.controllerLock) {
            debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
              .format(brokerId, dataPath))
            if(amILeader)
              onResigningAsLeader()  // key point: the controller node may be gone not because this broker died, but because its Zookeeper session dropped while the process is still alive; in that case, resign first
            elect   // then trigger a new election
          }
        }
      }
    
    

    Two key callbacks: Failover (taking office) and Resignation (stepping down)
    The election process above involves two key callbacks: what a newly elected Controller does when it takes office, and what the old Controller does when it steps down.

    "Taking office" is easy to understand: a broker has just been elected Controller. But why is there a "stepping down" at all?

    Because zk judges whether the controller is alive by heartbeats, the controller process may still be running while zk considers it dead and a new controller gets elected. The old controller then discovers it is stale and must step down voluntarily.

    Let's see what happens when the new controller takes office and when the old one steps down:

      def onControllerFailover() {
        if(isRunning) {
          readControllerEpochFromZookeeper()
          // increment the controller epoch
          incrementControllerEpoch(zkUtils.zkClient) 
          
          // key point: take over all the watches on broker/partition nodes
          registerReassignedPartitionsListener()
          registerIsrChangeNotificationListener()
          registerPreferredReplicaElectionListener()
          partitionStateMachine.registerListeners()
          replicaStateMachine.registerListeners()
          initializeControllerContext()
          replicaStateMachine.startup()
          partitionStateMachine.startup()
         
          // register the partition change listeners for all existing topics on failover
          controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
          info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
          brokerState.newState(RunningAsController)
          maybeTriggerPartitionReassignment()
          maybeTriggerPreferredReplicaElection()
          /* send partition leadership info to all live brokers */
          sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
          if (config.autoLeaderRebalanceEnable) {
            info("starting the partition rebalance scheduler")
            autoRebalanceScheduler.startup()
            autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
              5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
          }
          deleteTopicManager.start()
        }
        else
          info("Controller has been shut down, aborting startup/failover")
      }
    
      def onControllerResignation() {
        // key point: give up all the watches on brokers/partitions
        deregisterIsrChangeNotificationListener()
        deregisterReassignedPartitionsListener()
        deregisterPreferredReplicaElectionListener()
    
        // shutdown delete topic manager
        if (deleteTopicManager != null)
          deleteTopicManager.shutdown()
    
        // shutdown leader rebalance scheduler
        if (config.autoLeaderRebalanceEnable)
          autoRebalanceScheduler.shutdown()
    
        inLock(controllerContext.controllerLock) {
          // de-register partition ISR listener for on-going partition reassignment task
          deregisterReassignedPartitionsIsrChangeListeners()
          // shutdown partition state machine
          partitionStateMachine.shutdown()
          // shutdown replica state machine
          replicaStateMachine.shutdown()
          // shutdown controller channel manager
          if(controllerContext.controllerChannelManager != null) {
            controllerContext.controllerChannelManager.shutdown()
            controllerContext.controllerChannelManager = null
          }
          // reset controller context
          controllerContext.epoch=0
          controllerContext.epochZkVersion=0
          brokerState.newState(RunningAsBroker)
    
        }
      }
    
    

    How KafkaController becomes the leader

    KafkaController holds a ZookeeperLeaderElector, which uses zk to determine whether this broker is the leader:

    class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
    ……
    private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
      onControllerResignation, config.brokerId)
    
    /**
     * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
     * is the controller. It merely registers the session expiration listener and starts the controller leader
     * elector
     */
    def startup() = {
      inLock(controllerContext.controllerLock) {
        info("Controller starting up");
        registerSessionExpirationListener() // register a session-expiration listener
        isRunning = true
        controllerElector.startup // start the controllerElector
        info("Controller startup complete")
      }
    }
    }
    

    The election path in zk is /controller, and a session-expiration listener is registered on the zk connection:

    class SessionExpirationListener() extends IZkStateListener with Logging {
      this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
      @throws(classOf[Exception])
      def handleStateChanged(state: KeeperState) {
        // do nothing, since zkclient will do reconnect for us.
      }
      /**
       * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
       * any ephemeral nodes here.
       *
       * @throws Exception
       *             On any error.
       */
      @throws(classOf[Exception])
      def handleNewSession() {
        info("ZK expired; shut down all controller components and try to re-elect")
        inLock(controllerContext.controllerLock) {
          onControllerResignation() // when the session has expired and been re-established, first call the onControllerResignation that was registered with ZookeeperLeaderElector
          controllerElector.elect // then re-elect
        }
      }
    }
    
    

    So the key logic lives in ZookeeperLeaderElector:

    class ZookeeperLeaderElector(controllerContext: ControllerContext,
                                 electionPath: String,
                                 onBecomingLeader: () => Unit,
                                 onResigningAsLeader: () => Unit,
                                 brokerId: Int)
      extends LeaderElector with Logging {
      var leaderId = -1
      // create the election path in ZK, if one does not exist
      val index = electionPath.lastIndexOf("/")
      if (index > 0)
        makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index))
      val leaderChangeListener = new LeaderChangeListener
    
      def startup {
        inLock(controllerContext.controllerLock) { // the election path is /controller
          controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
          elect // trigger the election
        }
      }
    
      private def getControllerID(): Int = {
        readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
           case Some(controller) => KafkaController.parseControllerId(controller)
           case None => -1
        }
      }
        
      def elect: Boolean = {
        val timestamp = SystemTime.milliseconds.toString
        val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
       
       leaderId = getControllerID 
        /* 
         * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, 
         * it's possible that the controller has already been elected when we get here. This check will prevent the following 
         * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
         */
        if(leaderId != -1) {
           debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
           return amILeader
        }
    
        try { // the election works by creating an ephemeral node: if several clients race to create a node at the same path, exactly one of them succeeds and the others fail; once the winner's connection to zk is gone the node disappears, and the remaining clients compete again
          createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
            (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
            controllerContext.zkSessionTimeout)
          info(brokerId + " successfully elected as leader")
          leaderId = brokerId
          onBecomingLeader() // creation succeeded: this broker becomes the leader
        } catch {
          case e: ZkNodeExistsException =>
            // If someone else has written the path, then
            leaderId = getControllerID 
    
            if (leaderId != -1)
              debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
            else
              warn("A leader has been elected but just resigned, this will result in another round of election")
    
          case e2: Throwable =>
            error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
            resign() // on an unexpected error, resign (delete the path)
        }
        amILeader
      }
    
      def close = {
        leaderId = -1
      }
    
      def amILeader : Boolean = leaderId == brokerId
    
      def resign() = {
        leaderId = -1
        deletePath(controllerContext.zkClient, electionPath)
      }
    
      /**
       * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
       * have its own session expiration listener and handler
       */
      class LeaderChangeListener extends IZkDataListener with Logging {
        /**
         * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
         * @throws Exception On any error.
         */
        @throws(classOf[Exception])
        def handleDataChange(dataPath: String, data: Object) {
          inLock(controllerContext.controllerLock) {
            leaderId = KafkaController.parseControllerId(data.toString)
            info("New leader is %d".format(leaderId))
          }
        }
    
        /**
         * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
         * @throws Exception
         *             On any error.
         */
        @throws(classOf[Exception])
        def handleDataDeleted(dataPath: String) { // a KafkaController that did not win the election at first startup re-triggers the election when it notices the node has disappeared
          inLock(controllerContext.controllerLock) {
            debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
              .format(brokerId, dataPath))
            if(amILeader) // this broker may have been the leader before; re-election will not necessarily make it leader again, so it must clear all of its previously cached state first
              onResigningAsLeader()
            elect // trigger the election
          }
        }
      }
    }
    

    A KafkaController therefore becomes leader in one of two ways:

    1. On its first startup a broker actively triggers elect; if it is elected leader, it starts doing the leader's work
    2. If it loses that first election, its LeaderChangeListener keeps watching the /controller path; when the data there is deleted, handleDataDeleted fires and the election is triggered again

    Initialization of the KafkaController (as the leader)

    As the previous section showed, a KafkaController that wins the election calls onBecomingLeader, and when the election fires again the previous leader calls onResigningAsLeader; these two callbacks correspond to onControllerFailover and onControllerResignation respectively.

    onControllerResignation is simple: it shuts down or deregisters all of the controller's modules:

    def onControllerResignation() {
      // de-register listeners
      deregisterReassignedPartitionsListener()
      deregisterPreferredReplicaElectionListener()
      // shutdown delete topic manager
      if (deleteTopicManager != null)
        deleteTopicManager.shutdown()
      // shutdown leader rebalance scheduler
      if (config.autoLeaderRebalanceEnable)
        autoRebalanceScheduler.shutdown()
      inLock(controllerContext.controllerLock) {
        // de-register partition ISR listener for on-going partition reassignment task
        deregisterReassignedPartitionsIsrChangeListeners()
        // shutdown partition state machine
        partitionStateMachine.shutdown()
        // shutdown replica state machine
        replicaStateMachine.shutdown()
        // shutdown controller channel manager
        if(controllerContext.controllerChannelManager != null) {
          controllerContext.controllerChannelManager.shutdown()
          controllerContext.controllerChannelManager = null
        }
        // reset controller context
        controllerContext.epoch=0
        controllerContext.epochZkVersion=0
        brokerState.newState(RunningAsBroker)
      }
    }
    

    The various modules above are introduced along with onControllerFailover, which essentially switches all of them on.

    The logic of onControllerFailover is as follows:

     def onControllerFailover() {
        if(isRunning) {
          info("Broker %d starting become controller state transition".format(config.brokerId))
          readControllerEpochFromZookeeper()
    // the election epoch clock, incremented by 1 on each successful election
          incrementControllerEpoch(zkClient)
    /* leader initialization; the concrete steps are described below */
          registerReassignedPartitionsListener()
          registerPreferredReplicaElectionListener()
          partitionStateMachine.registerListeners()
          replicaStateMachine.registerListeners()
          initializeControllerContext()
          replicaStateMachine.startup()
          partitionStateMachine.startup()
          controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
          info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
          brokerState.newState(RunningAsController)
          maybeTriggerPartitionReassignment()
          maybeTriggerPreferredReplicaElection()
          sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
          if (config.autoLeaderRebalanceEnable) {
            info("starting the partition rebalance scheduler")
            autoRebalanceScheduler.startup()
            autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
              5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
          }
          deleteTopicManager.start()
        }
        else
          info("Controller has been shut down, aborting startup/failover")
      }
    
    

    The steps are as follows (a minimal sketch of the listener registrations in steps 1-5 follows the list):

    1) Register partitionReassignedListener on /admin/reassign_partitions

    2) Register preferredReplicaElectionListener on /admin/preferred_replica_election

    3) Register topicChangeListener on /brokers/topics

    4) Register deleteTopicsListener on /admin/delete_topics

    5) Register brokerChangeListener on /brokers/ids

    6) Initialize the ControllerContext, which holds all of the topic metadata; its ControllerChannelManager is responsible for opening channels to the other Kafka servers in the cluster, and the TopicDeletionManager is responsible for deleting topics

    7) Initialize the state of every replica via replicaStateMachine

    8) Initialize the state of every partition via partitionStateMachine

    9) Register an AddPartitionsListener under /brokers/topics/<topic name>/ for every existing topic

    10) Handle any partition reassignment left over from before the failover

    11) Handle any preferred-replica election left over from before the failover

    12) Send the cluster's topic metadata to the other Kafka servers so they can update their caches

    13) Depending on the configuration, start the automatic leader-rebalance scheduler

    14) Start topic deletion
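    The register* helpers in steps 1 through 5 essentially attach data/child listeners to a handful of well-known zk paths. Below is a minimal, illustrative sketch of that wiring: the path constants mirror ZkUtils, but the listener bodies here only log, whereas the real listeners (TopicChangeListener, BrokerChangeListener, and so on) drive the state machines described above.

    import java.util.{List => JList}
    import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, ZkClient}

    object ControllerWatchSketch {
      val ReassignPartitionsPath       = "/admin/reassign_partitions"
      val PreferredReplicaElectionPath = "/admin/preferred_replica_election"
      val BrokerTopicsPath             = "/brokers/topics"
      val DeleteTopicsPath             = "/admin/delete_topics"
      val BrokerIdsPath                = "/brokers/ids"

      def registerAll(zkClient: ZkClient): Unit = {
        val logData = new IZkDataListener {
          override def handleDataChange(path: String, data: Object): Unit =
            println(s"data changed at $path: $data")
          override def handleDataDeleted(path: String): Unit =
            println(s"data deleted at $path")
        }
        // steps 1-2: data watches on the two admin "command" nodes
        zkClient.subscribeDataChanges(ReassignPartitionsPath, logData)
        zkClient.subscribeDataChanges(PreferredReplicaElectionPath, logData)

        val logChildren = new IZkChildListener {
          override def handleChildChange(parent: String, children: JList[String]): Unit =
            println(s"children of $parent are now $children")
        }
        // steps 3-5: child watches for topic creation/deletion and broker join/leave
        zkClient.subscribeChildChanges(BrokerTopicsPath, logChildren)
        zkClient.subscribeChildChanges(DeleteTopicsPath, logChildren)
        zkClient.subscribeChildChanges(BrokerIdsPath, logChildren)
      }
    }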

    KafkaController manages the Kafka cluster's metadata mainly through the listeners above. The next posts will first describe how PartitionStateMachine and ReplicaStateMachine work, since the state and contents of topic partitions are driven by these two managers, and will then go through the role of each listener following the flow above.
