ReplicaManager Source Code Analysis 2: LeaderAndIsr

Author: 扫帚的影子 | Published: 2017-02-08 18:17
    • Once a client produces a message, which broker is it sent to?
    • Once it reaches the broker, how does the broker receive it and store it?

    Handling the LeaderAndIsr request in KafkaApis

        val correlationId = request.header.correlationId
        val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]
    
          def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
            // for each new leader or follower, call coordinator to handle consumer group migration.
            // this callback is invoked under the replica state change lock to ensure proper order of
            // leadership changes
            updatedLeaders.foreach { partition =>
              if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
                coordinator.handleGroupImmigration(partition.partitionId)
            }
            updatedFollowers.foreach { partition =>
              if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
                coordinator.handleGroupEmigration(partition.partitionId)
            }
          }
    
          val responseHeader = new ResponseHeader(correlationId)
          val leaderAndIsrResponse=
            if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
              val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
              new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
            } else {
              val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
              new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
            }
    
          requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, leaderAndIsrResponse)))
    

    The key step here is the call to ReplicaManager.becomeLeaderOrFollower, which initializes the Partition objects:

    val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
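The two branches of the handler can be pictured with a small standalone model. This is only a sketch under stated assumptions: `Tp`, `respond`, and the numeric codes are hypothetical stand-ins, not Kafka's classes or its real error codes, and `delegate` plays the role of replicaManager.becomeLeaderOrFollower.

```scala
object LeaderAndIsrHandlerSketch {
  // Hypothetical stand-in for a topic-partition key.
  final case class Tp(topic: String, partition: Int)

  // Placeholder codes chosen for this sketch; they are not Kafka's real values.
  val NoError: Short = 0
  val ClusterAuthorizationFailed: Short = -1

  // Returns (top-level error code, per-partition error codes).
  // `delegate` is only invoked when the caller is authorized.
  def respond(
      authorized: Boolean,
      partitions: Set[Tp],
      delegate: Set[Tp] => Map[Tp, Short]
  ): (Short, Map[Tp, Short]) =
    if (authorized) (NoError, delegate(partitions))
    else {
      // Every partition in the request gets the same authorization error.
      val failed = partitions.map(tp => tp -> ClusterAuthorizationFailed).toMap
      (ClusterAuthorizationFailed, failed)
    }
}
```

The point of the model is that an unauthorized request never reaches the replica manager, yet the response still carries one entry per partition named in the request.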
    
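    • ReplicaManager.becomeLeaderOrFollower
    1. Compare the controllerEpoch in the LeaderAndIsr request with the controllerEpoch stored in ReplicaManager (updated when an UpdateMetadata request is handled; see the article "Kafka集群Metadata管理" on Kafka cluster metadata management). If the locally stored controllerEpoch is larger, the request is ignored and the method returns BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.StaleControllerEpochCode).
    2. Process each partition state in leaderAndISRRequest.partitionStates:
      2.1 Create the Partition object (covered in a later article):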
    allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
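Registering a partition only when it is absent is a get-or-create step on a concurrent map. A minimal sketch, assuming a plain `java.util.concurrent.ConcurrentHashMap` in place of the pool class Kafka uses; `PartitionStub` and `getOrCreate` are hypothetical names introduced for illustration:

```scala
import java.util.concurrent.ConcurrentHashMap

object PartitionRegistrySketch {
  // Hypothetical stand-in for kafka.cluster.Partition.
  final class PartitionStub(val topic: String, val partitionId: Int)

  private val allPartitions = new ConcurrentHashMap[(String, Int), PartitionStub]()

  // Returns the already registered partition, or registers and returns a new one.
  def getOrCreate(topic: String, partitionId: Int): PartitionStub = {
    val candidate = new PartitionStub(topic, partitionId)
    // putIfAbsent returns the previous value, or null when the key was free.
    val existing = allPartitions.putIfAbsent((topic, partitionId), candidate)
    if (existing == null) candidate else existing
  }
}
```

Repeated LeaderAndIsr requests for the same partition therefore reuse one object instead of replacing it.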
    

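    2.2 If the leaderEpoch in partitionStateInfo is newer than the partition's current leader epoch, and this broker is one of the assigned replicas, record the state in val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]():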

    if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
              if(stateInfo.replicas.contains(config.brokerId))
                     partitionState.put(partition, stateInfo)
    }
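The snippet above shows only the accepting branch. The sketch below spells out all three cases of the check; `StateInfo`, `Outcome`, and `classify` are hypothetical names, and how Kafka reports the two rejected cases is not shown in the excerpt, so they appear here only as labeled outcomes.

```scala
object LeaderEpochFilterSketch {
  // Hypothetical, trimmed-down version of the per-partition state in the request.
  final case class StateInfo(leader: Int, leaderEpoch: Int, replicas: Set[Int])

  sealed trait Outcome
  case object Accepted extends Outcome         // newer epoch and this broker is a replica
  case object NotAReplica extends Outcome      // newer epoch, but this broker is not assigned
  case object StaleLeaderEpoch extends Outcome // request epoch is not newer than the local one

  def classify(localLeaderEpoch: Int, brokerId: Int, info: StateInfo): Outcome =
    if (localLeaderEpoch < info.leaderEpoch) {
      if (info.replicas.contains(brokerId)) Accepted else NotAReplica
    } else StaleLeaderEpoch
}
```

Note that an equal epoch counts as stale: the comparison is strict, so a replayed request with the same leaderEpoch changes nothing.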
    

    2.3 Split the accepted partitions into those becoming leader and those becoming follower:

     val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
              stateInfo.leader == config.brokerId
            }
      val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
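The same split can be written as a single pass with `partition` on the map. A small sketch, assuming a hypothetical `StateInfo` that carries only the leader broker id:

```scala
object LeaderFollowerSplitSketch {
  // Hypothetical stand-in: only the leader broker id matters here.
  final case class StateInfo(leader: Int)

  // Splits the accepted partition states into (toBeLeader, toBeFollower) for this broker.
  def split[K](
      states: Map[K, StateInfo],
      brokerId: Int
  ): (Map[K, StateInfo], Map[K, StateInfo]) =
    states.partition { case (_, info) => info.leader == brokerId }
}
```

For example, `split(Map("t-0" -> StateInfo(1), "t-1" -> StateInfo(2)), brokerId = 1)` puts `t-0` on the leader side and `t-1` on the follower side. The result is equivalent to filtering first and then subtracting the leader keys, as the original snippet does.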
    

    2.4 Handle the transition to leader:

    makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
    

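    The implementation does two things:
    Stop fetching messages from the previous leader: replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))); see "ReplicaManager源码解析1-消息同步线程管理" (part 1 of this series, on managing the replica fetcher threads).
    Call Partition.makeLeader to perform the actual transition: partition.makeLeader(controllerId, partitionStateInfo, correlationId).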
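The ordering of these two actions can be illustrated with a toy model that records what happens. Everything here is a hypothetical stand-in (partitions are plain strings, and the two functions only log events); it is a sketch of the sequence described above, not of Kafka's makeLeaders.

```scala
import scala.collection.mutable.ArrayBuffer

object MakeLeadersSketch {
  // Records the order of operations so the sequence can be inspected.
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // Hypothetical stand-in for replicaFetcherManager.removeFetcherForPartitions.
  def removeFetcherForPartitions(partitions: Set[String]): Unit =
    partitions.toSeq.sorted.foreach(p => events += s"stop-fetch:$p")

  // Hypothetical stand-in for partition.makeLeader.
  def makeLeader(partition: String): Unit = {
    events += s"make-leader:$partition"
    ()
  }

  def makeLeaders(partitions: Set[String]): Unit = {
    // First stop fetching for every partition that is about to become leader...
    removeFetcherForPartitions(partitions)
    // ...then switch each partition to the leader role.
    partitions.toSeq.sorted.foreach(makeLeader)
  }
}
```

Stopping the fetchers first matches the description: a replica that is about to lead has no leader left to fetch from.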
    2.5 Handle the transition to follower:

    makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.correlationId, responseMap, metadataCache)
    

    2.6 Start the HighWaterMarkCheckPointThread (covered in a later article):

    if (!hwThreadInitialized) {
              startHighWaterMarksCheckPointThread()
              hwThreadInitialized = true
    }
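The flag above guarantees that the checkpoint thread is started only once, no matter how many LeaderAndIsr requests arrive. The sketch below shows the same start-once idea with an `AtomicBoolean` instead of a plain boolean; that substitution is a choice made for this example so that it needs no external lock, and `startCheckpointTask` is a hypothetical stand-in for startHighWaterMarksCheckPointThread().

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object StartOnceSketch {
  private val started = new AtomicBoolean(false)

  // Counts how many times the task was actually started (for inspection only).
  val startCount = new AtomicInteger(0)

  // Hypothetical stand-in for startHighWaterMarksCheckPointThread().
  private def startCheckpointTask(): Unit = {
    startCount.incrementAndGet()
    ()
  }

  // compareAndSet flips false -> true for exactly one caller, so the task starts once.
  def ensureStarted(): Unit =
    if (started.compareAndSet(false, true)) startCheckpointTask()
}
```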
    

    2.7 Invoke the onLeadershipChange callback defined in KafkaApis.handleLeaderAndIsrRequest.
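Steps 1 and 2.7 frame the whole method: a stale controller epoch short-circuits everything, and the callback fires only after the request has been applied. A minimal sketch of that frame, with hypothetical names throughout (`ReplicaManagerStub`, `handle`, `Result`); the leader and follower sets are passed in already split, and adopting the request's epoch on acceptance is an assumption of this sketch rather than something stated above.

```scala
object BecomeLeaderOrFollowerSketch {
  sealed trait Result
  case object StaleControllerEpoch extends Result
  final case class Applied(leaders: Set[String], followers: Set[String]) extends Result

  // Hypothetical model of the broker-side state: only the last seen controller epoch.
  final class ReplicaManagerStub(var controllerEpoch: Int) {
    def handle(
        requestControllerEpoch: Int,
        leaders: Set[String],
        followers: Set[String],
        onLeadershipChange: (Set[String], Set[String]) => Unit
    ): Result =
      if (requestControllerEpoch < controllerEpoch) {
        // Request comes from an older controller: ignore it, do not call back.
        StaleControllerEpoch
      } else {
        // Assumption of this sketch: remember the epoch of the accepted request.
        controllerEpoch = requestControllerEpoch
        // Leader/follower transitions would happen here (steps 2.1 to 2.6).
        onLeadershipChange(leaders, followers)
        Applied(leaders, followers)
      }
  }
}
```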

    • makeLeaders and makeFollowers will be analyzed together once Partition and ReplicaFetcherManager have been covered.
    • Flow diagram of the LeaderAndIsr request/response handling:
    [Figure: LeaderAndIsr 请求响应.png]

    Series index: Kafka源码分析-汇总 (Kafka source code analysis, collected articles)
