[TOC]
Zookeeper and Cluster Management Principles
We know that every broker connects to Zookeeper when it starts up. What exactly, then, does Zookeeper do for Kafka?
The approach to cluster management
The "birth" and "death" of a broker
Whenever a new broker joins the cluster, or an existing broker dies, every other machine in the cluster needs to know about it.
This is implemented by watching the /brokers/ids node in Zookeeper: each of its children corresponds to one broker machine. When a broker is added, the list of children grows; when a broker dies, its ephemeral child node disappears and the list shrinks.
Controller
To reduce the pressure on Zookeeper, and also to lower the complexity of the whole distributed system, Kafka introduces a "central controller", namely the Controller.
The basic idea: first elect one Controller among all brokers via Zookeeper, then let this Controller drive all the other brokers, instead of having Zookeeper control every machine directly.
For example, the watch on /brokers/ids mentioned above is not registered by every broker; only the Controller watches that node. This turns a "distributed" problem into a "centralized" one, which both reduces the load on Zookeeper and makes the control logic easier to write.
Adding/removing topics and partitions
Likewise, as a distributed cluster, it is impractical to notify every single machine one by one when a topic or partition is added or removed.
The implementation follows the same idea: the admin side (Admin/TopicCommand) writes the add/delete command into Zookeeper, the Controller picks up the change through its Zookeeper watch, and then fans the work out to the brokers involved. A rough sketch of the admin side is shown below.
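As an illustration only (this is not Kafka's actual AdminUtils code; the JSON layout is simplified and the address/timeouts are assumptions), "creating a topic" essentially means writing the partition-to-replica assignment under /brokers/topics/<topic>, a path the Controller is watching:
import org.I0Itec.zkclient.ZkClient

// hypothetical admin-side sketch; note that real Kafka plugs its own ZKStringSerializer
// into ZkClient, while the default serializer would Java-serialize the String below
val zkClient = new ZkClient("localhost:2181", 30000, 30000)
// partition 0 -> replicas on brokers 1,2; partition 1 -> replicas on brokers 2,3
val assignmentJson = """{"version":1,"partitions":{"0":[1,2],"1":[2,3]}}"""
zkClient.createPersistent("/brokers/topics/my-topic", assignmentJson)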
I0Itec ZkClient
As for Zookeeper client libraries, the commonly used one is Apache Curator, but Kafka does not use it. It uses another one called I0Itec ZkClient; if I remember correctly, Alibaba's Dubbo framework uses it as well. Compared with Curator it is more lightweight.
Concretely, it exposes three listener interfaces:
//called when the connection state changes, or when an expired session has been re-established
public interface IZkStateListener {
public void handleStateChanged(KeeperState state) throws Exception;
public void handleNewSession() throws Exception;
public void handleSessionEstablishmentError(final Throwable error) throws Exception;
}
//called when a node's data changes (the data is modified, or the node itself is deleted)
public interface IZkDataListener {
public void handleDataChange(String dataPath, Object data) throws Exception;
public void handleDataDeleted(String dataPath) throws Exception;
}
//called when a node's children change
public interface IZkChildListener {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
}
Kafka implements all of its Zookeeper-related watches with exactly these three listeners; their concrete uses will be unfolded one by one in later posts of this series. As a taste, the sketch below shows how the broker "birth and death" watch from earlier maps onto IZkChildListener.
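This is a minimal, illustrative sketch rather than Kafka source code; the connection string and timeouts are assumptions:
import java.util.{List => JList}
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

val zkClient = new ZkClient("localhost:2181", 30000, 30000)

zkClient.subscribeChildChanges("/brokers/ids", new IZkChildListener {
  // fired when a broker registers itself, or when its ephemeral node vanishes (the broker died)
  override def handleChildChange(parentPath: String, currentChilds: JList[String]): Unit =
    println(s"live brokers under $parentPath: $currentChilds")
})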
KafkaController election process / Failover and Resignation
The core components of a Kafka cluster
Before diving into the source code, let's look at the major components of the whole Kafka cluster, starting from the server's main function:
//Kafka
def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartable.shutdown //run when the JVM shuts down, via the hook registered above
}
})
kafkaServerStartable.startup //start the server
kafkaServerStartable.awaitShutdown
}
catch {
case e: Throwable =>
fatal(e)
System.exit(1)
}
System.exit(0)
}
//KafkaServerStartable
class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
private val server = new KafkaServer(serverConfig)
def startup() {
try {
server.startup()
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
// KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
System.exit(1)
}
}
...
}
//KafkaServer
def startup() {
try {
info("starting")
if(isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
if(startupComplete.get)
return
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
brokerState.newState(Starting)
//Core component 0: KafkaScheduler
kafkaScheduler.startup()
zkUtils = initZk()
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()
config.brokerId = getBrokerId
this.logIdent = "[Kafka Server " + config.brokerId + "], "
//Core component 1: SocketServer
socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()
//Core component 2: ReplicaManager
replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
isShuttingDown)
replicaManager.startup()
//Core component 3: KafkaController
kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
kafkaController.startup()
//Core component 4: GroupCoordinator
consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
consumerCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
//Core component 5: KafkaApis
apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
Mx4jLoader.maybeLoad()
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
// Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
// TODO: Move this logic to DynamicConfigManager
AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
}
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
dynamicConfigManager.startup()
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
if (endpoint.port == 0)
(protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
else
(protocol, endpoint)
}
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
kafkaHealthcheck.startup()
/* register broker metrics */
registerStats()
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
Besides the code, let's also look at the shell scripts that start and stop the server:
bin/kafka-server-start.sh
It daemonizes the process with nohup; the script details are not listed here.
bin/kafka-server-stop.sh shows that stopping the process is very simple: the kill command sends a SIGTERM signal, and when the JVM receives it, it runs the shutdown hook registered above:
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
Looking at the server's startup function, we can see the following core components:
1. SocketServer + KafkaApis: the former accepts all network requests, the latter processes them
2. KafkaController: responsible for Controller election
3. GroupCoordinator (consumerCoordinator): coordination and rebalancing of consumer groups
4. ReplicaManager: manages the partition replicas hosted on this broker
5. KafkaScheduler: the background task scheduler
Here we focus on KafkaController; the other core components will be covered one by one later.
Basic principle of the election
The whole election is implemented through one ephemeral node in zk, /controller. Its data is structured as follows; the key piece of information is the brokerId of the current controller:
"version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp
When the controller dies, all other brokers observe that this ephemeral node has disappeared and race to create it again; whoever creates it successfully becomes the new Controller.
Besides /controller there is an auxiliary node, /controller_epoch, which records the epoch (generation number) of the current Controller. The core election rule can be sketched as follows.
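This is illustrative code, not Kafka source (the real logic lives in ZookeeperLeaderElector.elect, shown further down): try to create the ephemeral /controller node, and treat ZkNodeExistsException as losing the race.
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

def tryBecomeController(zkClient: ZkClient, brokerId: Int): Boolean = {
  val data = s"""{"version":1,"brokerid":$brokerId,"timestamp":"${System.currentTimeMillis}"}"""
  try {
    zkClient.createEphemeral("/controller", data) // ephemeral: vanishes when our zk session dies
    true                                          // we won the election
  } catch {
    case _: ZkNodeExistsException => false        // another broker is already the Controller
  }
}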
KafkaController and ZookeeperLeaderElector
The whole election is implemented by these two core classes, where ZookeeperLeaderElector is a member variable of KafkaController:
//a member variable of KafkaController
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
onControllerResignation, config.brokerId)
The figure below shows the whole interaction of the election:
(1) KafkaController and ZookeeperLeaderElector each hold one listener internally: one watches session re-establishment, the other watches changes to the /controller node.
(2) When the session is re-established, or the /controller node is deleted, elect() is called to start a new election. Before re-electing, the broker first checks whether it was the old Controller; if so, it first calls onResignation to step down.
(Figure: election interaction between KafkaController and ZookeeperLeaderElector)
Let's start from KafkaController's startup function:
def startup() = {
inLock(controllerContext.controllerLock) {
info("Controller starting up")
registerSessionExpirationListener() //listener type 1: SessionExpirationListener
isRunning = true
controllerElector.startup //listener type 2: LeaderChangeListener
info("Controller startup complete")
}
}
class SessionExpirationListener() extends IZkStateListener with Logging {
...
@throws(classOf[Exception])
def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
inLock(controllerContext.controllerLock) {
onControllerResignation() //step down first
controllerElector.elect //then start a new election
}
}
...
}
class LeaderChangeListener extends IZkDataListener with Logging {
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
inLock(controllerContext.controllerLock) {
val amILeaderBeforeDataChange = amILeader
leaderId = KafkaController.parseControllerId(data.toString)
if (amILeaderBeforeDataChange && !amILeader)
onResigningAsLeader() //we used to be the controller but are no longer: step down
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
inLock(controllerContext.controllerLock) {
debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
.format(brokerId, dataPath))
if(amILeader)
onResigningAsLeader() //key point: the controller node may have disappeared not because this broker died, but because its zookeeper session broke while it is still alive. In that case it steps down first, then starts a new election.
elect //start a new election
}
}
}
Two key callbacks: Failover (taking office) and Resignation (stepping down)
In the election process above there are two key callbacks: what a new Controller does when it takes office, and what an old Controller does when it steps down.
"Taking office" is easy to understand: a new broker has been elected as controller. But why is there a "stepping down"?
Because zk uses heartbeats (the session) to decide whether the controller is alive, the controller may well be alive while zk considers it dead, in which case a new controller is elected. The old controller, discovering that it is stale, must then step down voluntarily.
Let's see what "taking office" and "stepping down" actually do:
def onControllerFailover() {
if(isRunning) {
readControllerEpochFromZookeeper()
//increment the controller epoch
incrementControllerEpoch(zkUtils.zkClient)
//key point: take over all the watches on the broker/partition nodes
registerReassignedPartitionsListener()
registerIsrChangeNotificationListener()
registerPreferredReplicaElectionListener()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
replicaStateMachine.startup()
partitionStateMachine.startup()
// register the partition change listeners for all existing topics on failover
controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
brokerState.newState(RunningAsController)
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
/* send partition leadership info to all live brokers */
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}
def onControllerResignation() {
//key point: give up all the watches on the broker/partition nodes
deregisterIsrChangeNotificationListener()
deregisterReassignedPartitionsListener()
deregisterPreferredReplicaElectionListener()
// shutdown delete topic manager
if (deleteTopicManager != null)
deleteTopicManager.shutdown()
// shutdown leader rebalance scheduler
if (config.autoLeaderRebalanceEnable)
autoRebalanceScheduler.shutdown()
inLock(controllerContext.controllerLock) {
// de-register partition ISR listener for on-going partition reassignment task
deregisterReassignedPartitionsIsrChangeListeners()
// shutdown partition state machine
partitionStateMachine.shutdown()
// shutdown replica state machine
replicaStateMachine.shutdown()
// shutdown controller channel manager
if(controllerContext.controllerChannelManager != null) {
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
}
// reset controller context
controllerContext.epoch=0
controllerContext.epochZkVersion=0
brokerState.newState(RunningAsBroker)
}
}
How KafkaController becomes the leader
KafkaController holds a ZookeeperLeaderElector internally, which it uses to determine, through zk, whether it is the leader.
class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
...
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
onControllerResignation, config.brokerId)
/**
* Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
* is the controller. It merely registers the session expiration listener and starts the controller leader
* elector
*/
def startup() = {
inLock(controllerContext.controllerLock) {
info("Controller starting up");
registerSessionExpirationListener()//注册一个会话超时的listener
isRunning = true
controllerElector.startup//启动controllerElector
info("Controller startup complete")
}
}
}
Its zk election path is /controller, and it registers a session-expiration listener against the zk cluster:
class SessionExpirationListener() extends IZkStateListener with Logging {
this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
}
/**
* Called after the zookeeper session has expired and a new session has been created. You would have to re-create
* any ephemeral nodes here.
*
* @throws Exception
* On any error.
*/
@throws(classOf[Exception])
def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
inLock(controllerContext.controllerLock) {
onControllerResignation() //after the session expires and is re-established, call onControllerResignation, the resignation callback that was also registered with ZookeeperLeaderElector
controllerElector.elect //re-elect
}
}
}
So the logic to focus on is inside ZookeeperLeaderElector:
class ZookeeperLeaderElector(controllerContext: ControllerContext,
electionPath: String,
onBecomingLeader: () => Unit,
onResigningAsLeader: () => Unit,
brokerId: Int)
extends LeaderElector with Logging {
var leaderId = -1
// create the election path in ZK, if one does not exist
val index = electionPath.lastIndexOf("/")
if (index > 0)
makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index))
val leaderChangeListener = new LeaderChangeListener
def startup {
inLock(controllerContext.controllerLock) { //the election path is /controller
controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
elect //trigger the election
}
}
private def getControllerID(): Int = {
readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
case Some(controller) => KafkaController.parseControllerId(controller)
case None => -1
}
}
def elect: Boolean = {
val timestamp = SystemTime.milliseconds.toString
val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
leaderId = getControllerID
/*
* We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
* it's possible that the controller has already been elected when we get here. This check will prevent the following
* createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
*/
if(leaderId != -1) {
debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
return amILeader
}
try { //the election works by creating an ephemeral node in zk: if several clients concurrently create a node at the same path, exactly one of them succeeds and the others fail; and once the winning client's connection to zk is gone, the node disappears and the remaining clients compete again
createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
(controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
controllerContext.zkSessionTimeout)
info(brokerId + " successfully elected as leader")
leaderId = brokerId
onBecomingLeader() //on success, this broker becomes the leader
} catch {
case e: ZkNodeExistsException =>
// If someone else has written the path, then
leaderId = getControllerID
if (leaderId != -1)
debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
else
warn("A leader has been elected but just resigned, this will result in another round of election")
case e2: Throwable =>
error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
resign() //on error, delete the path
}
amILeader
}
def close = {
leaderId = -1
}
def amILeader : Boolean = leaderId == brokerId
def resign() = {
leaderId = -1
deletePath(controllerContext.zkClient, electionPath)
}
/**
* We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
* have its own session expiration listener and handler
*/
class LeaderChangeListener extends IZkDataListener with Logging {
/**
* Called when the leader information stored in zookeeper has changed. Record the new leader in memory
* @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
inLock(controllerContext.controllerLock) {
leaderId = KafkaController.parseControllerId(data.toString)
info("New leader is %d".format(leaderId))
}
}
/**
* Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
* @throws Exception
* On any error.
*/
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) { //a KafkaController that did not win the election at first startup will re-trigger an election here, once it notices that the node has disappeared
inLock(controllerContext.controllerLock) {
debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
.format(brokerId, dataPath))
if(amILeader) //this broker may have been the leader before; re-election will not necessarily make it leader again, so all previously cached state must be cleared first
onResigningAsLeader()
elect //trigger the election
}
}
}
}
So there are two ways for a KafkaController to become the leader:
- On first startup it actively triggers elect; if it is elected leader, it does everything a leader must do
- If the election at first startup fails, it watches the /controller path through LeaderChangeListener; when the data underneath is deleted, handleDataDeleted fires and triggers another election
12.2 KafkaController initialization (as leader)
As the previous section showed, a KafkaController that wins the election calls onBecomingLeader, and a previous leader calls onResigningAsLeader when a new election is triggered. These two callbacks correspond to onControllerFailover and onControllerResignation respectively.
onControllerResignation is simple: it shuts down or deregisters every module inside:
def onControllerResignation() {
// de-register listeners
deregisterReassignedPartitionsListener()
deregisterPreferredReplicaElectionListener()
// shutdown delete topic manager
if (deleteTopicManager != null)
deleteTopicManager.shutdown()
// shutdown leader rebalance scheduler
if (config.autoLeaderRebalanceEnable)
autoRebalanceScheduler.shutdown()
inLock(controllerContext.controllerLock) {
// de-register partition ISR listener for on-going partition reassignment task
deregisterReassignedPartitionsIsrChangeListeners()
// shutdown partition state machine
partitionStateMachine.shutdown()
// shutdown replica state machine
replicaStateMachine.shutdown()
// shutdown controller channel manager
if(controllerContext.controllerChannelManager != null) {
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
}
// reset controller context
controllerContext.epoch=0
controllerContext.epochZkVersion=0
brokerState.newState(RunningAsBroker)
}
}
The modules above will be introduced together with onControllerFailover, which essentially turns all of them on.
The logic of onControllerFailover is as follows:
def onControllerFailover() {
if(isRunning) {
info("Broker %d starting become controller state transition".format(config.brokerId))
readControllerEpochFromZookeeper()
//the controller epoch, incremented by 1 on every successful election
incrementControllerEpoch(zkClient)
/*leader initialization, see the step-by-step notes below*/
registerReassignedPartitionsListener()
registerPreferredReplicaElectionListener()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
replicaStateMachine.startup()
partitionStateMachine.startup()
controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
brokerState.newState(RunningAsController)
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}
The steps are as follows:
1) Register the partitionReassignedListener on /admin/reassign_partitions (see the sketch after this list)
2) Register the preferredReplicaElectionListener on /admin/preferred_replica_election
3) Register the topicChangeListener on /brokers/topics
4) Register the deleteTopicsListener on /admin/delete_topics
5) Register the brokerChangeListener on /brokers/ids
6) Initialize the ControllerContext, which contains all kinds of topic metadata; in addition, the ControllerChannelManager inside the ControllerContext is responsible for establishing channels to the other KafkaServers in the cluster, and the TopicDeletionManager is responsible for deleting topics
7) Initialize the state of all replicas through replicaStateMachine
8) Initialize the state of all partitions through partitionStateMachine
9) Register an AddPartitionsListener under /brokers/topics/<topic name>/ for every existing topic
10) Resume any partition reassignment left over from before the failover
11) Resume any pending preferred-replica election left over from before the failover
12) Send the cluster's topic metadata to the other KafkaServers so that they can synchronize their copies
13) Start the automatic leader-rebalance scheduler if it is enabled in the configuration
14) Start topic deletion
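As referenced in step 1, each of these register*Listener calls boils down to a ZkClient subscription on a well-known zk path. The following is an illustrative stand-in, not the actual KafkaController code; the handler bodies are placeholders:
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}

// hypothetical stand-in for KafkaController.registerReassignedPartitionsListener()
def registerReassignedPartitionsListener(zkClient: ZkClient): Unit =
  zkClient.subscribeDataChanges("/admin/reassign_partitions", new IZkDataListener {
    override def handleDataChange(dataPath: String, data: Object): Unit = {
      // parse the reassignment JSON written by the admin tool and start moving replicas
    }
    override def handleDataDeleted(dataPath: String): Unit = {
      // the reassignment node was removed; nothing more to do in this sketch
    }
  })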
KafkaController manages the metadata of the Kafka cluster mainly through the listener functions above. Next we will first describe the principles of PartitionStateMachine and ReplicaStateMachine in detail, because the state and content of a Kafka topic's partitions are managed mainly through these two classes, and then describe the role of each listener following the flow above.