Kafka server startup flow & shutdown implementation


Author: 疯狂的哈丘 | Published 2018-08-01 20:45

    Normally we start Kafka with the command kafka-server-start /usr/local/etc/kafka/server.properties. The startup class is Kafka.scala, and its main method is eventually invoked.
    The startup script also sets JVM-related parameters, such as the log4j configuration file location and the JVM heap size.

    Below is a brief walk through Kafka's startup flow and shutdown implementation via the source code. This series of source analyses is based on version 0.10.2.

    Startup entry point

    //Kafka.scala
    def main(args: Array[String]): Unit = {
      try {
        //Parse the command-line arguments and load the configuration file; here that is the contents of /usr/local/etc/kafka/server.properties
        //This also checks that an argument was actually passed, and fails if not
        val serverProps = getPropsFromArgs(args)
        //Build a KafkaServerStartable from the config; this validates that the required settings are present
        val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    
        //Register a hook to run on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
          override def run() = {
            kafkaServerStartable.shutdown
          }
        })
        //KafkaServerStartable's startup method delegates to KafkaServer#startup()
        //KafkaServer#startup() then initializes and starts the individual components
        kafkaServerStartable.startup
        //Block until Kafka is shut down
        //Internally this uses Java's CountDownLatch.await(); when Kafka is shut down, the matching CountDownLatch.countDown() is called and the process can actually exit
        kafkaServerStartable.awaitShutdown
      }
      catch {
        case e: Throwable =>
          fatal(e)
          System.exit(1)
      }
      System.exit(0)
    }
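
    The await/countDown interplay that keeps main() alive can be sketched as follows. This is a minimal illustration of the pattern, not Kafka's actual code; the class and method names are hypothetical.

    ```java
    import java.util.concurrent.CountDownLatch;

    // Sketch of the latch pattern behind awaitShutdown(): the main thread
    // blocks on await(), and the shutdown hook releases it via countDown().
    public class ServerLatch {
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);

        // Invoked from the JVM shutdown hook, e.g.
        //   Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
        public void shutdown() {
            // ... close components here ...
            shutdownLatch.countDown(); // releases any thread blocked in awaitShutdown()
        }

        // The main thread parks here until shutdown() runs.
        public void awaitShutdown() throws InterruptedException {
            shutdownLatch.await();
        }

        public boolean isShutDown() {
            return shutdownLatch.getCount() == 0;
        }
    }
    ```

    A latch initialized to 1 works as a one-shot gate: countDown() is idempotent past zero, so it is safe even if the hook fires more than once.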
    

    Broker lifecycle

    The states a broker moves through during its lifecycle are shown in the diagram below:

    *
    *                +-----------+
    *                |Not Running|
    *                +-----+-----+
    *                      |
    *                      v
    *                +-----+-----+
    *                |Starting   +--+
    *                +-----+-----+  | +----+------------+
    *                      |        +>+RecoveringFrom   |
    *                      v          |UncleanShutdown  |
    *               +-------+-------+ +-------+---------+
    *               |RunningAsBroker|            |
    *               +-------+-------+<-----------+
    *                       |
    *                       v
    *                +-----+------------+
    *                |PendingControlled |
    *                |Shutdown          |
    *                +-----+------------+
    *                      |
    *                      v
    *               +-----+----------+
    *               |BrokerShutting  |
    *               |Down            |
    *               +-----+----------+
    *                     |
    *                     v
    *               +-----+-----+
    *               |Not Running|
    *               +-----------+
    *
    
    • NotRunning: not running
    • Starting: starting up
    • RunningAsBroker: the broker is up and running
    • RecoveringFromUncleanShutdown: recovering from a previous unclean shutdown; this state is tied to the logManager
    • PendingControlledShutdown: the broker has reported its shutdown to the controller and is waiting for the controller's reply
    • BrokerShuttingDown: the broker is shutting down
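
    The transitions in the diagram above could be modeled as an enum; this is purely illustrative (Kafka's real BrokerStates uses byte codes, not this enum):

    ```java
    import java.util.EnumSet;
    import java.util.Set;

    // Illustrative state machine mirroring the arrows in the diagram above.
    public enum BrokerState {
        NOT_RUNNING, STARTING, RECOVERING_FROM_UNCLEAN_SHUTDOWN,
        RUNNING_AS_BROKER, PENDING_CONTROLLED_SHUTDOWN, BROKER_SHUTTING_DOWN;

        // Legal next states from each state.
        public Set<BrokerState> validNext() {
            switch (this) {
                case NOT_RUNNING:
                    return EnumSet.of(STARTING);
                case STARTING: // may detour through unclean-shutdown recovery
                    return EnumSet.of(RECOVERING_FROM_UNCLEAN_SHUTDOWN, RUNNING_AS_BROKER);
                case RECOVERING_FROM_UNCLEAN_SHUTDOWN:
                    return EnumSet.of(RUNNING_AS_BROKER);
                case RUNNING_AS_BROKER:
                    return EnumSet.of(PENDING_CONTROLLED_SHUTDOWN);
                case PENDING_CONTROLLED_SHUTDOWN:
                    return EnumSet.of(BROKER_SHUTTING_DOWN);
                case BROKER_SHUTTING_DOWN:
                    return EnumSet.of(NOT_RUNNING);
                default:
                    return EnumSet.noneOf(BrokerState.class);
            }
        }
    }
    ```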

    Server startup flow

    //KafkaServer#startup()
    def startup() {
      try {
        info("starting")
    
        if (isShuttingDown.get)
          throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
    
        if (startupComplete.get)
          return
    
        val canStartup = isStartingUp.compareAndSet(false, true)
        if (canStartup) {
          //Move the broker state to Starting
          brokerState.newState(Starting)
    
          //Start the scheduler thread pool for periodic tasks
          kafkaScheduler.startup()
    
          //Initialize the ZooKeeper utilities, used later for watching and reading zk data
          zkUtils = initZk()
    
          //Fetch the cluster id, generating one if the cluster does not have one yet; it is stored at /cluster/id in zk
          _clusterId = getOrGenerateClusterId(zkUtils)
          info(s"Cluster ID = $clusterId")
    
          //Fetch or generate a brokerId
          config.brokerId = getBrokerId
          this.logIdent = "[Kafka Server " + config.brokerId + "], "
    
          //Create the metrics components
          val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
            Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
          reporters.add(new JmxReporter(jmxPrefix))
          val metricConfig = KafkaServer.metricConfig(config)
          metrics = new Metrics(metricConfig, reporters, time, true)
    
          quotaManagers = QuotaFactory.instantiate(config, metrics, time)
          notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
    
          //Create the log manager; during creation it checks for a .kafka_cleanshutdown file in the log directories, and if it is missing, the broker enters the RecoveringFromUncleanShutdown state
          logManager = createLogManager(zkUtils.zkClient, brokerState)
          logManager.startup()
    
          //Create the metadata cache
          metadataCache = new MetadataCache(config.brokerId)
          //Create the credential provider
          credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
    
          //Create and start the SocketServer; once it is up, the broker starts accepting rpc requests
          socketServer = new SocketServer(config, metrics, time, credentialProvider)
          socketServer.startup()
    
          //Create and start the replica manager
          replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
            isShuttingDown, quotaManagers.follower)
          replicaManager.startup()
    
          //Create and start the Kafka controller; once started, the broker tries to create a zk node to compete for the controller role
          kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
          kafkaController.startup()
          //Create the admin manager (cluster administration component)
          adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
    
          //Create and start the group coordinator
          groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
          groupCoordinator.startup()
    
          //Build the authorizer
          authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
            val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
            authZ.configure(config.originals())
            authZ
          }
    
          //Build the KafkaApis component, which dispatches each request type to its handling logic
          apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
            kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
            clusterId, time)
          //Request handler thread pool
          requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
            config.numIoThreads)
    
          Mx4jLoader.maybeLoad()
    
          //Handlers for the dynamic configuration entries
          dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
            ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
            ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
    
          //Initialize and start the dynamic config manager
          dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
          dynamicConfigManager.startup()
    
          /* tell everyone we are alive */
          val listeners = config.advertisedListeners.map { endpoint =>
            if (endpoint.port == 0)
              endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
            else
              endpoint
          }
          //Kafka health-check component
          kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
            config.interBrokerProtocolVersion)
          kafkaHealthcheck.startup()
    
          //Checkpoint the broker id into the log directories
          checkpointBrokerId(config.brokerId)
    
          /* register broker metrics */
          registerStats()
          //Move the broker state to RunningAsBroker
          brokerState.newState(RunningAsBroker)
          shutdownLatch = new CountDownLatch(1)
          startupComplete.set(true)
          isStartingUp.set(false)
          AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
          info("started")
        }
      }
      catch {
        case e: Throwable =>
          fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
          isStartingUp.set(false)
          shutdown()
          throw e
      }
    }
    

    The startup flow, then, just initializes a series of components and starts the ones that need starting. I will cover each of these components in later posts; for now a rough picture of their startup sequence is enough.
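
    One detail worth pausing on is the guard at the top of startup(): the compareAndSet on isStartingUp ensures only one thread performs initialization, and startupComplete makes repeated calls no-ops. A minimal sketch of that guard, with hypothetical names, using Java's AtomicBoolean (Scala's is the same class):

    ```java
    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of the concurrency guard in KafkaServer#startup(): one thread
    // wins the CAS and initializes; later or concurrent calls do nothing.
    public class StartupGuard {
        private final AtomicBoolean isStartingUp = new AtomicBoolean(false);
        private final AtomicBoolean startupComplete = new AtomicBoolean(false);
        private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);

        // Returns true only for the call that actually performed startup.
        public boolean startup(Runnable initComponents) {
            if (isShuttingDown.get())
                throw new IllegalStateException("still shutting down, cannot re-start");
            if (startupComplete.get())
                return false; // already started: no-op
            if (isStartingUp.compareAndSet(false, true)) { // only one thread wins
                initComponents.run();       // initialize all the components
                startupComplete.set(true);
                isStartingUp.set(false);
                return true;
            }
            return false; // another thread is in the middle of starting up
        }
    }
    ```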

    Shutdown implementation

    In the startup code above we saw that the shutdown hook is already registered. When Kafka is shut down, KafkaServerStartable#shutdown() runs, which in turn calls KafkaServer#shutdown().

    def shutdown() {
      try {
        info("shutting down")
    
        if (isStartingUp.get)
          throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
    
        // To ensure correct behavior under concurrent calls, we need to check `shutdownLatch` first since it gets updated
        // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
        // `true` at the end of this method.
        if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
          //controlledShutdown() notifies the controller that this broker is going down, blocking until the notification succeeds; at this point the broker enters the PendingControlledShutdown state
          CoreUtils.swallow(controlledShutdown())
          //The broker enters the BrokerShuttingDown state
          brokerState.newState(BrokerShuttingDown)
          if (socketServer != null)
            CoreUtils.swallow(socketServer.shutdown())
          if (requestHandlerPool != null)
            CoreUtils.swallow(requestHandlerPool.shutdown())
          CoreUtils.swallow(kafkaScheduler.shutdown())
          if (apis != null)
            CoreUtils.swallow(apis.close())
          CoreUtils.swallow(authorizer.foreach(_.close()))
          if (replicaManager != null)
            CoreUtils.swallow(replicaManager.shutdown())
          if (adminManager != null)
            CoreUtils.swallow(adminManager.shutdown())
          if (groupCoordinator != null)
            CoreUtils.swallow(groupCoordinator.shutdown())
          if (logManager != null)
            CoreUtils.swallow(logManager.shutdown())
          if (kafkaController != null)
            CoreUtils.swallow(kafkaController.shutdown())
          if (zkUtils != null)
            CoreUtils.swallow(zkUtils.close())
          if (metrics != null)
            CoreUtils.swallow(metrics.close())
          //The broker enters the NotRunning state
          brokerState.newState(NotRunning)
    
          startupComplete.set(false)
          isShuttingDown.set(false)
          CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
          shutdownLatch.countDown()
          info("shut down completed")
        }
      }
      catch {
        case e: Throwable =>
          fatal("Fatal error during KafkaServer shutdown.", e)
          isShuttingDown.set(false)
          throw e
      }
    }
    

    All this does is gracefully close the components created during startup. Before closing them, the broker may first report its shutdown to the controller and wait for the controller's reply, then continue closing the remaining components.
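
    Note how every close call above is wrapped in CoreUtils.swallow: the exception is logged and discarded, so one component failing to close does not abort the shutdown of the rest. The idea can be sketched like this (a simplified stand-in, not Kafka's actual CoreUtils, which logs via log4j):

    ```java
    // Sketch of the CoreUtils.swallow idea: run each close step, report any
    // failure, and keep going so later components still get shut down.
    public class Swallow {
        public static void swallow(Runnable action) {
            try {
                action.run();
            } catch (Throwable t) {
                // Kafka logs the swallowed exception instead of rethrowing it
                System.err.println("swallowed: " + t);
            }
        }
    }
    ```

    With this wrapper, a shutdown sequence like swallow(socketServer::shutdown); swallow(logManager::shutdown); always reaches the later steps even if an earlier one throws.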

    Reporting shutdown to the controller

    If controlled.shutdown.enable is on, the broker reports its own shutdown to the controller when it goes down, so the controller can react to the broker leaving in a timely way, e.g. re-electing partition leaders, closing partition replicas, and notifying the other brokers of the metadata change. This setting is on by default.

    The broker reports its shutdown to the controller as follows:

    1. Check whether controlled.shutdown.enable is on; if so, the broker enters the PendingControlledShutdown state and starts sending shutdown requests to the controller
    2. Check whether the retry count has reached controlled.shutdown.max.retries; if it has, give up and send no more shutdown requests. The default is 3 retries
    3. Read the current controller's id from zk, then send the shutdown request to that node
    4. If the request fails, sleep for controlled.shutdown.retry.backoff.ms milliseconds, increment the retry count, and go back to step 2
    5. If the request succeeds, the controller has received the shutdown request and finished handling it
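
    The steps above amount to a bounded retry loop with backoff. A sketch, where sendRequest is a hypothetical stand-in for the real "send the shutdown request to the current controller" RPC, and the two parameters mirror controlled.shutdown.max.retries and controlled.shutdown.retry.backoff.ms:

    ```java
    import java.util.function.Supplier;

    // Sketch of the controlled-shutdown retry loop described in the steps above.
    public class ControlledShutdownLoop {
        // Returns true once the controller acknowledges, false once retries run out.
        public static boolean run(Supplier<Boolean> sendRequest, int maxRetries, long backoffMs) {
            for (int attempt = 0; attempt < maxRetries; attempt++) { // step 2: bounded retries
                if (sendRequest.get())       // step 3: send the shutdown request
                    return true;             // step 5: controller received and handled it
                try {
                    Thread.sleep(backoffMs); // step 4: back off before retrying
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return false; // retries exhausted; proceed to shut down without an ack
        }
    }
    ```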
