美文网首页程序员
kafka源码愫读(2)、broker初始化流程分析

kafka源码愫读(2)、broker初始化流程分析

作者: 桥头放牛娃 | 来源:发表于2021-02-02 11:20 被阅读0次

    Kafka 集群由多个 broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。

    1、KafkaServer 类主要字段:

    KafkaServer 为 Kafka 的启动类,其中包含了 Kafka 的所有核心组件,如 KafkaController、GroupCoordinator、ReplicaManager 等。

    /**
     * Entry point of a single Kafka broker instance. Holds references to every
     * server-side component (controller, coordinators, managers, socket server,
     * request handler pools) and wires them together in startup()/shutdown().
     *
     * NOTE(review): this listing mirrors the upstream KafkaServer.scala and only
     * shows the field declarations; methods are elided in this excerpt.
     */
    class KafkaServer(val config: KafkaConfig, //broker configuration (server.properties)
    time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
                      kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //metrics reporters for monitoring
                      ) extends Logging with KafkaMetricsGroup {
      //set once the broker has fully started
      private val startupComplete = new AtomicBoolean(false)
      //set while the broker is executing shutdown
      private val isShuttingDown = new AtomicBoolean(false)
      //set while the broker is executing startup
      private val isStartingUp = new AtomicBoolean(false)
      //blocks the main thread until KafkaServer shuts down
      private var shutdownLatch = new CountDownLatch(1)
    
      private val jmxPrefix: String = "kafka.server"
    
      //logging context (prefixes log lines with the broker id)
      private var logContext: LogContext = null
    
      var metrics: Metrics = null
      //records the current lifecycle state of this broker (see BrokerStates)
      val brokerState: BrokerState = new BrokerState
      //API layer handling data-plane requests
      var dataPlaneRequestProcessor: KafkaApis = null
      //API layer handling control-plane requests
      var controlPlaneRequestProcessor: KafkaApis = null
    
      //authorization (ACL) support
      var authorizer: Option[Authorizer] = None
      //socket server accepting client connections (default listener port 9092)
      var socketServer: SocketServer = null
      //thread pool processing data-plane requests
      var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
      //thread pool processing control-plane requests
      var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
      //log (message storage) management
      var logDirFailureChannel: LogDirFailureChannel = null
      var logManager: LogManager = null
    
      //replica manager (replication / high availability)
      var replicaManager: ReplicaManager = null
      
      //handles topic create/delete and other admin operations
      var adminManager: AdminManager = null
      
      //delegation token manager
      var tokenManager: DelegationTokenManager = null
    
      //dynamic configuration management
      var dynamicConfigHandlers: Map[String, ConfigHandler] = null
      var dynamicConfigManager: DynamicConfigManager = null
      var credentialProvider: CredentialProvider = null
      var tokenCache: DelegationTokenCache = null
      
      //consumer group coordinator
      var groupCoordinator: GroupCoordinator = null
    
      var transactionCoordinator: TransactionCoordinator = null
    
      //cluster controller
      var kafkaController: KafkaController = null
    
      //background task scheduler
      var kafkaScheduler: KafkaScheduler = null
      
      //cache of cluster partition-state metadata
      var metadataCache: MetadataCache = null
      
      //quota managers
      var quotaManagers: QuotaFactory.QuotaManagers = null
    
      //ZooKeeper client configuration
      val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
      private var _zkClient: KafkaZkClient = null
      val correlationId: AtomicInteger = new AtomicInteger(0)
      //per-log-dir checkpoint file persisting broker id / cluster id
      val brokerMetaPropsFile = "meta.properties"
      val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
    
      private var _clusterId: String = null
      private var _brokerTopicStats: BrokerTopicStats = null
    
      def clusterId: String = _clusterId
    
      // Visible for testing
      private[kafka] def zkClient = _zkClient
    
      private[kafka] def brokerTopicStats = _brokerTopicStats
      }
    

    2、服务启动过程

    Kafka 的启动过程,主要是按依赖顺序对当前节点的各个模块进行初始化并依次启动。

    /**
     * Starts the broker: initializes and starts each component in dependency
     * order (ZK client, scheduler, metrics, log manager, socket server, replica
     * manager, controller, coordinators, request handler pools), then flips the
     * broker state to RunningAsBroker.
     *
     * Idempotent: returns immediately if startup already completed; throws
     * IllegalStateException if a shutdown is still in progress. On any failure
     * it logs a fatal error, triggers shutdown() and rethrows.
     */
    def startup(): Unit = {
      try {
        info("starting")
    
        if (isShuttingDown.get)
          throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
    
        if (startupComplete.get)
          return
    
        //CAS guard: only one caller wins the right to perform startup
        val canStartup = isStartingUp.compareAndSet(false, true)
        if (canStartup) {
          //mark broker state as Starting
          brokerState.newState(Starting)
    
          //initialize the ZK connection and create the root znodes
          initZkClient(time)
    
          //get or create the cluster id in ZK; a generated id is a random UUID
          //(mostSigBits + leastSigBits) encoded as base64
          _clusterId = getOrGenerateClusterId(zkClient)
          info(s"Cluster ID = $clusterId")
    
          //load broker metadata and offline log dirs; broker.id comes from
          //server.properties or is generated via ZK (version of /brokers/seqid
          //+ maxReservedBrokerId, default 1000, guaranteeing uniqueness)
          val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
    
          /* check cluster id */
          if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
            throw new InconsistentClusterIdException(
              s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
              s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
    
          /* generate brokerId */
          config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
          logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
          this.logIdent = logContext.logPrefix
    
          // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
          // applied after DynamicConfigManager starts.
          config.dynamicConfig.initialize(zkClient)
    
          //initialize and start the background task scheduler
          kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
          kafkaScheduler.startup()
    
          //create and configure metrics; JMX and Yammer Metrics by default
          val reporters = new util.ArrayList[MetricsReporter]
          reporters.add(new JmxReporter(jmxPrefix))
          val metricConfig = KafkaServer.metricConfig(config)
          metrics = new Metrics(metricConfig, reporters, time, true)
    
          /* register broker metrics */
          _brokerTopicStats = new BrokerTopicStats
          
          //initialize quota managers
          quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
          notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
          //channel used to report/track offline log directories
          logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
    
          //start the log manager; Kafka messages are stored as logs on disk
          logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
          logManager.startup()
    
          metadataCache = new MetadataCache(config.brokerId)
          //token cache for SCRAM authentication; enabled dynamically
          tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
          credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
    
          //create the socket server (default port 9092) to accept client requests;
          //processors are started later, after authorizer futures are ready
          socketServer = new SocketServer(config, metrics, time, credentialProvider)
          socketServer.startup(startupProcessors = false)
    
          //start the replica manager (replication / high availability)
          replicaManager = createReplicaManager(isShuttingDown)
          replicaManager.startup()
          
          //register this broker's info in ZK
          val brokerInfo = createBrokerInfo
          val brokerEpoch = zkClient.registerBroker(brokerInfo)
    
          //checkpoint broker metadata (broker id + cluster id) to meta.properties
          checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
    
          //start the delegation token manager
          tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
          tokenManager.startup()
    
          //start the Kafka controller; only the elected leader drives cluster management
          kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, threadNamePrefix)
          kafkaController.startup()
          
          //admin manager (topic create/delete etc.)
          adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
    
          /* start group coordinator */
          //start the consumer group coordinator
          groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
          groupCoordinator.startup()
    
          //start the transaction coordinator
          transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
          transactionCoordinator.startup()
    
          //ACL: configure and start the authorizer, collecting per-endpoint readiness futures
          authorizer = config.authorizer
          authorizer.foreach(_.configure(config.originals))
          val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
            case Some(authZ) =>
              authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.mapValues(_.toCompletableFuture).toMap
            case None =>
              brokerInfo.broker.endPoints.map { ep => ep.toJava -> CompletableFuture.completedFuture[Void](null) }.toMap
          }
    
          //create the fetch manager (incremental fetch session cache)
          val fetchManager = new FetchManager(Time.SYSTEM,
            new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
              KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
    
          //KafkaApis instance handling data-plane request logic
          dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
            fetchManager, brokerTopicStats, clusterId, time, tokenManager)
            
          //thread pool processing data-plane requests
          dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
            config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
    
          //KafkaApis instance for control-plane requests (only if a control-plane listener is configured)
          socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
            controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
              kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
              fetchManager, brokerTopicStats, clusterId, time, tokenManager)
            
            //single-threaded pool processing control-plane requests
            controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
              1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
          }
    
          Mx4jLoader.maybeLoad()
    
          //register this server's reconfigurable components for dynamic config
          config.dynamicConfig.addReconfigurables(this)
    
          /* start dynamic config manager */
          dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                             ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                             ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                             ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
    
          // Create the config manager. start listening to notifications
          dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
          dynamicConfigManager.startup()
    
          //start the request processor threads (deferred from socketServer.startup above)
          socketServer.startControlPlaneProcessor(authorizerFutures)
          socketServer.startDataPlaneProcessors(authorizerFutures)
          
          //mark the broker as running and publish startup completion
          brokerState.newState(RunningAsBroker)
          shutdownLatch = new CountDownLatch(1)
          startupComplete.set(true)
          isStartingUp.set(false)
          AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics, time.milliseconds())
          info("started")
        }
      }
      catch {
        case e: Throwable =>
          fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
          isStartingUp.set(false)
          shutdown()
          throw e
      }
    }
    

    3、broker节点状态

    broker状态定义:

    /**
     * Lifecycle states a broker moves through; `state` is the byte code
     * reported for the current state. Sealed so matches are exhaustive.
     * NOTE(review): codes 4 and 5 are absent in this excerpt — presumably
     * reserved by upstream history; confirm against the Kafka source.
     */
    sealed trait BrokerStates {
      /** Numeric code identifying this state. */
      def state: Byte
    }

    /** Initial state: the broker is not running. */
    case object NotRunning extends BrokerStates {
      override val state: Byte = 0
    }

    /** The broker is starting up. */
    case object Starting extends BrokerStates {
      override val state: Byte = 1
    }

    /** The broker is recovering from a previous unclean shutdown. */
    case object RecoveringFromUncleanShutdown extends BrokerStates {
      override val state: Byte = 2
    }

    /** Startup finished; the broker is serving requests. */
    case object RunningAsBroker extends BrokerStates {
      override val state: Byte = 3
    }

    /** The broker is waiting for a controlled shutdown to complete. */
    case object PendingControlledShutdown extends BrokerStates {
      override val state: Byte = 6
    }

    /** The broker is executing shutdown. */
    case object BrokerShuttingDown extends BrokerStates {
      override val state: Byte = 7
    }
    
    • NotRunning :初始状态,标识当前 broker 节点未运行。
    • Starting :标识当前 broker 节点正在启动中。
    • RecoveringFromUncleanShutdown :标识当前 broker 节点正在从上次非正常关闭中恢复。
    • RunningAsBroker :标识当前 broker 节点启动成功,可以对外提供服务。
    • PendingControlledShutdown :标识当前 broker 节点正在等待 controlled shutdown 操作完成。
    • BrokerShuttingDown :标识当前 broker 节点正在执行 shutdown 操作。

    节点状态转换图:

    节点状态转换图.png

    相关文章

      网友评论

        本文标题:kafka源码愫读(2)、broker初始化流程分析

        本文链接:https://www.haomeiwen.com/subject/xktstltx.html