Kafka Source Code Analysis: The Broker Request Handling Flow


Author: 专职掏大粪 | Published 2019-08-11 14:27

    Kafka's design makes heavy use of the Selector + Channel + Buffer pattern, so before diving in, here is a brief introduction to NIO's Selector, Channel, and Buffer.

    NIO's Selector + Channel + Buffer

    Buffers
    A Java NIO Buffer is used to interact with NIO channels.
    A buffer is essentially a block of memory into which data can be written and from which it can later be read. This block of memory is wrapped in an NIO Buffer object, which provides a set of methods for convenient access to it.
    Standard IO operates on byte streams and character streams, whereas NIO operates on channels (Channel) and buffers (Buffer): data is always read from a channel into a buffer, or written from a buffer into a channel.
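
    As a quick illustration (a minimal, self-contained sketch, not Kafka code), the typical write/flip/read cycle of a ByteBuffer looks like this:

      import java.nio.ByteBuffer

      val buf = ByteBuffer.allocate(48)           // a 48-byte buffer, initially in write mode
      buf.put("hello".getBytes("UTF-8"))          // write data into the buffer
      buf.flip()                                  // switch the buffer from write mode to read mode
      val bytes = new Array[Byte](buf.remaining())
      buf.get(bytes)                              // read the data back out
      buf.clear()                                 // make the whole buffer writable again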

    Channels
    A Java NIO channel is similar to a stream, with a few differences:
    you can both read data from a channel and write data to it, whereas a stream is typically one-directional;
    channels can be read and written asynchronously.

    The following diagram shows how a Buffer interacts with a Channel:


    [Figure: data flowing between a Channel and a Buffer]
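
    For example (a minimal sketch unrelated to the Kafka sources; the file name is hypothetical), reading a file through a FileChannel always goes through a Buffer:

      import java.io.RandomAccessFile
      import java.nio.ByteBuffer

      val file = new RandomAccessFile("data.txt", "r")   // hypothetical input file
      val channel = file.getChannel                      // obtain a channel for the file
      val buf = ByteBuffer.allocate(1024)
      var bytesRead = channel.read(buf)                  // channel -> buffer
      while (bytesRead != -1) {
        buf.flip()                                       // switch the buffer to read mode
        while (buf.hasRemaining) print(buf.get().toChar)
        buf.clear()                                      // ready for the next channel read
        bytesRead = channel.read(buf)
      }
      file.close()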

    Selectors

    A selector monitors multiple channels for events (for example: connection opened, data arrived). A Selector is the Java NIO component that can examine one or more NIO channels and determine which ones are ready for events such as reads or writes. This allows a single thread to manage multiple channels, and therefore multiple network connections.
    The following diagram shows a single thread using one Selector to handle three Channels:


    [Figure: one Selector servicing three Channels on a single thread]
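
    A minimal sketch of that pattern in plain NIO (not Kafka code; the port is arbitrary): one thread registers channels with a Selector and services whichever ones are ready. Kafka's Acceptor.run(), shown later, follows exactly this shape:

      import java.net.InetSocketAddress
      import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}

      val selector = Selector.open()
      val server = ServerSocketChannel.open()
      server.bind(new InetSocketAddress(9999))           // arbitrary port for the sketch
      server.configureBlocking(false)                    // selectors require non-blocking channels
      server.register(selector, SelectionKey.OP_ACCEPT)

      while (true) {
        if (selector.select(500) > 0) {                  // wait up to 500 ms for ready channels
          val iter = selector.selectedKeys().iterator()
          while (iter.hasNext) {
            val key = iter.next(); iter.remove()
            if (key.isAcceptable) {
              val client = server.accept()               // a new connection arrived
              client.configureBlocking(false)
              client.register(selector, SelectionKey.OP_READ)  // watch it for reads
            } else if (key.isReadable) {
              // read from key.channel() into a ByteBuffer ...
            }
          }
        }
      }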

    Non-blocking IO
    While a channel reads data into a buffer, the thread can do other work; once data has been read into the buffer, the thread can pick it up and process it. Writing from a buffer into a channel works the same way.

    The Broker Request Handling Flow

    Below we walk through the source code of the key steps (Kafka 2.3) to trace the entire request handling process.

    • KafkaServer: the entry point of Kafka's network layer is SocketServer.
      kafka.Kafka is the entry class of the Kafka Broker, and kafka.Kafka.main() is the Kafka Server's main() method, i.e. the Broker's startup entry point. Following the call stack kafka.Kafka.main() -> KafkaServerStartable.startup() -> KafkaServer.startup(), we can trace from main() all the way to the creation of the SocketServer, the network-layer object; in other words, the Kafka Server initializes and starts the SocketServer during startup.
      def main(args: Array[String]): Unit = {
        try {
          val serverProps = getPropsFromArgs(args)
          val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
      // partially omitted ...
          kafkaServerStartable.startup()
          kafkaServerStartable.awaitShutdown()
        }
        catch {
          case e: Throwable =>
            fatal("Exiting Kafka due to fatal exception", e)
            Exit.exit(1)
        }
        Exit.exit(0)
      }
    
    class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
      private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
      // ...
      def startup() {
        try server.startup()
        catch {
          // ...
        }
      }
    }
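
    Inside KafkaServer.startup(), the network layer is created and started; the following is condensed and paraphrased from the 2.3 sources (most of the method elided):

      // KafkaServer.startup(), condensed
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      // start the Acceptors, but delay starting the Processors until the
      // request handlers (KafkaApis etc.) have been initialized
      socketServer.startup(startupProcessors = false)
      // ... create KafkaApis, KafkaRequestHandlerPool, ... then, near the end:
      socketServer.startControlPlaneProcessor()
      socketServer.startDataPlaneProcessors()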
    
    • SocketServer handles new connections, requests, and responses to and from the broker.
      Kafka supports two types of request planes:

    • Data plane: handles requests from clients and from other brokers in the cluster.
      The threading model is one Acceptor thread per listener, handling that listener's new connections. Multiple listening ports can be configured by setting "listeners" in KafkaConfig to several comma-separated endpoints.
      Each Acceptor has N Processor threads (each with its own selector, reading requests from the sockets) and M handler threads (which process the requests and hand the responses back to a Processor thread to be written out).

    • Control plane: handles requests from the controller. This plane is optional and is configured by setting "control.plane.listener.name"; if it is not configured, controller requests are handled by the data plane (a config sketch follows this list).
      The threading model is a single Acceptor thread handling new connections, with one Processor thread (with its own selector, reading requests from the socket) and one handler thread that processes the requests and hands the responses back to the Processor thread to be written out.
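
    For instance, a broker serving clients on two listeners while dedicating a third to controller traffic might be configured like this (a hypothetical server.properties sketch, not taken from the article):

      # server.properties (hypothetical example)
      listeners=PLAINTEXT://:9092,SSL://:9093,CONTROLLER://:9094
      listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT
      control.plane.listener.name=CONTROLLER
      # Processor (network) threads per data-plane listener
      num.network.threads=3
      # KafkaRequestHandler (I/O) threads
      num.io.threads=8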

    • SocketServer.startup() creates the control-plane and data-plane Acceptor and Processor threads, and starts all of the Processor threads:

     def startup(startupProcessors: Boolean = true) {
       this.synchronized {
         connectionQuotas = new ConnectionQuotas(config, time)
         // control plane
         createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
         // data plane
         createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
         if (startupProcessors) {
           // start the control-plane Processor thread
           startControlPlaneProcessor()
           // start the data-plane Processor threads
           startDataPlaneProcessors()
         }
       }
     }
    
     private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                       endpoints: Seq[EndPoint]): Unit = synchronized {
       endpoints.foreach { endpoint =>
         connectionQuotas.addListener(config, endpoint.listenerName)
       // for each EndPoint, create one Acceptor plus several Processors, which go into the processor thread array
         val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
         addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
       }
     }
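
    For completeness, addDataPlaneProcessors registers each new Processor with the data-plane RequestChannel and hands the batch over to the Acceptor; condensed and paraphrased from the 2.3 sources:

      private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint,
                                         newProcessorsPerListener: Int): Unit = synchronized {
        val listenerProcessors = new ArrayBuffer[Processor]()
        for (_ <- 0 until newProcessorsPerListener) {
          val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas,
            endpoint.listenerName, endpoint.securityProtocol, memoryPool)
          listenerProcessors += processor
          // make the Processor known to the RequestChannel so responses can be routed back to it
          dataPlaneRequestChannel.addProcessor(processor)
          nextProcessorId += 1
        }
        listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
        // the Acceptor owns these Processors and starts their threads
        acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
      }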
    
    • In the Acceptor's constructor, openServerSocket() first opens the socket of the EndPoint this Acceptor is responsible for, i.e. binds the port and starts listening.
      The Acceptor then constructs and manages an ArrayBuffer of Processors; each Processor is an independent thread.
    • The Acceptor thread's run() method keeps listening for connection requests (ACCEPT) on its ServerChannel; when a new connection arrives, the channel is assigned to a Processor in round-robin fashion.
      The actual hand-off of a new connection to a Processor happens in assignNewConnection().
    private[kafka] class Acceptor(val endPoint: EndPoint,
                                  val sendBufferSize: Int,
                                  val recvBufferSize: Int,
                                  brokerId: Int,
                                  connectionQuotas: ConnectionQuotas,
                                  metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
    
      private val nioSelector = NSelector.open()
      val serverChannel = openServerSocket(endPoint.host, endPoint.port)
      private val processors = new ArrayBuffer[Processor]()
    
      /**
       * Accept loop that checks for new connection attempts
       */
      def run() {
        // register the ServerChannel with the selector and listen for ACCEPT events
        serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
        startupComplete()
        try {
          var currentProcessorIndex = 0
          while (isRunning) {
            try {
              val ready = nioSelector.select(500)
              if (ready > 0) {
                val keys = nioSelector.selectedKeys()
                val iter = keys.iterator()
                while (iter.hasNext && isRunning) {
                  try {
                    val key = iter.next
                    iter.remove()

                    if (key.isAcceptable) {
                      accept(key).foreach { socketChannel =>
                        // try each Processor at most once per new connection
                        var retriesLeft = synchronized(processors.length)
                        var processor: Processor = null
                        do {
                          retriesLeft -= 1
                          processor = synchronized {
                            // wrap the index so the round-robin cycles over all Processors
                            currentProcessorIndex = currentProcessorIndex % processors.length
                            processors(currentProcessorIndex)
                          }
                          currentProcessorIndex += 1
                          // assignNewConnection is called here; it may only block on the last retry
                        } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                      }
                    } else
                      throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                  } catch {
                    case e: Throwable => error("Error while accepting connection", e)
                  }
                }
              }
            }
            catch {
              // ...
            }
          }
        } finally {
          // shutdown and cleanup elided ...
        }
      }
    
    
    • assignNewConnection() calls processor.accept(), which places the SocketChannel into that Processor's own queue of new connections; the Processor later takes it out of the queue for further handling (a sketch of accept() follows the snippet below).
      private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
        // delegate to processor.accept
        if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
         // ...
          true
        } else
          false
      }
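
    Processor.accept() itself just enqueues the channel: it first tries a non-blocking offer into newConnections, and only blocks (when mayBlock is true, i.e. this was the last candidate Processor) if the queue is full; condensed from the 2.3 sources:

      def accept(socketChannel: SocketChannel, mayBlock: Boolean,
                 blockedPercentMeter: com.yammer.metrics.core.Meter): Boolean = {
        val accepted = {
          if (newConnections.offer(socketChannel))   // non-blocking attempt first
            true
          else if (mayBlock) {                       // last retry: block until there is room
            val startNs = time.nanoseconds
            newConnections.put(socketChannel)
            blockedPercentMeter.mark(time.nanoseconds() - startNs)
            true
          } else
            false
        }
        if (accepted)
          wakeup()                                   // wake this Processor's selector
        accepted
      }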
    
    • Each Processor maintains its own Selector object, which monitors only that Processor's channels. This gives the Processor threads maximal parallelism and isolation from one another. Each Processor also maintains a responseQueue, used to exchange results with the KafkaRequestHandler threads; this comes up again later in the flow.
    private[kafka] class Processor(val id: Int,
                                   time: Time,
                                   maxRequestSize: Int,
                                   requestChannel: RequestChannel,
                                   connectionQuotas: ConnectionQuotas,
                                   connectionsMaxIdleMs: Long,
                                   failedAuthenticationDelayMs: Int,
                                   listenerName: ListenerName,
                                   securityProtocol: SecurityProtocol,
                                   config: KafkaConfig,
                                   metrics: Metrics,
                                   credentialProvider: CredentialProvider,
                                   memoryPool: MemoryPool,
                                   logContext: LogContext,
                                   connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
      // a queue of new connections, drained and processed in run()
      private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
      // each Processor maintains its own responseQueue
      private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

      // each Processor maintains its own private Selector
     private val selector = createSelector(
        ChannelBuilders.serverChannelBuilder(listenerName,
          listenerName == config.interBrokerListenerName,
          securityProtocol,
          config,
          credentialProvider.credentialCache,
          credentialProvider.tokenCache,
          time))
      // Visible to override for testing
      protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
        channelBuilder match {
          case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
          case _ =>
        }
        new KSelector(
          maxRequestSize,
          connectionsMaxIdleMs,
          failedAuthenticationDelayMs,
          metrics,
          time,
          "socket-server",
          metricTags,
          false,
          true,
          channelBuilder,
          memoryPool,
          logContext)
      }
    
  override def run() {
    // initialization is complete: this CountDownLatch marks the Processor as
    // fully started and running
    startupComplete()
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          // register any new responses for writing: drain this Processor's
          // responseQueue (results from the Handler threads, routed here via
          // RequestChannel.sendResponse) and unmute the channels so they can
          // accept requests again
          processNewResponses()
          // call KSelector.poll() to do the actual network reads and writes
          poll()
          // mutes the channel for each completed receive: no further reads are
          // accepted on that connection until its response has been sent
          processCompletedReceives()
          processCompletedSends()
          processDisconnected()
          closeExcessConnections()
            } catch {
             // ...
            }
          }
        } finally {
          // ...
        }
      }
      }
    
    • In run(), configureNewConnections() takes new connections out of the Processor's own newConnections queue, registers each one with the Processor's selector, and listens for OP_READ events. Internally it calls register(), which registers the newly accepted SocketChannel with the server-side Selector for OP_READ; once a read event fires, the corresponding request can be taken out for further processing.
     private def configureNewConnections() {
        var connectionsProcessed = 0
        while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
          // take a new SocketChannel out of the queue
          val channel = newConnections.poll()
          try {
            // register the SocketChannel with this Processor's selector
            selector.register(connectionId(channel.socket), channel)
            connectionsProcessed += 1
          } catch {
            case e: Throwable =>
              // ...
          }
        }
      }
    
     // org.apache.kafka.common.network.Selector (Java side), called by the Processor:
     public void register(String id, SocketChannel socketChannel) throws IOException {
         ensureNotRegistered(id);
         registerChannel(id, socketChannel, SelectionKey.OP_READ);
         this.sensors.connectionCreated.record();
     }
    
    • RequestChannel hands requests from the network layer over to the business layer, and delivers the business layer's results back to the network layer so they can be returned to the client. A SocketServer constructs one RequestChannel per plane (the data plane's dataPlaneRequestChannel, plus an optional control-plane channel). The RequestChannel constructor initializes requestQueue, which holds the requests the network layer has received and is about to hand to the business layer. The RequestChannel also keeps a map of the registered Processors: when the business layer finishes a request, the Response is routed to the responseQueue of the Processor that owns the connection, from which it is written back to the client.
        class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
          import RequestChannel._
          val metrics = new RequestChannel.Metrics
          private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
          private val processors = new ConcurrentHashMap[Int, Processor]()

          /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
          def sendRequest(request: RequestChannel.Request) {
            requestQueue.put(request)
          }
          // ...
        }
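
    Two more RequestChannel methods complete the picture (condensed from the 2.3 sources): receiveRequest is the timed poll the handler threads call, and sendResponse routes a finished Response back to the Processor that owns the connection:

          /** Get the next request or block until the specified timeout has elapsed */
          def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
            requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

          /** Send a response back to the socket server to be sent over the network */
          def sendResponse(response: RequestChannel.Response) {
            // look up the Processor that read the original request
            val processor = processors.get(response.processor)
            // the processor may be null if it was already shut down; in that case
            // the connection is closed, so the response is simply dropped
            if (processor != null)
              processor.enqueueResponse(response)
          }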
    
    
    • Processor.processCompletedReceives() iterates over completedReceives; each fully received payload is parsed and wrapped into a request, then handed to the RequestChannel, which passes it on to the business layer. Concretely, the Processor calls RequestChannel.sendRequest(), which puts the request into requestQueue for the request handler threads to process later.
      private def processCompletedReceives() {
        selector.completedReceives.asScala.foreach { receive =>
          try {
            openOrClosingChannel(receive.source) match {
              case Some(channel) =>
                // ... (request-header parsing and SASL re-authentication check elided)
                val nowNanos = time.nanoseconds()
                if (channel.serverAuthenticationSessionExpired(nowNanos)) {
                  // ...
                } else {
                  // ... build the RequestChannel.Request (req) from the received payload
                  // hand the request to the Handler threads via RequestChannel.requestQueue
                  requestChannel.sendRequest(req)
                  // stop reading from this connection until its response has been sent
                  selector.mute(connectionId)
                  handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
                }
              case None =>
                // ...
            }
          } catch {
            // ...
          }
        }
      }
    
    
    • KafkaRequestHandler (the request handler thread) and the KafkaRequestHandlerPool thread pool.
      The method to focus on in KafkaRequestHandler is run(): it takes the requests that the Processors previously put into the RequestChannel and calls KafkaApis to handle each request according to its type.
    class KafkaRequestHandler(id: Int,
                              brokerId: Int,
                              val aggregateIdleMeter: Meter,
                              val totalHandlerThreads: AtomicInteger,
                              val requestChannel: RequestChannel,
                              apis: KafkaApis,
                              time: Time) extends Runnable with Logging {
  def run() {
    while (!stopped) {
      // take a request out of RequestChannel.requestQueue (timed poll, 300 ms)
      val req = requestChannel.receiveRequest(300)
      req match {
        case RequestChannel.ShutdownRequest =>
          shutdownComplete.countDown()
          return

        case request: RequestChannel.Request =>
          try {
            // call KafkaApis.handle() to hand the request to the business layer
            apis.handle(request)
          } catch {
            // error handling ...
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    shutdownComplete.countDown()
  }
}
    
    
    • The KafkaRequestHandlerPool constructor creates and starts several KafkaRequestHandler threads; the pool size is set by the configuration option num.io.threads.
      All KafkaRequestHandlers in the pool compete to take requests from RequestChannel.requestQueue. Since requestQueue is an ArrayBlockingQueue, each request is taken via ArrayBlockingQueue.poll() (the timed poll shown in receiveRequest above).
    class KafkaRequestHandlerPool(val brokerId: Int,
                                  val requestChannel: RequestChannel,
                                  val apis: KafkaApis,
                                  time: Time,
                                  numThreads: Int,
                                  requestHandlerAvgIdleMetricName: String,
                                  logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
    
      private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
    
  // initialize the array of KafkaRequestHandler threads
      val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
      for (i <- 0 until numThreads) {
        createHandler(i)
      }
      def createHandler(id: Int): Unit = synchronized {
        runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
        KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
      }
    // ...
    }
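
    KafkaServer.startup() wires this pool to the data-plane RequestChannel and to KafkaApis roughly as follows (paraphrased from the 2.3 sources, constructor arguments condensed):

      // KafkaServer.startup(), condensed
      dataPlaneRequestProcessor = new KafkaApis(
        socketServer.dataPlaneRequestChannel /* ...many collaborators elided... */)
      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(
        config.brokerId,
        socketServer.dataPlaneRequestChannel,  // the queue the handler threads poll
        dataPlaneRequestProcessor,             // KafkaApis, the dispatch layer
        time,
        config.numIoThreads,                   // num.io.threads
        s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
        SocketServer.DataPlaneThreadPrefix)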
    
    • KafkaApis is essentially a dispatcher: it parses the user's request and hands it off to the business layer; you can think of it as Kafka's API layer. As seen in KafkaRequestHandler.run() above, this happens through KafkaApis.handle():
      def handle(request: RequestChannel.Request) {
        request.header.apiKey match {
          case ApiKeys.PRODUCE => handleProduceRequest(request)
          case ApiKeys.FETCH => handleFetchRequest(request)
          case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
          case ApiKeys.METADATA => handleTopicMetadataRequest(request)
          case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
          // other ApiKeys elided
          // error handling elided
        }
      }
    
    • Let's follow ApiKeys.PRODUCE to see what happens next. handleProduceRequest() has two important parts: the sendResponseCallback() callback and the call to replicaManager.appendRecords(). Inside sendResponseCallback, the response is ultimately handed back to the RequestChannel via requestChannel.sendResponse().
     def handleProduceRequest(request: RequestChannel.Request) {
        val produceRequest = request.body[ProduceRequest]

        // callback that delivers the business layer's final result to the
        // responseQueue of the Processor that owns the connection
        def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
          // Send the response immediately. In case of throttling, the channel has already been muted.
          if (produceRequest.acks == 0) {
            // acks == 0: the producer expects no response, so only a no-op response
            // is enqueued to unmute the channel (on error the connection is closed)
            sendNoOpResponseExemptThrottle(request)
          } else {
            // sendResponse() ends up calling requestChannel.sendResponse(), which puts
            // the ProduceResponse into the owning Processor's responseQueue
            sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
          }
        }

        // appendRecords() implements the actual write of the records to the log
        replicaManager.appendRecords(
          timeout = produceRequest.timeout.toLong,
          requiredAcks = produceRequest.acks,
          internalTopicsAllowed = internalTopicsAllowed,
          isFromClient = true,
          entriesPerPartition = authorizedRequestInfo,
          responseCallback = sendResponseCallback,
          recordConversionStatsCallback = processingStatsCallback)
        // ...
      }
    
    • Finally, as mentioned when discussing the Processor above, Processor.processNewResponses() drains the responses belonging to its own connections from the Processor's responseQueue (populated via RequestChannel.sendResponse) and gets them ready to be written back to the client. A condensed sketch follows.
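
    Condensed and paraphrased from the 2.3 sources (throttling cases elided):

      // Processor.processNewResponses(), condensed
      private def processNewResponses() {
        var currentResponse: RequestChannel.Response = null
        while ({currentResponse = dequeueResponse(); currentResponse != null}) {
          val channelId = currentResponse.request.context.connectionId
          currentResponse match {
            case response: NoOpResponse =>
              // nothing to send (e.g. an acks=0 produce): just resume reading
              tryUnmuteChannel(channelId)
            case response: SendResponse =>
              // hand the Send to the selector; the bytes go out on subsequent poll() calls
              sendResponse(response, response.responseSend)
            case response: CloseConnectionResponse =>
              close(channelId)
            case _ =>
              // StartThrottlingResponse / EndThrottlingResponse handling elided
          }
        }
      }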

    A picture is worth a thousand words, so let's recap the entire Broker request handling flow with one diagram:


    [Figure: end-to-end Broker request handling flow]

    References

    https://blog.csdn.net/zhanyuanlin/article/details/76906583
    http://ifeve.com/channels/
