美文网首页kafkaJava学习笔记程序员
Kafka初始化流程与请求处理

Kafka初始化流程与请求处理

作者: 扫帚的影子 | 来源:发表于2016-12-30 18:36 被阅读657次

    Kafka的初始化启动流程

    • 由KafkaServer::startup来负责;
    • KafkaServer::startup主要是创建并启动各种Manager;
    • 上图:
    kafkaserver_startup.png
    • KafkaHealthcheck: core/src/main/scala/kafka/server/KafkaHealthcheck.scala,其作用是在broker info注册到zk的/brokers/id路径下, 且监听zk的session expiration事件,触发时重新注册;

    • 上图中的各个启动的组件我们慢慢都会介绍到, 先从请求的接收与响应开始~~~

    请求处理

    • SocketServer: 负责处理网络连接, 数据的接收和发送, 其中的RequestChannel负责向应用层转递请求,也负责把应用层的response传回网络层后发送出去;
      详细见:Kafka源码分析-网络层-1 Kafka源码分析-网络层-2 Kafka源码分析-网络层-3
    • KafkaRequestHandlerPool: 线程池, 每个线程里跑一个KafkaRequestHandler
    • KafkaRequestHandler: 循环调用RequestChannel::receiveRequest来poll到新的request交给KafkaApis处理;
    • KafkaApis: 处理request的分发
    request.requestId match {
            case RequestKeys.ProduceKey => handleProducerRequest(request)
            case RequestKeys.FetchKey => handleFetchRequest(request)
            case RequestKeys.OffsetsKey => handleOffsetRequest(request)
            case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
            case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
            case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
            case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
            case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
            case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
            case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
            case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
            case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
            case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
            case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
            case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
            case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
            case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
            case requestId => throw new KafkaException("Unknown api code " + requestId
    

    上图:

    kafkaapis.png

    下篇我们开始KafkaController分析-选主和Failover

    Kafka源码分析-汇总

    相关文章

      网友评论

        本文标题:Kafka初始化流程与请求处理

        本文链接:https://www.haomeiwen.com/subject/vqhyvttx.html