美文网首页kafka玩转大数据大数据
KafkaController分析3-ControllerCha

KafkaController分析3-ControllerCha

作者: 扫帚的影子 | 来源:发表于2017-01-16 18:35 被阅读170次
    • KafkaController的作用前面我们已经简单介绍过, 基于此KafkaController需要与其他的broker node通信,发送请求;
    • ControllerChannelManager用来管理与其他所有的broker node的网络连接和请求发送等;

    ControllerChannelManager

    • 所在文件: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    • 创建到各个broker node的连接, 每个连接对应一个新的线程
    controllerContext.liveBrokers.foreach(addNewBroker(_))
    
    • 创建到单个broker node的连接
    private def addNewBroker(broker: Broker) {
        ...
        val networkClient = {
          val selector = new Selector(
            ...
          )
          new NetworkClient(
            selector,
            new ManualMetadataUpdater(Seq(brokerNode).asJava),
            config.brokerId.toString,
            1,
            0,
            Selectable.USE_DEFAULT_BUFFER_SIZE,
            Selectable.USE_DEFAULT_BUFFER_SIZE,
            config.requestTimeoutMs,
            time
          )
        }
    ...
        val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
          brokerNode, config, time, threadName)
        requestThread.setDaemon(false)
        brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
      }
    

    使用NetworkClient连接到broker node, 使用selector处理网络IO;

    • 发送线程RequestSendThread, 继承自ShutdownableThread, 需要发送的request会被add到val queue: BlockingQueue[QueueItem]中, 然后在doWork中被不断取出val QueueItem(apiKey, apiVersion, request, callback) = queue.take(), 再通过clientResponse = networkClient.blockingSendAndReceive(clientRequest, socketTimeoutMs)被发送直到clientResponse返回
    • 主要处理下面三种类型的请求:
    val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
                case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
                case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
                case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
                case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
              }
    
    • 如果设置了回调, 则
    if (callback != null) {
                callback(response)
    }
    

    ControllerBrokerRequestBatch

    • 所在文件: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    • 使用ControllerChannelManagersendRequest方法来批量发送请求到broker node;
    • 主要处理以下三种请求:
     val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
      val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
      val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
    
    Kafka源码分析-汇总

    相关文章

      网友评论

        本文标题:KafkaController分析3-ControllerCha

        本文链接:https://www.haomeiwen.com/subject/qpgubttx.html