Kafka Source Code Analysis (5): The Logic Skeleton of the Kafka Broker

Author: 81e2cd2747f1 | Published 2019-09-29 22:42
    Reading goal: find a manageable entry point into Kafka's large codebase, then briefly analyze how the broker handles messages sent by producers.

    Communication between the broker and clients (producers and consumers) is implemented with Java NIO. Although it resembles a simplified Netty, it is still fairly hard to analyze, and I do not recommend starting your study of Kafka from the network layer.

    Netty itself is non-trivial (I will publish a series of source-analysis articles on it later), but reading Netty's server-side handling code is relatively easy because the skeleton is clear: once you find the concrete Handlers added to the ChannelPipeline, you can start analyzing. Kafka is the same, and it lays out the entire server-side processing logic in an even more procedural style.

    kafka.server.KafkaApis

    /**
       * Top-level method that handles all requests and multiplexes to the right api
       */
      def handle(request: RequestChannel.Request) {
        try {
          trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
            s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
          request.header.apiKey match {
            // Produce messages
            case ApiKeys.PRODUCE => handleProduceRequest(request)
            // Fetch messages: consumers fetching from the broker, and followers fetching from the leader
            case ApiKeys.FETCH => handleFetchRequest(request)
            case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
            case ApiKeys.METADATA => handleTopicMetadataRequest(request)
            case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
            case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
            case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
            case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
            case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
            case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
            case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
            case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
            case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
            case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
            case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
            case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
            case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
            case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
            case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
            case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
            case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
            case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
            case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
            case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
            case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
            case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
            case ApiKeys.END_TXN => handleEndTxnRequest(request)
            case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
            case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
            case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
            case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
            case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
            case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
            case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
            case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
            case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
            case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
            case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
          }
        } catch {
          case e: FatalExitError => throw e
          case e: Throwable => handleError(request, e)
        } finally {
          request.apiLocalCompleteTimeNanos = time.nanoseconds
        }
      }
    

    A brief walk-through of the produce flow

    case ApiKeys.PRODUCE => handleProduceRequest(request)
    

    Step into the logic of handleProduceRequest(request). To keep the structure clear, the complex blocks are folded:

    [Figure: handleProduceRequest with the complex blocks folded]
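
    Since the screenshot does not survive here, the outline below is a reconstruction of the folded method based on the walkthrough that follows. It is a sketch of the 1.0-era source; exact line numbers and signatures may differ between versions.

    // Rough outline of kafka.server.KafkaApis#handleProduceRequest (1.0-era source),
    // with the complex blocks folded into comments.
    def handleProduceRequest(request: RequestChannel.Request) {
      val produceRequest = request.body[ProduceRequest]
      val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes

      // ~370-382: authorization checks for transactional / idempotent records

      // ~384-395: create and populate three maps (unauthorized topics,
      // non-existing topics, authorized records per partition)

      def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
        // build the ProduceResponse and hand it back to the client
      }

      def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]) {
        // record per-partition processing statistics
      }

      // ~454-471: either answer immediately with an empty response,
      // or hand the records to replicaManager.appendRecords
    }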

    Breaking down the source

    Lines 367-368:
    val produceRequest = request.body[ProduceRequest]
    val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
    

    Cast the request body to a ProduceRequest and compute the total size of the message (header + body).

    Lines 370-382:
    if (produceRequest.hasTransactionalRecords) {...} 
    else if (produceRequest.hasIdempotentRecords && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {...}
    

    We will not dig into the details yet; this is the authorization step for the request: only requests that pass these checks are processed further.
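
    For reference, the folded branches look roughly like this in the 1.0-era source (a sketch, not a verbatim copy): an unauthorized transactional or idempotent produce is answered with an error and the handler returns early.

    if (produceRequest.hasTransactionalRecords) {
      // a transactional produce must be authorized to Write on its transactionalId
      val isAuthorizedTransactional = produceRequest.transactionalId != null &&
        authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId))
      if (!isAuthorizedTransactional) {
        sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
        return
      }
    } else if (produceRequest.hasIdempotentRecords &&
               !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
      // an idempotent produce requires cluster-level IdempotentWrite permission
      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
      return
    }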

    Lines 384-386:
    Three maps are created; their names alone suggest what each one is meant to hold.

    Lines 388-395:
    The three maps created above are populated.
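
    Roughly, in the 1.0-era source (names may vary slightly by version), the three maps separate the partitions the client may not write to, the partitions of topics this broker does not know, and the records that will actually be appended:

    val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
    val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
    val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()

    for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
      if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
        unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicPartition.topic))
        nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        authorizedRequestInfo += (topicPartition -> memoryRecords)
    }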

    Lines 389-446:
    Using Scala's nested-function syntax, a function named sendResponseCallback is defined. It takes one parameter, responseStatus: Map[TopicPartition, PartitionResponse], and is used to send the response back to the client. At this point the function is only defined, not executed.
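
    To make the "defined now, executed later" point concrete, here is a minimal, self-contained Scala sketch (illustrative names, not Kafka code) of the same local-function-as-callback pattern:

    object CallbackDemo extends App {
      // Simulates the broker side: does some work, then invokes the callback
      // it was handed, exactly once, when the result is ready.
      def process(input: Seq[Int], responseCallback: Map[Int, String] => Unit): Unit = {
        val status = input.map(i => i -> (if (i % 2 == 0) "ok" else "error")).toMap
        responseCallback(status) // the callback runs only here, not where it was defined
      }

      // Defined locally, like sendResponseCallback in handleProduceRequest;
      // nothing happens until process() decides to call it.
      def sendResponseCallback(responseStatus: Map[Int, String]): Unit =
        responseStatus.foreach { case (k, v) => println(s"partition $k -> $v") }

      process(Seq(1, 2, 3), sendResponseCallback)
    }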

    Lines 448-452:
    Similar to the above, a second local function is defined (processingStatsCallback in the source, not another sendResponseCallback). It takes a processingStats: Map parameter and iterates over it to update per-partition processing statistics in bulk.

    Lines 454-471:
    If no authorized records remain, an empty response is sent straight back to the client.
    Otherwise, replicaManager.appendRecords is called to carry the request forward; clearly, the real processing continues from there.
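
    Roughly, the tail of the method looks like this in the 1.0-era source (a sketch; the parameter list differs across versions):

    if (authorizedRequestInfo.isEmpty)
      sendResponseCallback(Map.empty)
    else {
      val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

      // hand the records to the replica manager, passing both callbacks;
      // sendResponseCallback fires once the required acks are satisfied
      replicaManager.appendRecords(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = true,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback,
        processingStatsCallback = processingStatsCallback)

      // the request may sit in purgatory for a while; clear its data here so
      // the records, already handed off, can be garbage-collected
      produceRequest.clearPartitionRecords()
    }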

    That is the rough skeleton; later posts will dig deeper from here.
