美文网首页kafka
KafkaController之LeaderAndIsr请求

KafkaController之LeaderAndIsr请求

作者: tracy_668 | 来源:发表于2021-02-14 18:17 被阅读0次

[TOC]
在KafkaController初始化的过程中,多次遇见了LeaderAndIsr请求,这是broker之间通信的一个重要请求,它也是副本同步的关键步骤,本文主要分析KafkaApis对该请求的处理

ControllerChannelManager

在讲解LeaderAndIsr请求之前,我们先来看下ControllerChannelManager,在kafka-server端源码分析之Controller选举与初始化我曾提到过它,说它是broker之间通信的管理器,那么它是如何工作的呢?

又见内存队列
和ControllerEventManager一样,ControllerChannelManager也是用的异步内存队列来处理请求的发送,它只用于Controller节点和其它broker通信,它的大致原理如下:

  • ControllerBrokerRequestBatch用3个Map分别维护了leaderAndIsrRequest,stopReplicaRequest,updateMetadataRequest三种请求的缓存
  • 当KafkaController等组件想要发送请求时,仅仅是通过addXXXRequestForBrokers方法,将请求参数添加到缓存中,而在调用sendRequestsToBrokers方法后,它会遍历3中请求的缓存,将请求参数,回调函数等封装为QueueItem对象,放入一个类型为BlockingQueue[QueueItem]的messageQueue中
  • 在RequestSendThread线程启动后,从messageQueue中取出请求对象,发送请求,响应后调用回调函数进行处理

请求流程如下

image.png

请求对象解析

添加LeaderAndIsr请求到缓存

虽然这个方法很简单,但我需要提2个关键点

  • 第一个参数叫brokerIds,但是调用时传的是replicaIds或者Isr,这里要加强大家对副本id即brokerId的印象
  • 注意最后面还添加了一个UpdateMetadata请求
/**
  * @param brokerIds 通常是副本id,这里也是要请求的目标broker
  * @param topicPartition 分区
  * @param leaderIsrAndControllerEpoch leader, isr,controllerEpoch
  * @param replicas 通常是controllerContext缓存的分区副本集合
  * @param isNew 新建副本,新建分区是为true
  */
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition,
                                     leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                     replicas: Seq[Int], isNew: Boolean) {
  brokerIds.filter(_ >= 0).foreach { brokerId =>
    // 每个broker的LeaderAndIsr请求都有一个缓存
    // result: Map[TopicPartition, LeaderAndIsrRequest.PartitionState]
    val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
    val alreadyNew = result.get(topicPartition).exists(_.isNew)
    // 添加到目标broker的 leaderAndIsr请求队列中
    result.put(topicPartition, new LeaderAndIsrRequest.PartitionState(leaderIsrAndControllerEpoch.controllerEpoch,
      leaderIsrAndControllerEpoch.leaderAndIsr.leader,
      leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch,
      leaderIsrAndControllerEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava,
      leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion,
      replicas.map(Integer.valueOf).asJava,
      isNew || alreadyNew))
  }
  // 同时增加了一次UpdateMetadata请求
  addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
}

请求体的推理需要点篇幅,我这里直接贴出一个请求的样例json, 其中isNew只有在新分区,新副本请求时才为true,leaderEpoch在后续的副本同步会讲到,用于保证recovery时的数据一致性
liveLeaders表示的是上面partitionStates参数中每个分区leader所在的broker(存活的)

{
    "version":1,
    "controllerId":1,
    "controllerEpoch":1,
    "partitionStates":[
        {
            "TopicPartition":"test-0",
            "PartitionState":{
                "isNew":false,
                "basePartitionState":{
                    "controllerEpoch":1,
                    "leader":1,
                    "leaderEpoch":1,
                    "isr":[ 0,1,2],
                    "zkVersion":1,
                    "replicas":[ 0,1,2]
                }
            }
        }
    ],
    "liveLeaders":[
        {
            "id":1,
            "idString":"1",
            "host":"localhost",
            "port":9092,
            "rack":"rack-1"
        }
    ]
}

打破思维定式的假设

这里我主要想分享一点我的经验,不要死心眼的认为broker有3个,比如现在的情况是

15台broker,有一个叫test的topic,它有12个分区,每个分区3个副本,以第一个分区test-0为例,它目前的leader是8,即第8台broker上的test-0分区的副本是leader, ISR列表为[8,10,14], 它的replica是[8,10,14],即所有副本都在同步列表

现在Controller是broker-0,LeaderAndISR请求需要变更一批分区的信息,其中刚好有一个要把test-0的leader变为10,因此它要向broker 8,10,14发送LeaderAndIsr请求,下面的请求讲解都以这个为例

image.png

注:只有15台broker,最后一个有16,19不是我写错了

