平常我们通过命令 kafka-server-start /usr/local/etc/kafka/server.properties
启动,kafka的启动类是Kafka.scala,最终会调用Kafka.scala类的main方法。
另外,启动脚本中还会设置相关JVM参数,如log4j配置文件地址、JVM堆大小等等。
下面通过源码简单分析下kafka的启动流程以及shutdown的实现。本系列的源码分析都以0.10.2
版本为准
启动入口
//Kafka.scala
def main(args: Array[String]): Unit = {
try {
//根据命令行的参数,获取配置文件中的相关配置,这里获取到的也就是/usr/local/etc/kafka/server.properties的内容
//这里同时还会检查是否有入参,如果没有就会报错
val serverProps = getPropsFromArgs(args)
//根据配置构造一个kafkaServerStartable对象,这里面会检验必要的参数是否有值
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
//绑定一个进程关闭的钩子
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartable.shutdown
}
})
//在KafkaServerStartable.scala的startup方法中,会继续调用KafkaServer#startup()方法
//在KafkaServer#startup()方法中,开始初始化并加载各个组件
kafkaServerStartable.startup
//阻塞直到kafka被关闭
//底层用了java的CountDownLatch.await()。当kafka被关闭时,对应的CountDownLatch.countDown()方法会被调用,这时候程序就会真正退出
kafkaServerStartable.awaitShutdown
}
catch {
case e: Throwable =>
fatal(e)
System.exit(1)
}
System.exit(0)
}
broker的生命周期
kafka的生命周期中的各个状态如下图:
*
* +-----------+
* |Not Running|
* +-----+-----+
* |
* v
* +-----+-----+
* |Starting +--+
* +-----+-----+ | +----+------------+
* | +>+RecoveringFrom |
* v |UncleanShutdown |
* +-------+-------+ +-------+---------+
* |RunningAsBroker| |
* +-------+-------+<-----------+
* |
* v
* +-----+------------+
* |PendingControlled |
* |Shutdown |
* +-----+------------+
* |
* v
* +-----+----------+
* |BrokerShutting |
* |Down |
* +-----+----------+
* |
* v
* +-----+-----+
* |Not Running|
* +-----------+
*
- NOT Running : 未运行状态
- Starting:正在启动中
- RunningAsBroker: broker在运行中
- RecoveringFromUncleanShutdown:从上次不完整的关闭中恢复状态,这个状态和logManager有关
- PendingControlledShutdown:broker向controller报告关闭,等待controller应答中
- BrokerShuttingDown:broker在关闭中
sever启动流程
//KafkaServer#startup()
def startup() {
try {
info("starting")
if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
if (startupComplete.get)
return
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
//设置broker状态为Starting
brokerState.newState(Starting)
//启动一个定时任务的线程池
kafkaScheduler.startup()
//初始化zk组件,后续用于监听、获取zk数据用
zkUtils = initZk()
//获取集群的id,如果当前集群尚未生成集群id,那就生成一个,对应zk的 /cluster/id 的值
_clusterId = getOrGenerateClusterId(zkUtils)
info(s"Cluster ID = $clusterId")
//获取或者生成一个brokerId
config.brokerId = getBrokerId
this.logIdent = "[Kafka Server " + config.brokerId + "], "
//创建一个用于度量的组件
val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
quotaManagers = QuotaFactory.instantiate(config, metrics, time)
notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
//创建日志管理组件,创建时会检查log目录下是否有.kafka_cleanshutdown文件,如果没有的话,broker进入RecoveringFrom UncleanShutdown 状态
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()
//创建元数据管理组件
metadataCache = new MetadataCache(config.brokerId)
//创建凭证提供者组件
credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
//创建一个sockerServer组件,并启动。该组件启动后,就会开始接收rpc请求了
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup()
//创建一个副本管理组件,并启动该组件
replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
isShuttingDown, quotaManagers.follower)
replicaManager.startup()
//创建kafka控制器,并启动。该控制器启动后broker会尝试去zk创建节点竞争成为controller
kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
kafkaController.startup()
//创建一个集群管理组件
adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
//创建群组协调器,并且启动
groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
groupCoordinator.startup()
//构造授权器
authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
//构造api组件,针对各个接口会处理不同的业务
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
clusterId, time)
//请求处理池
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
Mx4jLoader.maybeLoad()
//动态配置处理器的相关配置
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
//初始化动态配置管理器,并启动
dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
dynamicConfigManager.startup()
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
else
endpoint
}
//kafka健康检查组件
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
config.interBrokerProtocolVersion)
kafkaHealthcheck.startup()
//记录一下恢复点
checkpointBrokerId(config.brokerId)
/* register broker metrics */
registerStats()
//broker进入RunningAsBroker状态
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
启动流程就是初始化一堆组件,然后该启动的启动。这些组件以后我会一个一个介绍,现在大家先简单了解一下它们的启动过程就好了。
shutdown实现
在启动代码那里,可以看到kafka在那里面已经做了shutdown的钩子。当kafka关闭的是会执行kafkaServerStartable#shutdown()方法。最后调用了KafkaServer#shutdown()方法
def shutdown() {
try {
info("shutting down")
if (isStartingUp.get)
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
// To ensure correct behavior under concurrent calls, we need to check `shutdownLatch` first since it gets updated
// last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
//controlledShutdown()里面会通知controller自己关闭了,会一直阻塞到通知成功,这时候broker会进入PendingControlled Shutdown状态
CoreUtils.swallow(controlledShutdown())
//broker进入BrokerShutting Down状态
brokerState.newState(BrokerShuttingDown)
if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown())
if (requestHandlerPool != null)
CoreUtils.swallow(requestHandlerPool.shutdown())
CoreUtils.swallow(kafkaScheduler.shutdown())
if (apis != null)
CoreUtils.swallow(apis.close())
CoreUtils.swallow(authorizer.foreach(_.close()))
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown())
if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown())
if (groupCoordinator != null)
CoreUtils.swallow(groupCoordinator.shutdown())
if (logManager != null)
CoreUtils.swallow(logManager.shutdown())
if (kafkaController != null)
CoreUtils.swallow(kafkaController.shutdown())
if (zkUtils != null)
CoreUtils.swallow(zkUtils.close())
if (metrics != null)
CoreUtils.swallow(metrics.close())
//broker进入Not Running状态
brokerState.newState(NotRunning)
startupComplete.set(false)
isShuttingDown.set(false)
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
shutdownLatch.countDown()
info("shut down completed")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer shutdown.", e)
isShuttingDown.set(false)
throw e
}
}
做的事情就是将之前的那些组件优雅的关闭掉。在关闭这些组件前,可能还会去通知controller自己关闭了,收到controller答复后再继续关闭剩下的组件。
向controller报告关闭
如果 controlled.shutdown.enable 开启的话,broker在关闭时会向controller报告自己关闭,这样controller可以对broker的下线及时做一些操作,比如partition的重新选举、分区副本的关闭、通知其他的broker元数据变动等。该配置默认是开启的。
broker向controller报告关闭的流程是这样的:
- 检查controlled.shutdown.enable是否开启,如果开启,broker就进入PendingControlledShutdown状态,同时开始向controller发送shutdown请求
- 检查重试次数是否达到controlled.shutdown.max.retries配置的值,如果是的话则退出不在发送shutdown请求。重试次数默认是3次
- 从zk获取当前controller所在的id,然后向该节点发送shutdown请求
- 如果请求失败,休眠controlled.shutdown.retry.backoff.ms毫秒后,重试次数+1,重新进入步骤二
- 如果请求成功,说明controller接受到了shutdown请求,并处理完成。
网友评论