美文网首页
【Kafka零基础学习】Kafka请求分类

【Kafka零基础学习】Kafka请求分类

作者: 文竹小二 | 来源:发表于2018-06-05 23:46 被阅读7次

概要

Kafka 0.10.0.1支持的请求有19种,可以从KafkaApis.handle中查看。如下:

def handle(request: RequestChannel.Request) {
    try {
      trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
        format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
      ApiKeys.forId(request.requestId) match {
        case ApiKeys.PRODUCE => handleProducerRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
        case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
        case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } 

后面部分将根据作用域对这些请求进行分类并做简要说明。个人认为只要对这些请求的创建,处理过程有一个熟练的掌握,将会对Kafka有一个质的认识。所以后续有一系列文章将会围绕这些请求逐一进行详细介绍。

生产者

  • ProducerRequest
    用于发送消息到Kafka服务端,服务端也即Broker。

消费者

  • GroupCoordinatorRequest
    Reblance第一步:用于从Broker查找GroupCoordinator。每个ConsumerGroup映射为服务端的一个GroupCoordinator。

  • JoinGroupRequest
    Reblance第二步:Consumer首先向GroupCoordinator发送JoinGroupRequest请求,其中包含消费者的相关消息;服务端的GroupCoordinator收到JoinGroupRequest后会暂存消息,收集到全部消费者后,根据JoinGroupRequest中的信息来确定Consumer Group中可用的消费者,从中选取一个消费者成为Group Leader,还会选取使用的分区分配策略,最后将这些消息封装成JoinGroupResponse返回给消费者。
    虽然每个消费者都会收到JoinGroupResponse,但是只有Group Leader收到的JoinGroupReponse中封装了所有消费者的消息。当消费者确定自己时Group Leader后,会根据消费者的信息及选定的分区分配策略进行分区分配。

  • SyncGroupRequest
    Reblance第三步:消费者会发送SyncGroupRequest到GroupCoordinator,但是只有GroupLeader的SyncGroupRequest请求包含了分区的分配结果,GroupCoordinator根据Group Leader的分区分配结果,形成SyncGroupResponse返回给所有消费者。消费者收到SyncGroupResponse后进行解析,便可获取分配给自身的分区。

  • HeartbeatRequest
    发送此请求是为了告诉GroupCoordinator此消费者正常在线。

  • FetchRequest
    用于从Kafka服务端获取消息。

  • LeaveGroupRequest
    当Consumer正常离开ConsumerGroup时会发送LeaveGroupRequest。

  • OffsetCommitRequest
    在进行消费者正常消费过程中以及Rebalance操作开始之前,都会提交一次offset记录Consumer的当前消费位置。

  • OffsetFetchRequest
    在Rebalance操作结束后,每个消费者都确定了其需要消费的分区。在开始消费之前,消费者需要确定拉取消息的起始位置。

  • OffsetRequest
    在有些场景下,例如第一次消费某个Topic的分区,服务端的内部Offsets Topic中并没有记录当前消费者在此分区上的消费位置,所以消费者无法从服务端获取最近提交的offset。此时,如果用户手动指定消费的其实offset,则可以从指定offset开始消费,否者就需要重置TopicPartitionState.position字段。重置TopicPartitionState.position字段的过程中涉及OffsetsRequest和OffsetResponse。

Broker

  • LeaderAndIsrRequest
    用于从ISR集合中选取新的Leader。

  • StopReplicaRequest
    在分区的副本进行重新分配、关闭Broker等过程中会使用到此请求。当Broker收到来至KafkaController的StopReplicaRequest请求时,会关闭其指定的副本,并根据StopReplicaRequest中的字段决定是否删除副本对应的log。

  • UpdateMetadataRequest
    MetadataCache是Broker用来缓存整个集群中全部分区状态的组件。KafkaController通过向集群中的Broker发送UpdateMetadataRequest来更新Metadata中缓存的数据,每个Broker在收到该请求后会异步更新MetadataCache中的数据。

  • ControlledShutdownRequest
    在关闭JVM时触发(作为JVM的关闭钩子触发)。有两个好处,一是可以让日志文件完全同步到磁盘上,在Broker下次重现上线时不需要进行Log的恢复操作;二是对其上的Leader副本进行迁移。

脚本

  • ListGroupsRequest
    用于kafka-consumer-groups脚本。

  • DescribeGroupRequest
    用于kafka-consumer-groups脚本

测试

  • ApiVersionsRequest
    测试类中用到了。

通用

  • TopicMetadataRequest
    用于生产者/消费者请求Topic metadata。

  • SaslHandshakeRequest
    Kafka的身份认证请求。

相关文章

网友评论

      本文标题:【Kafka零基础学习】Kafka请求分类

      本文链接:https://www.haomeiwen.com/subject/bztvsftx.html