可以看到这一批LeaderAndIsr请求要发送到多个broker,leaderAndIsrRequestMap的类型是Map[brokerId, Map[TopicPartition, LeaderAndIsrRequest.PartitionState]],发送的代码如下

leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) =>
  leaderAndIsrPartitionStates.foreach { case (topicPartition, state) =>
    
  val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet

  val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
    _.node(controller.config.interBrokerListenerName)
  }

  val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
    controllerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava)

  controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder,
    (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker)))
}

可以看到kafka的本意就是积攒一批请求,然后按照brokerId分组,再发送出去,和生产者发送消息是同样的味道

再看上面的表格,变更后的ISR与Replica列表就是我们要发送的broker,我这里故意让三个分区都包含10,那么我们往broker-10发送的LeaderAndIsr请求同时包含3个分区的信息变更请求

KafkaApis处理LeaderAndIsr请求

LeaderAndIsr请求由handleLeaderAndIsrRequest方法处理,仅做了2件事:定义回调函数,认证预处理,关键的处理在调用的becomeLeaderOrFollower方法中

def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
  val correlationId = request.header.correlationId
  val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]

  def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
    // 按惯例,事先定义好的回调函数先不看,扰乱我们的视线
  }

  if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { // 认证步骤不在细究
    val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
    sendResponseExemptThrottle(request, response)
  } 
  // 省略
}

becomeLeaderOrFollower

该方法分为三段,前面部分只是做了下检查,中间部分是我们重点关注的

def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {

  replicaStateChangeLock synchronized {
    if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
      // Controller已换届,忽略leaderAndIsr请求,即请求过期
     
      leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
    } else {
      val responseMap = new mutable.HashMap[TopicPartition, Errors]

      val controllerId = leaderAndIsrRequest.controllerId
      // 更新controllerEpoch,记录了最新一次执行LeaderAndIsr请求的controllerEpoch
      // controller选举必定会发生LeaderAndIsr请求
      controllerEpoch = leaderAndIsrRequest.controllerEpoch

      // First check partition's leader epoch
      val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()

      // 缓存里没有的是新分区
      val newPartitions = leaderAndIsrRequest.partitionStates.asScala.keys.filter(topicPartition => getPartition(topicPartition).isEmpty)

      // 一堆检查,省略部分代码... 
      leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        val partition = getOrCreatePartition(topicPartition) // 返回的是Partition对象,新的partition有Pool的valueFactory初始化
        val partitionLeaderEpoch = partition.getLeaderEpoch
        if (partitionLeaderEpoch < stateInfo.basePartitionState.leaderEpoch) { 
          // 本地缓存的leader epoch要比请求中的leader epoch小,因为请求里的leader epoch是加1了的
          // 最终想要的数据
          partitionState.put(partition, stateInfo)
        }
      }

      // ================重点关注下面的代码==================

      // 过滤出leader是当前broker的分区,*要将当前broker上的副本变为leader* 这句话最重要
      val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
        stateInfo.basePartitionState.leader == localBrokerId
      }
      // 其余的副本在当前broker都是follower
      val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

      val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
        // 标记为leader的分区
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
      else
        Set.empty[Partition]

      val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
      // 标记为follower的分区
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
      else
        Set.empty[Partition]

      // ================重点关注==================


      // 先不看 ....
      leaderAndIsrRequest.partitionStates.asScala.keys.foreach(topicPartition =>
        if (getReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
          allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
      )

      if (!hwThreadInitialized) {
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }

      val newOnlineReplicas = newPartitions.flatMap(topicPartition => getReplica(topicPartition))
      val futureReplicasAndInitialOffset = newOnlineReplicas.filter { replica =>
        logManager.getLog(replica.topicPartition, isFuture = true).isDefined
      }.map { replica =>
        replica.topicPartition -> BrokerAndInitialOffset(BrokerEndPoint(config.brokerId, "localhost", -1), replica.highWatermark.messageOffset)
      }.toMap
      futureReplicasAndInitialOffset.keys.foreach(tp => getPartition(tp).get.getOrCreateReplica(Request.FutureLocalReplicaId))

      futureReplicasAndInitialOffset.keys.foreach(logManager.abortAndPauseCleaning)
      replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)

      replicaFetcherManager.shutdownIdleFetcherThreads()
      replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
    }
  }
}
image.png

根据前面的假设,broker-0(controller)向broker-10发送了一批分区变更需求请求,假设当前处理请求的是broker-10,先进行分组

partitionsTobeLeader: 变更后的leader是当前broker,也就是broker-10的一组,即test-0分区,用makeLeaders方法处理

我们先看makeLeaders方法,它首先停止了这些副本的同步操作,然后遍历每个分区处理

private def makeLeaders(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
  // 返回结果
  val partitionsToMakeLeaders = mutable.Set[Partition]()
  // 从fetch线程中移除这些分区副本的同步操作
  replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
  //遍历每一个分区,调用makeLeader
  partitionState.foreach{ case (partition, partitionStateInfo) =>
    if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
      partitionsToMakeLeaders += partition
    }
}

