美文网首页
Kafka源码分析-Server-api层

Kafka源码分析-Server-api层

作者: 陈阳001 | 来源:发表于2019-01-29 22:24 被阅读0次

    由Kafka网络层的分析可知,Handler线程会取出Processor线程放入RequestChannel的请求进行处理,并将产生的响应通过RequestChannel传回给Processor线程。Handler线程属于Kafka的API层,它对请求的处理是通过调用KafkaApis中的方法实现的。

    KafkaRequestHandler

    KafkaRequestHandler的作用是从RequestChannel获取请求并调用KafkaApis.handle()方法处理请求。KafkaRequestHandler的实现:

    /**
     * Worker that repeatedly takes a request from the RequestChannel and passes it to KafkaApis.handle().
     *
     * @param id                  index of this handler, used in log messages
     * @param brokerId            id of the broker, used in log messages
     * @param aggregateIdleMeter  meter marked with the time spent waiting for a request
     * @param totalHandlerThreads divisor applied to every idle time before it is recorded
     * @param requestChannel      channel requests are received from and the shutdown marker is sent to
     * @param apis                invoked with every request that is not the shutdown marker
     */
    class KafkaRequestHandler(id: Int,
                              brokerId: Int,
                              val aggregateIdleMeter: Meter,
                              val totalHandlerThreads: Int,
                              val requestChannel: RequestChannel,
                              apis: KafkaApis) extends Runnable with Logging {
      this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "

      /**
       * Polls the request channel until it hands back a non-null request.
       *
       * A single meter is used for the aggregate idle percentage of the whole thread pool.
       * Since the meter is calculated as total_recorded_value / time_window and time_window
       * is independent of the number of threads, each recorded idle time is discounted by
       * the number of threads.
       */
      private def awaitRequest(): RequestChannel.Request = {
        var polled: RequestChannel.Request = null
        while (polled == null) {
          val waitStartNs = SystemTime.nanoseconds
          // NOTE(review): 300 is presumably a poll timeout in milliseconds; a null result is retried.
          polled = requestChannel.receiveRequest(300)
          val waitedNs = SystemTime.nanoseconds - waitStartNs
          // Record the idle time for monitoring.
          aggregateIdleMeter.mark(waitedNs / totalHandlerThreads)
        }
        polled
      }

      /**
       * Serves requests until the RequestChannel.AllDone marker is received.
       * Anything thrown while polling or handling is logged and the loop carries on.
       */
      def run(): Unit = {
        var running = true
        while (running) {
          try {
            val next = awaitRequest()
            if (next eq RequestChannel.AllDone) {
              // The AllDone marker tells this handler thread to finish.
              debug("Kafka request handler %d on broker %d received shut down command".format(
                id, brokerId))
              running = false
            } else {
              next.requestDequeueTimeMs = SystemTime.milliseconds
              trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, next))
              // KafkaApis holds the request-processing logic.
              apis.handle(next)
            }
          } catch {
            case e: Throwable => error("Exception when handling request", e)
          }
        }
      }

      /** Asks a handler to stop by sending the RequestChannel.AllDone marker into the request channel. */
      def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
    }
    

    API层通过KafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程,KafkaRequestHandlerPool是一个简单的线程池,创建了多个KafkaRequestHandler线程。KafkaRequestHandlerPool代码如下:

    /**
     * Creates and starts `numThreads` KafkaRequestHandler daemon threads on construction and
     * stops them again in shutdown().
     *
     * @param brokerId       id of the broker, used in log messages and passed to every handler
     * @param requestChannel channel shared by all handlers
     * @param apis           request processor shared by all handlers
     * @param numThreads     number of handlers (and threads) to create
     */
    class KafkaRequestHandlerPool(val brokerId: Int,
                                  val requestChannel: RequestChannel,
                                  val apis: KafkaApis,
                                  numThreads: Int) extends Logging with KafkaMetricsGroup {

      /* a meter to track the average free capacity of the request handlers */
      private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

      this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "

      // Threads that execute the handlers; slot i runs runnables(i).
      val threads = new Array[Thread](numThreads)
      // The handlers themselves.
      val runnables = new Array[KafkaRequestHandler](numThreads)

      (0 until numThreads).foreach { idx =>
        // Build the handler, wrap it in a daemon thread and start it before moving to the next slot.
        val handler = new KafkaRequestHandler(idx, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
        runnables(idx) = handler
        val worker = Utils.daemonThread("kafka-request-handler-" + idx, handler)
        threads(idx) = worker
        worker.start()
      }

      /** Signals every handler to stop, then blocks until all handler threads have finished. */
      def shutdown(): Unit = {
        info("shutting down")
        // First ask every handler to stop ...
        runnables.foreach(_.shutdown())
        // ... then wait for each thread to terminate.
        threads.foreach(_.join())
        info("shut down completely")
      }
    }
    

    KafkaApis

    KafkaApis是Kafka服务器处理请求的入口类,负责把KafkaRequestHandler传递过来的请求分发到不同的handle*()处理方法中。分发的依据是RequestChannel.Request中的requestId,此字段保存了请求对应的ApiKeys的值,不同的ApiKeys值表示不同的请求类型。负责分发的handle()方法的代码实现如下:

    /**
       * Top-level entry point for all requests: routes the request to the handle* method that
       * matches its API key. If anything is thrown, the failure is reported to the client (or
       * the connection is closed) and logged. `apiLocalCompleteTimeMs` is stamped in every case.
       *
       * @param request the request to process
       */
      def handle(request: RequestChannel.Request): Unit = {

        // Reports a failure for `request`, then logs it.
        def reportFailure(cause: Throwable): Unit = {
          if (request.requestObj == null) {
            val errorBody = request.body.getErrorResponse(request.header.apiVersion, cause)
            val errorHeader = new ResponseHeader(request.header.correlationId)

            /* If request doesn't have a default error response, we just close the connection.
               For example, when produce request has acks set to 0 */
            if (errorBody != null)
              requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, errorHeader, errorBody)))
            else
              requestChannel.closeConnection(request.processor, request)

            error("Error when handling request %s".format(request.body), cause)
          } else {
            // A non-null requestObj reports the error itself through handleError.
            request.requestObj.handleError(cause, requestChannel, request)
            error("Error when handling request %s".format(request.requestObj), cause)
          }
        }

        try {
          trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
            format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
          // Dispatch on the API key that requestId maps to.
          ApiKeys.forId(request.requestId) match {
            // Produce, fetch and offset-listing requests
            case ApiKeys.PRODUCE => handleProducerRequest(request)
            case ApiKeys.FETCH => handleFetchRequest(request)
            case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
            // Metadata requests
            case ApiKeys.METADATA => handleTopicMetadataRequest(request)
            case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
            case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
            case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
            case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
            // Offset commit / fetch
            case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
            case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
            // Group-related requests
            case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
            case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
            case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
            case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
            case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
            case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
            case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
            // SASL handshake and API version requests
            case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
            case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
            // Any API key without a handler above is rejected.
            case requestId => throw new KafkaException("Unknown api code " + requestId)
          }
        } catch {
          case e: Throwable => reportFailure(e)
        } finally {
          // Stamped whether the request succeeded or failed.
          request.apiLocalCompleteTimeMs = SystemTime.milliseconds
        }
      }
    

    相关文章

      网友评论

          本文标题:Kafka源码分析-Server-api层

          本文链接:https://www.haomeiwen.com/subject/afvfsqtx.html