KafkaController Source Code Analysis: Broker Online and Offline Handling

Author: tracy_668 | Published 2021-02-18 14:12

[TOC]
This article looks at how the cluster detects a single broker coming online or going offline.

ZK events

In KafkaController#onControllerFailover, the controller registers a BrokerChangeHandler with ZK. It watches for child-node changes under /brokers/ids; each child node there corresponds to a broker id, and its data holds that broker's IP, port, security protocol, and other details.

The source of BrokerChangeHandler is as follows:

class BrokerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = BrokerIdsZNode.path

  override def handleChildChange(): Unit = {
    eventManager.put(controller.BrokerChange)
  }
}
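
Note that handleChildChange itself does no processing: it only enqueues a BrokerChange event. The ControllerEventManager drains such events on a single controller thread, so all controller state changes are handled serially. Below is a minimal sketch of that queue-plus-single-thread pattern, with hypothetical names rather than Kafka's actual classes:

import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for a controller event
trait Event { def process(): Unit }

// One blocking queue, one thread that processes events strictly in order
class SimpleEventManager {
  private val queue = new LinkedBlockingQueue[Event]()

  private val worker = new Thread(() => {
    try {
      while (true) queue.take().process()   // blocks until an event is available
    } catch {
      case _: InterruptedException => ()    // interrupt stops the loop
    }
  }, "controller-event-thread")

  def start(): Unit = worker.start()

  // what a ZK handler such as BrokerChangeHandler would call
  def put(event: Event): Unit = queue.put(event)
}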

Handling logic

The event is handled in the BrokerChange case object. As covered earlier, isActive indicates whether the current broker is the Controller. One thing to be clear about: broker online/offline events are processed only by the Controller; other brokers may also watch the /brokers/ids node, but they do not act on it.

In process, curBrokers is the current broker list from ZK and liveOrShuttingDownBrokerIds is the broker list cached locally by the Controller. Suppose liveOrShuttingDownBrokerIds is [0,1,2]: if broker 3 joins, curBrokers becomes [0,1,2,3]; if broker 2 goes down, curBrokers becomes [0,1]. A simple set difference between curBrokers and liveOrShuttingDownBrokerIds therefore yields the sets of brokers that came online and went offline, which is roughly what the first half of the method does.
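
Before looking at the full process method, here is that diff in isolation as a tiny, self-contained example (toy data only, not Kafka code):

// Toy illustration: deriving new and dead broker ids from two id sets
object BrokerDiffExample extends App {
  val liveOrShuttingDownBrokerIds = Set(0, 1, 2)   // the controller's cached view
  val curBrokerIds = Set(0, 1, 3)                  // what ZK reports now: 2 is gone, 3 is new

  val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds   // Set(3)
  val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds  // Set(2)

  println(s"new: $newBrokerIds, dead: $deadBrokerIds")
}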

case object BrokerChange extends ControllerEvent {
    override def state: ControllerState = ControllerState.BrokerChange

    override def process(): Unit = {
      if (!isActive) return
      // fetch the current set of brokers registered in ZK
      val curBrokers = zkClient.getAllBrokersInCluster.toSet
      val curBrokerIds = curBrokers.map(_.id)
      val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
      // ids of newly added brokers
      val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
      // ids of brokers that have gone away
      val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds

      // full Broker objects for the newly added ids
      val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
      // refresh the controller's cached broker list
      controllerContext.liveBrokers = curBrokers

      val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
      val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
      val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
      info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
        s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, all live brokers: ${liveBrokerIdsSorted.mkString(",")}")

      // set up a channel (connection + request-send thread) from the controller to each new broker
      newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
      // tear down all resources the controller holds for the dead brokers
      deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)

      if (newBrokerIds.nonEmpty)
        onBrokerStartup(newBrokerIdsSorted)
      if (deadBrokerIds.nonEmpty)
        onBrokerFailure(deadBrokerIdsSorted)
    }
}

ControllerChannelManager was already mentioned in the earlier article on the LeaderAndIsr request: it is the network communication manager between the Controller node and the other brokers. Once the sets of brokers that came online and went offline are known, it does the following:

  • Broker online: establish a network connection to it and start a request-send thread, which delivers LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest to that broker (a minimal sketch follows)
  • Broker offline: close the network connection, interrupt the request-send thread, and clear its request queue
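
A minimal sketch of that per-broker bookkeeping, assuming a map from broker id to a request queue plus a dedicated sender thread (hypothetical names; the real ControllerChannelManager also owns a NetworkClient and the request/response plumbing):

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical per-broker state: a request queue plus a dedicated sender thread
case class BrokerState(queue: LinkedBlockingQueue[Any], sender: Thread)

class SimpleChannelManager {
  private val brokerStateInfo = mutable.Map.empty[Int, BrokerState]

  def addBroker(brokerId: Int): Unit = synchronized {
    if (!brokerStateInfo.contains(brokerId)) {
      val queue = new LinkedBlockingQueue[Any]()
      val sender = new Thread(() => {
        try {
          while (true) {
            val request = queue.take()   // blocks until there is a request to send
            // the network send to the broker is omitted in this sketch
          }
        } catch {
          case _: InterruptedException => ()   // interrupt ends the sender thread
        }
      }, s"request-send-thread-$brokerId")
      sender.start()
      brokerStateInfo += brokerId -> BrokerState(queue, sender)
    }
  }

  def removeBroker(brokerId: Int): Unit = synchronized {
    brokerStateInfo.remove(brokerId).foreach { state =>
      state.sender.interrupt()   // stop the sender thread
      state.queue.clear()        // drop any queued requests
    }
  }
}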

The code involved here is fairly simple, so it is not reproduced. The two ifs at the end mean: if there are newly added brokers, run onBrokerStartup; if there are dead brokers, run onBrokerFailure.

Broker startup handling: onBrokerStartup

  1. Send an UpdateMetadataRequest so the Controller pushes the latest broker list to the other brokers
  2. Move the partitions and replicas on the new brokers to the Online state and elect partition leaders; note that this step already involves LeaderAndIsr requests
  3. If any partition reassignment has replicas on the new brokers, resume it
  4. If the new brokers host replicas of topics queued for deletion, resume the deletion
  5. Register a BrokerModificationsHandler that watches data changes on each new broker's /brokers/ids/<id> node (personally I doubt the broker metadata changes very often)
private def onBrokerStartup(newBrokers: Seq[Int]) {
    info(s"New broker startup callback for ${newBrokers.mkString(",")}")
    newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
    val newBrokersSet = newBrokers.toSet
    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
    // broker via this update.
    // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
    // common controlled shutdown case, the metadata will reach the new brokers faster
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
    // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
    val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
    // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
    // to see if these brokers can become leaders for some/all of those
    partitionStateMachine.triggerOnlinePartitionStateChange()
    // check if reassignment of some partitions need to be restarted
    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
      case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
    }
    partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
    // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
    // on the newly restarted brokers, there is a chance that topic deletion can resume
    val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
    if (replicasForTopicsToBeDeleted.nonEmpty) {
      info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
        s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
        s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
      topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
    }
    registerBrokerModificationsHandler(newBrokers)
}

Broker failure handling: onBrokerFailure

The onBrokerFailure method first updates the local cache, then calls onReplicasBecomeOffline to handle the replicas that have gone offline, and finally unregisters the BrokerModificationsHandler, undoing the last step of onBrokerStartup.

private def onBrokerFailure(deadBrokers: Seq[Int]) {
    info(s"Broker failure callback for ${deadBrokers.mkString(",")}")
    // drop the dead brokers' entries from the offline-log-dir replica cache
    deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
    val deadBrokersThatWereShuttingDown =
      deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
    info(s"Removed $deadBrokersThatWereShuttingDown from list of shutting down brokers.")
    // all replicas hosted on the dead brokers
    val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)

    onReplicasBecomeOffline(allReplicasOnDeadBrokers)

    // unregister the BrokerModificationsHandler for these brokers
    unregisterBrokerModificationsHandler(deadBrokers)
}
Handling replicas on the dead brokers: onReplicasBecomeOffline

The method looks complicated, but its logic is rigorous and worth reading carefully. It does the following:

  1. Find the partitions whose leader replica is on a dead broker
  2. Move those partitions to the OfflinePartition state
  3. Trigger a leader election for those partitions using their remaining replicas (a simplified sketch of the election rule follows the method's code)
  4. Move the replicas on the dead brokers to the Offline state
  5. If the dead brokers host replicas of topics queued for deletion, mark those replicas as failed for deletion; after all, they cannot be deleted while the broker is down
  6. If no partition had its leader on a dead broker, i.e. step 1 returned an empty set, send an UpdateMetadataRequest to the remaining live brokers
In short, this is one of the more important steps in the failure-handling path.

private def onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit = {
    val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
      newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))

    // find the partitions whose leader is on a dead broker (excluding topics queued for deletion)
    val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
      !controllerContext.isReplicaOnline(partitionAndLeader._2.leaderAndIsr.leader, partitionAndLeader._1) &&
        !topicDeletionManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet

    // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
    // move these partitions to the OfflinePartition state
    partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
    // elect new leaders for these partitions from their remaining replicas
    // trigger OnlinePartition state changes for offline or new partitions
    partitionStateMachine.triggerOnlinePartitionStateChange()
    // trigger OfflineReplica state change for those newly offline replicas
    // move the replicas on the dead brokers to the OfflineReplica state
    replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)

    // the broker is gone, so deletion of these replicas is marked as failed
    // fail deletion of topics that are affected by the offline replicas
    if (newOfflineReplicasForDeletion.nonEmpty) {
      // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
      // deleted when its log directory is offline. This will prevent the replica from being in TopicDeletionStarted state indefinitely
      // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
      topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
    }

    // If replica failure did not require leader re-election, inform brokers of the offline replica
    // Note that during leader re-election, brokers update their metadata
    // if no partition leader was on a dead broker, propagate the offline replicas to the live brokers via UpdateMetadataRequest
    if (partitionsWithoutLeader.isEmpty) {
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    }
}
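
Step 3 of the list above (and the same trigger during broker startup) comes down to the offline-partition leader election rule: prefer the first assigned replica that is alive and still in the ISR, and fall back to a live out-of-sync replica only when unclean leader election is enabled for the topic. A simplified sketch of that rule, not Kafka's actual selector implementation:

object OfflineLeaderElectionSketch {
  // Simplified offline-partition leader election.
  //   assignment  - the partition's assigned replicas, in assignment order
  //   isr         - the current in-sync replica set
  //   liveBrokers - brokers the controller currently considers alive
  def electLeader(assignment: Seq[Int],
                  isr: Set[Int],
                  liveBrokers: Set[Int],
                  uncleanElectionEnabled: Boolean): Option[Int] = {
    val liveInSyncReplicas = assignment.filter(r => liveBrokers(r) && isr(r))
    liveInSyncReplicas.headOption.orElse {
      // no live in-sync replica left: electing an out-of-sync replica may lose data,
      // so it is only allowed when unclean leader election is enabled for the topic
      if (uncleanElectionEnabled) assignment.find(liveBrokers) else None
    }
  }
}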

This part walks through how the Controller reacts to broker nodes coming online and going offline, this time on an older Kafka codebase that uses the ZkUtils-based listeners, so some class names differ from the code shown above. When a broker starts, it establishes a session with ZK and registers a node under /brokers/ids whose name is its broker id; the node is ephemeral and its data contains the broker's detailed information. The Controller watches all children of /brokers/ids: a new child node means a broker has come online, a vanished child node means a broker has gone offline, and the Controller handles each case accordingly. Kafka relies on ZK's watch mechanism together with ephemeral nodes to detect brokers joining and leaving the cluster.
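
A minimal standalone sketch of the ZooKeeper mechanics described above, using the plain ZooKeeper client rather than Kafka's wrappers. It assumes a ZooKeeper server at localhost:2181 with /brokers and /brokers/ids already created, and the JSON payload is only an approximation of what a broker actually writes; a production client would also wait for the session to be fully established before creating nodes:

import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

object EphemeralBrokerDemo extends App {
  // connect to a local ZooKeeper; the default watcher is a no-op in this sketch
  val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit = ()
  })

  // what a broker does on startup: register an EPHEMERAL znode under /brokers/ids;
  // real payloads look roughly like
  //   {"endpoints":["PLAINTEXT://host:9092"],"host":"host","port":9092,"version":4,...}
  val data = """{"host":"localhost","port":9092}""".getBytes("UTF-8")
  zk.create("/brokers/ids/0", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)

  // what the controller does: watch the children of /brokers/ids;
  // ZK watches fire only once, so they must be re-registered after every event
  def watchBrokerIds(): Unit = {
    val children = zk.getChildren("/brokers/ids", new Watcher {
      override def process(event: WatchedEvent): Unit = {
        println(s"children of /brokers/ids changed: ${event.getType}")
        watchBrokerIds()
      }
    })
    println(s"current broker ids: $children")
  }
  watchBrokerIds()
}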

BrokerChangeListener

When KafkaController starts up, it registers a listener for broker changes through the replica state machine, in ReplicaStateMachine's registerListeners() method, implemented as follows:

 // register ZK listeners of the replica state machine
 def registerListeners() {
   // register broker change listener
   registerBrokerChangeListener() //note: watch /brokers/ids for brokers coming online and offline
 }

private def registerBrokerChangeListener() = {
  zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
}

BrokerChangeListener watches the /brokers/ids node; when that node's children change, its doHandleChildChange() method is triggered. The implementation is as follows:

//note: triggered whenever the children of /brokers/ids change
class BrokerChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {

  protected def logName = "BrokerChangeListener"

  def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) {
    info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
    inLock(controllerContext.controllerLock) {
      if (hasStarted.get) {
        ControllerStats.leaderElectionTimer.time {
          try {
            //note: brokers currently registered in ZK
            val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
            //note: broker ids currently in ZK
            val curBrokerIds = curBrokers.map(_.id)
            //note: broker ids cached by the Controller
            val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
            //note: ids of newly started brokers
            val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
            //note: ids of brokers that have gone offline
            val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
            //note: Broker objects for the newly started brokers
            val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
            controllerContext.liveBrokers = curBrokers //note: refresh the cached broker list
            val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
            val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
            val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
            info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
              .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
            //note: broker online: add it in the Controller Channel Manager
            newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
            //note: broker offline: remove it from the Controller Channel Manager
            deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
            if(newBrokerIds.nonEmpty) //note: run the startup callback for the new brokers
              controller.onBrokerStartup(newBrokerIdsSorted)
            if(deadBrokerIds.nonEmpty) //note: brokers went down; trigger leader election via the failure callback
              controller.onBrokerFailure(deadBrokerIdsSorted)
          } catch {
            case e: Throwable => error("Error while handling broker changes", e)
          }
        }
      }
    }
  }
}

The part worth focusing on is doHandleChildChange(), whose processing logic is:

  1. Fetch the current broker list from ZK (curBrokers) and the corresponding broker id list (curBrokerIds);
  2. Get the broker id list currently cached by the Controller (liveOrShuttingDownBrokerIds);
  3. Compute the newly started broker ids: newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds;
  4. Compute the offline broker ids: deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds;
  5. For each new broker, first add it to the ControllerChannelManager (establish a connection to it and initialize the corresponding send thread and request queue), then have the Controller call onBrokerStartup() to bring it online;
  6. For each offline broker, first remove it from the ControllerChannelManager (close the connection, shut down the corresponding send thread, and clear the request queue), then have the Controller call onBrokerFailure() to take it offline.

The overall handling flow is shown in the figure below:


[Figure: overall broker online/offline handling flow]

Broker online

This section covers the process of a single broker coming online. As shown in the figure above, bringing a broker online involves two main steps:

controllerContext.controllerChannelManager.addBroker
controller.onBrokerStartup(newBrokerIdsSorted)
  1. Add the broker in the Controller Channel Manager: the Controller establishes a connection to it and initializes the corresponding request-send thread and request queue;
  2. Call the Controller's onBrokerStartup() method to bring the node online.

The Controller Channel Manager's addBroker() is shown below; it was already covered in the earlier article on Controller initialization (the Controller Channel Manager part), so it is not described again here.

def addBroker(broker: Broker) {
  // be careful here. Maybe the startup() API has already started the request send thread
  brokerLock synchronized {
    if(!brokerStateInfo.contains(broker.id)) {
      addNewBroker(broker)
      startRequestSendThread(broker.id)
    }
  }
}

Now let's look at how the Controller brings a broker online in its onBrokerStartup() method:

//note: triggered from the broker-change listener registered via the replica state machine
//note: 1. send an update-metadata request to all live brokers;
//note: 2. trigger leader election for all new/offline partitions; successfully elected partitions become Online
//note: 3. check whether any partition reassignment has replicas landing on these brokers; if so, resume it
//note: 4. check whether these brokers host topics marked for deletion; if so, retry the deletion now that they are up
def onBrokerStartup(newBrokers: Seq[Int]) {
  info("New broker startup callback for %s".format(newBrokers.mkString(",")))
  val newBrokersSet = newBrokers.toSet //note: the newly started brokers
  // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
  // broker via this update.
  // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
  // common controlled shutdown case, the metadata will reach the new brokers faster
  //note: send a metadata update to all brokers so the existing brokers learn about the newly started ones
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
  // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
  //note: all replicas hosted on the newly started brokers
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  //note: move these replicas to the OnlineReplica state
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
  // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
  // to see if these brokers can become leaders for some/all of those
  //note: a broker coming online also triggers leader election for all partitions in the new/offline state
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // check if reassignment of some partitions need to be restarted
  //note: check whether any ongoing reassignment has replicas on these brokers
  val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
    case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
  }
  //note: if so, resume the corresponding partition reassignment
  partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
  // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
  // on the newly restarted brokers, there is a chance that topic deletion can resume
  //note: check whether topic deletion needs to be resumed
  val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if(replicasForTopicsToBeDeleted.nonEmpty) {
    info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
      "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
      deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
    deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }
}

onBrokerStartup() logically breaks down into the following steps:

  1. Call sendUpdateMetadataRequest() to send an UpdateMetadata request to every live broker in the cluster, so the other nodes learn that this broker is now online;
  2. Fetch the list of all replicas assigned to this node and transition them to the OnlineReplica state;
  3. Trigger the PartitionStateMachine's triggerOnlinePartitionStateChange() method, which runs a leader election for every partition in the NewPartition/OfflinePartition state; if the election succeeds the partition moves to the OnlinePartition state, otherwise the transition fails;
  4. If an ongoing reassignment has new replicas on this freshly started node, resume the reassignment (see the earlier article on partition replica reassignment);
  5. If a topic was marked for deletion but the deletion could not finish because some of its replicas were offline, retry the deletion now that the node is back.

At this point the broker has truly joined the Kafka cluster. Every step above that involves leader election also triggers the sending of LeaderAndIsr and Metadata requests.
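
For reference, the per-partition state carried by a LeaderAndIsr request conceptually looks like the sketch below (simplified; the real request also carries the controller id, controller epoch, zkVersion and the endpoints of the live leaders):

// Conceptual shape of the per-partition state carried by a LeaderAndIsr request
case class LeaderAndIsrPartitionState(topic: String,
                                      partition: Int,
                                      leader: Int,         // broker id of the (new) leader
                                      leaderEpoch: Int,    // bumped on every leader change
                                      isr: List[Int],      // current in-sync replicas
                                      replicas: List[Int]) // full replica assignment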

Broker failure

This section covers what happens after a broker goes down. As shown in the figure above, handling a failed broker involves two main steps:

controllerContext.controllerChannelManager.removeBroker
controller.onBrokerFailure(deadBrokerIdsSorted)
  1. Remove the broker in the Controller Channel Manager: close the Controller's connection to it, shut down the corresponding request-send thread, and clear its request queue;
  2. Call the Controller's onBrokerFailure() method to take the node offline.

The Controller Channel Manager removes a broker as follows:

def removeBroker(brokerId: Int) {
  brokerLock synchronized {
    removeExistingBroker(brokerStateInfo(brokerId))
  }
}

//note: remove the old broker (close the network connection and shut down the request-send thread)
private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
  try {
    brokerState.networkClient.close()
    brokerState.messageQueue.clear()
    brokerState.requestSendThread.shutdown()
    brokerStateInfo.remove(brokerState.brokerNode.id)
  } catch {
    case e: Throwable => error("Error while removing broker by the controller", e)
  }
}

After the Controller Channel Manager has cleaned up the failed broker, the KafkaController calls onBrokerFailure(), implemented as follows:

//note: called from the broker-change listener registered via the replica state machine (takes brokers offline)
//note: 1. partitions whose leader was on these brokers are set to Offline
//note: 2. new leaders are elected for new/offline partitions via OfflinePartitionLeaderSelector
//note: 3. after leader election, a LeaderAndIsr request is sent to all live replicas of the partition;
//note: 4. partitions that elected a leader move back to the Online state
//note: 5. all replicas on the failed brokers are moved to the Offline state
def onBrokerFailure(deadBrokers: Seq[Int]) {
  info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
  //note: remove the now-dead brokers from the set of brokers undergoing controlled shutdown
  val deadBrokersThatWereShuttingDown =
    deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
  val deadBrokersSet = deadBrokers.toSet
  // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
  //note: 1. partitions whose leader is on a dead broker, and whose topic is not marked for deletion, are set to Offline
  val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
    deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
      !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
  partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
  // trigger OnlinePartition state changes for offline or new partitions
  //note: 2. elect new leaders; partitions that succeed move to the Online state
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // filter out the replicas that belong to topics that are being deleted
  //note: replicas on the dead brokers whose topics are not marked for deletion
  var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
  val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  // handle dead replicas
  //note: move these replicas to the Offline state
  replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
  // check if topic deletion state for the dead replicas needs to be updated
  //note: replicas on the dead brokers whose topics are marked for deletion
  val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if(replicasForTopicsToBeDeleted.nonEmpty) { //note: mark deletion of these replicas as failed
    // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
    // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
    // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
    deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
  }

  // If broker failure did not require leader re-election, inform brokers of failed broker
  // Note that during leader re-election, brokers update their metadata
  if (partitionsWithoutLeader.isEmpty) {
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  }
}

The Controller handles a failed broker in the following steps:

  1. Find all partitions whose leader was on the failed broker and transition them to the OfflinePartition state;
  2. Trigger the PartitionStateMachine's triggerOnlinePartitionStateChange() method, which elects leaders for all partitions in the NewPartition/OfflinePartition state; a successful election moves the partition to the OnlinePartition state, otherwise the transition fails (this method is also triggered on broker startup and Controller initialization);
  3. Fetch all replicas on the failed broker and transition them to the OfflineReplica state;
  4. For topics that are marked for deletion and have replicas on this node, transition those replicas to the ReplicaDeletionIneligible state and mark the topics as failed for deletion, i.e. they cannot be deleted while replicas are offline;
  5. If no partition had its leader on the failed broker (so no leader re-election was needed), the final step sends a global metadata update to the live brokers; when leader re-election does happen, the brokers receive updated metadata as part of that process.

At this point the failed broker has been fully taken offline.

Broker graceful shutdown

The previous sections covered broker online/offline handling driven by watching ZK node changes, which is the main path Kafka uses. There is one more case: the Kafka service is shut down deliberately, which is known as a controlled (graceful) shutdown of the broker.

A broker shutting down gracefully sends a ControlledShutdownRequest to the Controller, which handles it as follows:

def handleControlledShutdownRequest(request: RequestChannel.Request) {
  // ensureTopicExists is only for client facing requests
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  // stop serving data to clients for the topic being deleted
  val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]

  //note: check that the request is authorized to perform cluster actions
  authorizeClusterAction(request)

  //note: handle the request on the controller side
  val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
  //note: the response carries the partitions whose leader is still on the shutting-down broker
  val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
    Errors.NONE.code, partitionsRemaining)
  requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
}
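
On the broker side this path is governed by controlled.shutdown.enable, together with controlled.shutdown.max.retries and controlled.shutdown.retry.backoff.ms: during shutdown the broker keeps sending ControlledShutdownRequest until the controller reports no remaining leader partitions or the retries run out. A simplified sketch of that retry loop (not KafkaServer's actual code; sendControlledShutdown is a hypothetical stand-in, and the defaults mirror the configuration defaults):

object ControlledShutdownSketch {
  // Simplified broker-side controlled-shutdown loop. sendControlledShutdown stands in for
  // sending a ControlledShutdownRequest to the controller; it returns the partitions that
  // still have their leader on this broker.
  def controlledShutdown(sendControlledShutdown: () => Set[String],
                         maxRetries: Int = 3,          // controlled.shutdown.max.retries
                         retryBackoffMs: Long = 5000L  // controlled.shutdown.retry.backoff.ms
                        ): Boolean = {
    var attemptsLeft = maxRetries
    var remainingPartitions = sendControlledShutdown()
    attemptsLeft -= 1
    while (remainingPartitions.nonEmpty && attemptsLeft > 0) {
      Thread.sleep(retryBackoffMs)   // give the controller time to move leadership elsewhere
      remainingPartitions = sendControlledShutdown()
      attemptsLeft -= 1
    }
    remainingPartitions.isEmpty      // true: every leader has been moved off this broker
  }
}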

Upon receiving this shutdown request, the Controller processes it in the shutdownBroker() method, implemented as follows:

//note: gracefully shut down a broker
//note: the controller first tries to move leadership off this broker to other available brokers
//note: returns the set of TopicPartitions whose leader has not yet been moved away
def shutdownBroker(id: Int): Set[TopicAndPartition] = {

  if (!isActive) {
    throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
  }

  controllerContext.brokerShutdownLock synchronized { //note: take the broker-shutdown lock
    info("Shutting down broker " + id)

    inLock(controllerContext.controllerLock) { //note: take the controller lock
      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))

      controllerContext.shuttingDownBrokerIds.add(id) //note: record this broker in the set of brokers being shut down
      debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
      debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
    }

    //note: build a map from every partition on this broker to its replication factor
    val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
      inLock(controllerContext.controllerLock) {
        controllerContext.partitionsOnBroker(id)
          .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
      }

    //note: for each of these partitions, update partition/replica state and run leader election where needed
    allPartitionsAndReplicationFactorOnBroker.foreach {
      case(topicAndPartition, replicationFactor) =>
        // Move leadership serially to relinquish lock.
        inLock(controllerContext.controllerLock) {
          controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
            if (replicationFactor > 1) { //note: only partitions with more than one replica
              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { //note: the leader is on the broker being shut down
                // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
                // notifies all affected brokers
                //todo: doesn't the replica state need to change in this case? (replica handling is still done via the listener;
                //      here we only switch the leader and stop replica fetching before the service shuts down)
                //note: state change: keep the partition OnlinePartition and run a leader election using controlledShutdownPartitionLeaderSelector
                partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                  controlledShutdownPartitionLeaderSelector)
              } else {
                // Stop the replica first. The state change below initiates ZK changes which should take some time
                // before which the stop replica request should be completed (in most cases)
                try { //note: make the broker being shut down stop fetching for this replica by sending it a StopReplica request
                  brokerRequestBatch.newBatch()
                  brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
                    topicAndPartition.partition, deletePartition = false)
                  brokerRequestBatch.sendRequestsToBrokers(epoch)
                } catch {
                  case e : IllegalStateException => {
                    // Resign if the controller is in an illegal state
                    error("Forcing the controller to resign")
                    brokerRequestBatch.clear()
                    controllerElector.resign()

                    throw e
                  }
                }
                // If the broker is a follower, updates the isr in ZK and notifies the current leader
                //note: move this replica's state to OfflineReplica
                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
                  topicAndPartition.partition, id)), OfflineReplica)
              }
            }
          }
        }
    }
    //note: after the leadership moves above, return the TopicPartitions whose leader is still on the
    //note: broker being shut down and whose replication factor is greater than 1
    def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
      trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
      controllerContext.partitionLeadershipInfo.filter {
        case (topicAndPartition, leaderIsrAndControllerEpoch) =>
          leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
      }.keys
    }
    replicatedPartitionsBrokerLeads().toSet
  }
}

The method's processing logic is as follows:

  1. Add the broker being shut down to the shuttingDownBrokerIds set, which records the brokers currently undergoing controlled shutdown;
  2. Collect all partitions that have a replica on this node;
  3. For each such partition: if its leader is on the broker being shut down, drive a state transition (OnlinePartition -> OnlinePartition) through the PartitionStateMachine to trigger a leader election with ControlledShutdownLeaderSelector, which elects the first ISR replica that is not shutting down (see the sketch after this list) and throws a StateChangeFailedException if there is none;
  4. Otherwise, i.e. the broker being shut down is a follower, send it a StopReplica request to stop replica fetching and move that replica to the OfflineReplica state; the replica is handled here so that the shutting-down broker stops its fetchers and can shut down cleanly.
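
The selection rule referenced in step 3 can be sketched as follows (a simplified stand-in for ControlledShutdownLeaderSelector, not the real implementation): drop the shutting-down brokers from the ISR, then take the first assigned replica that is live, not shutting down, and still in that reduced ISR.

object ControlledShutdownLeaderElectionSketch {
  // Simplified controlled-shutdown leader selection.
  // Returning None corresponds to the StateChangeFailedException case in the real selector.
  def electLeader(assignment: Seq[Int],
                  isr: Set[Int],
                  liveBrokers: Set[Int],
                  shuttingDown: Set[Int]): Option[Int] = {
    val newIsr = isr -- shuttingDown
    assignment
      .filter(r => liveBrokers(r) && !shuttingDown(r))
      .find(newIsr.contains)
  }
}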

I had one question while reading this code: when the broker being shut down is the partition leader, nothing is done to the replica state here. The reason is that this replica handling can be left to onBrokerFailure(): even when a broker is shut down gracefully, the BrokerChangeListener watching ZK still fires once the broker's ephemeral node disappears.