那么我们直接来到Partition的makeLeader方法,它是处理单个分区信息变更的方法

makeLeader

看代码之前先稍微解释下leader epoch以及leader-epoch-checkpoint文件

leader epoch在分区的leader副本变更时更新,每次更新加1,相当于记录了分区leader的更新次数,也可以理解为leader的版本号

leader-epoch-checkpoint在每一个分区日志目录都有一个,这里以topic为test-1,分区为0的日志目录为例
它的内容是一个key value,key是leader epoch,value是上一代leader的LEO,我们知道LEO是即将写入的下一条消息的offset,这里也可以理解为新leader要写入的第一条消息

image.png

它里面的内容一般是这样的,其他check-point文件也是同理

0
1
2 9832

第一行的0表示版本号,第二行表示记录个数,第三行才是真正的数据

言归正传,继续看makeLeader方法。该方法更新了本地的一些缓存,如controllerEpoch,inSyncReplicas,leaderEpoch,leaderEpochStartOffsetOpt(上面说的value),zkVersion。接着更新了check-point文件

最后是关于新的leader副本的处理,比如初始化它的HW

def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
  val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
    // 请求中的AR
    val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
    
    // Partition里也有一份controllerEpoch,更新
    controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch

    // 获取isr对应的Replica
    val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet

    // 副本重分配场景: 该分区已有的副本-新分配的副本=controller要移除的副本,从本地缓存allReplicasMap = new Pool[Int, Replica]中删除
    (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)

    // 新的isr是controller传过来的,更新
    inSyncReplicas = newInSyncReplicas

    // 获取replicas对应的Replica
    newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))

    // 不是说当前replica是leader副本,而是说它即将要成为leader
    val leaderReplica = getReplica().get
    // 获取leader副本的LEO
    val leaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset

    // 更新leaderEpoch,以及这一届leaderEpoch对应的StartOffset
    leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
    leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
    zkVersion = partitionStateInfo.basePartitionState.zkVersion

    // 将leader epoch及其开始位移写入文件
    leaderReplica.epochs.foreach { epochCache =>
      epochCache.assign(leaderEpoch, leaderEpochStartOffset)
    }

    // 如果分区的leader副本就是当前broker,就不用变更了
    // 注:看becomeLeaderOrFollower方法的星号注释
    val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)

    val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset // leader副本的LEO
    val curTimeMs = time.milliseconds
    // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
    // 更新副本的同步时间,LEO
    (assignedReplicas - leaderReplica).foreach { replica =>
      val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
      replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
    }

    if (isNewLeader) {
      // construct the high watermark metadata for the new leader replica
      // 初始化HW(大概率就是当前的HW)
      leaderReplica.convertHWToLocalOffsetMetadata()
      // mark local replica as the leader after converting hw
      leaderReplicaIdOpt = Some(localBrokerId)
      // reset log end offset for remote replicas
      // 初始化同步相关的一堆参数
      assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
    }
    // we may need to increment high watermark since ISR could be down to 1
    (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
  }
  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    // HW增加了,fetch请求的max.byte,produce请求的ack=-1等待副本同步就可以try complete了,
    tryCompleteDelayedRequests()
  isNewLeader
}

makeLeader的作用可以简单归纳为:

  1. 更新本地缓存数据
  2. 更新leader epoch到文件
  3. 如果本地副本不是leader,那么初始化它的HW,以及同步相关的参数

处理流程如下:


image.png

Follower副本处理

在becomeLeaderOrFollower方法中,makeLeaders处理完leader副本后,makeFollowers方法处理follower副本
该方法同样是遍历每一个分区

private def makeFollowers(controllerId: Int,
                            epoch: Int,
                            partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState],
                            correlationId: Int,
                            responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {


  // 定义返回结果
  for (partition <- partitionStates.keys)
    responseMap.put(partition.topicPartition, Errors.NONE)

  // 记录转变为follower副本的分区
  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

  partitionStates.foreach { case (partition, partitionStateInfo) =>
    val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
      // 找到leader所在的broker
      metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
        case Some(_) =>
          // makeFollower做主要初始化及更新操作
          if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
            partitionsToMakeFollower += partition
        case None =>
          // 没有就创建,这在分区副本重分配时有用
          partition.getOrCreateReplica(isNew = partitionStateInfo.isNew)
      }

  // leader要发生改变,不能再从以前的leader同步
  replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))

  // 尝试完成一些延迟请求
  partitionsToMakeFollower.foreach { partition =>
    val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
    tryCompleteDelayedProduce(topicPartitionOperationKey)
    tryCompleteDelayedFetch(topicPartitionOperationKey)
  }

  // broker在关闭了
  if (isShuttingDown.get()) {
    // 记录日志....
  }
  else {
    // 正常处理
    // we do not need to check if the leader exists again since this has been done at the beginning of this process
    val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
      // leader所在的broker和当前broker副本的HW作为初始同步位移
      partition.topicPartition -> BrokerAndInitialOffset(
        metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName),
        partition.getReplica().get.highWatermark.messageOffset)).toMap
    // 添加到副本到同步线程
    replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
  }
  partitionsToMakeFollower
}

上面调用的makeFollower和makeLeader方法类似

def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
    val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
    val oldLeaderEpoch = leaderEpoch
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
    // add replicas that are new
    newAssignedReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
    // remove assigned replicas that have been removed by the controller
    // 删除缓存里不要的副本了
    (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)

    inSyncReplicas = Set.empty[Replica] 
    leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
    leaderEpochStartOffsetOpt = None
    zkVersion = partitionStateInfo.basePartitionState.zkVersion


    // leader是否更新了
    if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && (leaderEpoch == oldLeaderEpoch || leaderEpoch == oldLeaderEpoch + 1)) {
      false
    }
    else {
      leaderReplicaIdOpt = Some(newLeaderBrokerId)
      true
    }
  }
}

makeFollowers主要判断分区的leader副本是否发生了改变,如果改变了,就先移除原来的同步,重新向新leader同步

image.png

becomeLeaderOrFollower第三部分
becomeLeaderOrFollower在调用makeLeaders和makeFollowers之后,处理的源码如下

leaderAndIsrRequest.partitionStates.asScala.keys.foreach(topicPartition =>
  // 判断离线分区
  if (getReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
    allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
)

// 为初始化的LeaderAndIsr请求启动hw的check线程,记录到recovery-point-offset-checkpoint文件
if (!hwThreadInitialized) {
  startHighWaterMarksCheckPointThread()
  hwThreadInitialized = true
}

val newOnlineReplicas = newPartitions.flatMap(topicPartition => getReplica(topicPartition))
// Add future replica to partition's map
val futureReplicasAndInitialOffset = newOnlineReplicas.filter { replica =>
  // 新副本就是isFuture副本
  logManager.getLog(replica.topicPartition, isFuture = true).isDefined
}.map { replica =>
  replica.topicPartition -> BrokerAndInitialOffset(BrokerEndPoint(config.brokerId, "localhost", -1), replica.highWatermark.messageOffset)
}.toMap
futureReplicasAndInitialOffset.keys.foreach(tp => getPartition(tp).get.getOrCreateReplica(Request.FutureLocalReplicaId))

// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move replica from source dir to destination dir
futureReplicasAndInitialOffset.keys.foreach(logManager.abortAndPauseCleaning)
replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)

// 看是否有空闲的fetcher线程
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()

// 调用handleLeaderAndIsrRequest中的回调函数
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)

该部分主要是对future Replica的处理,它们会同步leader副本,之后清空空闲的fetcher线程,这里大家先理解一个fetcher线程管理了多个follower的同步

最后调用handleLeaderAndIsrRequest中的回调函数:onLeadershipChange,下面是该方法的源码

内部topic的特殊处理

kafka内部的topic有2个:__consumer_offsets和__transaction_state,它们的LeaderAndIsr处理比较复杂,这里不再展开细说。

def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
  // for each new leader or follower, call coordinator to handle consumer group migration.
  // this callback is invoked under the replica state change lock to ensure proper order of
  // leadership changes
  updatedLeaders.foreach { partition =>
    if (partition.topic == GROUP_METADATA_TOPIC_NAME)
      groupCoordinator.handleGroupImmigration(partition.partitionId)
    else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
      txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch)
  }

  updatedFollowers.foreach { partition =>
    if (partition.topic == GROUP_METADATA_TOPIC_NAME)
      groupCoordinator.handleGroupEmigration(partition.partitionId)
    else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
      txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch)
  }
}

总结

LeaderAndIsr请求是在分区leader或者副本集合发生变更时,Controller向其它broker发生的请求,broker在接收到请求后会看分区的新leader是否是当前broker的id

  • 如果是,则先暂停该分区本地副本的同步,因为它们从follower变为leader了,然后更新元数据,记录leader epoch checkpoint等,最终初始化当前副本为leader副本
  • 如果不是,则本地broker上的副本为follower副本,同样的更新本地缓存的元数据,此时按leader是否发生了改变分为2中情况
    1. leader改变了,那么移除当前同步线程对这些副本的同步,重新定位leader所在broker,以当前副本的HW为起始位移加入到副本同步线程中去
    2. leader没有变,什么都不做

相关文章

网友评论

    本文标题:KafkaController之LeaderAndIsr请求

    本文链接:https://www.haomeiwen.com/subject/vsoyxltx.html