美文网首页
Kafka 通信协议(译)

Kafka 通信协议(译)

作者: Alex90 | 来源:发表于2019-06-06 17:37 被阅读0次

    原文链接 A Guide To The Kafka Protocol

    Kafka 的 Producer、Broker 和 Consumer 之间采用的基于 TCP 的二进制协议,完全是为了 Kafka 自身的业务需求而定制的,协议定义了所有 API 的请求及响应消息。所有的消息都是通过长度来分隔,并且由后面描述的数据类型组成。

    数据类型

    • 定长基本类型
      INT8INT16INT32INT64UINT32

    • 变长基本类型
      STRINGBYTES
      一个表示长度的带符号整数 N 以及后续 N 字节的内容组成,长度如果为-1则表示空(NULL)。
      STRING 使用 INT16 表示长度,BYTES 使用 INT32 表示长度。

    • 数组
      用来处理重复的结构体数据,总是由一个代表元素个数的整数 N(INT32),以及后续 N 个重复结构体组成。
      这些结构体自身是由其他的基本数据类型组成。后面会把结构 foo 的数组显示为 [foo]

    通用的请求和响应格式

    所有请求和响应都源于以下语法,将会对这些语法逐步进行描述:

    RequestOrResponse => Size (RequestMessage | ResponseMessage)
      Size => int32
    
    Field Description
    MessageSize 以字节为单位给出后续请求或响应消息的大小

    请求(Request)

    所有请求都具有以下格式:

    RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
      ApiKey => int16
      ApiVersion => int16
      CorrelationId => int32
      ClientId => string
      RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
    
    Field Description
    ApiKey 表示正在调用的API的ID(即它是元数据请求、生产请求、获取请求等)。
    ApiVersion 表示正在调用的API的版本号。
    版本号允许服务器正确地解释请求内容。响应消息也始终对应于所述请求的版本的格式
    CorrelationId 这是一个用户提供的整数。
    将会被服务器回传给客户端。用于在客户机和服务器之间匹配请求和响应
    ClientId 客户端的自定义的标识。
    可以使用任意标识符,会被用在记录错误、监测统计信息等场景

    响应(Responses)

    Response => CorrelationId ResponseMessage
    CorrelationId => int32
    ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
    
    Field Description
    CorrelationId 服务器回传给客户端的信息于请求中的信息一致

    所有响应都是与请求成对匹配

    消息集(Message sets)

    生产和获取消息指令请求使用同一个消息集结构。Kafka 中的消息由一个键值对以及少量相关的元数据组成。消息集只是一系列带有偏移量和大小信息的消息序列。这种格式同时用于 broker 上的磁盘存储和线上数据格式(on-the-wire format)。

    在 Kafka 中,消息集也是压缩的基本单位,同时允许消息递归地包含压缩的消息集,以允许批压缩。

    在通讯协议中,消息集的前面没有类似的数组元素前面的 INT32 整数。

    MessageSet => [Offset MessageSize Message]
      Offset => int64
      MessageSize => int32
    

    消息体:

    v0
    Message => Crc MagicByte Attributes Key Value
      Crc => int32
      MagicByte => int8
      Attributes => int8
      Key => bytes
      Value => bytes
      
    v1 (supported since 0.10.0)
    Message => Crc MagicByte Attributes Key Value
      Crc => int32
      MagicByte => int8
      Attributes => int8
      Timestamp => int64
      Key => bytes
      Value => bytes
    
    Field Description
    Offset Kafka 中消息的偏移量
    Crc 剩余消息字节的 CRC32 值,用来检查信息的完整性
    MagicByte 这是一个版本ID,用于允许消息二进制格式的向后兼容演进,当前值是1
    Attributes 此字节保存有关消息的元数据属性
    Timestamp 消息的时间戳
    Key 一个可选项,主要用来进行指派分区。可以为 null
    Value 消息的实际内容,类型是字节数组,也可能是一个消息集。可以为 null

    在 Kafka 0.11 中,消息集和消息的结构发生了变化。不仅添加了新字段以支持新功能,如一次性语义和记录头,而且消除了以前版本消息格式的递归性质,消息集(Messageset)称为 RecordBatch,包含一个或多个 Record(而不是 Message)。当启用压缩时,RecordBatch 将保持未压缩状态,但 Records 被压缩在一起。

    RecordBatch =>
      FirstOffset => int64
      Length => int32
      PartitionLeaderEpoch => int32
      Magic => int8 
      CRC => int32
      Attributes => int16
      LastOffsetDelta => int32
      FirstTimestamp => int64
      MaxTimestamp => int64
      ProducerId => int64
      ProducerEpoch => int16
      FirstSequence => int32
      Records => [Record]
      
    Record =>
      Length => varint
      Attributes => int8
      TimestampDelta => varint
      OffsetDelta => varint
      KeyLen => varint
      Key => data
      ValueLen => varint
      Value => data
      Headers => [Header]
      
    Header => HeaderKey HeaderVal
      HeaderKeyLen => varint
      HeaderKey => string
      HeaderValueLen => varint
      HeaderValue => data
    

    压缩(Compression)

    Kafka 支持压缩消息以提高效率,当然,这比压缩一条消息要更复杂。单个消息可能没有足够的冗余信息以达到良好的压缩比,因此必须以特殊的批处理方式发送压缩消息。要被发送的消息被包装(未压缩)在一个 MessageSet 结构中,然后将其压缩并存储在一个 MessageValue 中,一起保存的还有相应的压缩编解码集。接收系统通过解压缩得到实际的消息集。

    Kafka 目前支持两个压缩编解码器,编解码器编号如下:

    Compression Codec
    None 0
    GZIP 1
    Snappy 2

    接口(APIs)

    Kafka 六个核心的客户端请求的 API:

    • 元数据接口(Metadata API)
    • 生产消息接口(Produce API)
    • 获取消息接口(Fetch API)
    • 偏移量接口(Offset API)
    • 偏移量提交接口(Offset Commit API)
    • 偏移量获取接口(Offset Fetch API)

    元数据接口(Metadata API)

    这个 API 能获取以下信息:

    • 存在哪些 Topic?
    • 每个 Topic 有几个 Partition?
    • 每个 Partition 的 Leader 是哪个 broker?
    • 这些 broker 的地址和端口分别是什么?

    这是唯一一个能发往集群中任意一个 broker 的请求消息。

    因为可能有很多 Topic 存在,客户端可以提供一个可选 Topic 名称列表,只返回这些 Topic 的元数据。

    返回的元数据是 Partition 级别的信息,以 Topic 分组集中在一起。每个分区的元数据中包含了 leader 以及所有副本,包括正在同步的副本的信息。

    Topic Metadata Request

    TopicMetadataRequest => [TopicName]
      TopicName => string
    
    Field Description
    TopicName 请求元数据的 Topic。如果为空,则请求所有主题元数据

    Metadata Response

    响应包含的每个分区的元数据,以 Topic 分组,使用 Broker id 指向具体的 Broker。

    MetadataResponse => [Broker][TopicMetadata]
      Broker => NodeId Host Port  (any number of brokers may be returned)
        NodeId => int32
        Host => string
        Port => int32
      TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
        TopicErrorCode => int16
      PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
        PartitionErrorCode => int16
        PartitionId => int32
        Leader => int32
        Replicas => [int32]
        Isr => [int32]
    
    Field Description
    Leader 分区的 Leader 节点的 Broker id。如果在 Leader 选举过程中,没有 Leader 存在,值为 -1
    Replicas 分区的活着的 slave 节点集合
    Isr Replicas 集合中,所有处在与 Leader 跟随(表示数据已经完全复制到这些节点)状态的子集
    Broker Broker 节点ID、主机名和端口信息

    生产消息接口(Produce API)

    用于向服务器发送消息集。为了提高效率,允许在一个请求中为多个主题分区发送消息集,使用通用消息集格式。

    Produce Request

    v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
    ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
      RequiredAcks => int16
      Timeout => int32
      Partition => int32
      MessageSetSize => int32
    
    Field Description
    RequiredAcks 表示服务端收到多少确认后才发送 Response 给客户端。
    如果设置为0,那么服务端将不发送 Response。
    如果设置为1,那么服务器将等到数据写入到本地日之后发送。
    如果设置为-1,那么服务端将等到数据被所有的同步副本写入后再发送
    Timeout 服务器等待接收 RequiredAcks 中确认数的最长时间(毫秒)
    TopicName 发布到的 Topic
    Partition 发布到的 Partition
    MessageSetSize 后续消息集的长度(字节)
    MessageSet 标准格式的消息集合

    Produce Response

    v0
    ProduceResponse => [TopicName [Partition ErrorCode Offset]]
      TopicName => string
      Partition => int32
      ErrorCode => int16
      Offset => int64
      
    v1 (supported in 0.9.0 or later)
    ProduceResponse => [TopicName [Partition ErrorCode Offset]] ThrottleTime
      TopicName => string
      Partition => int32
      ErrorCode => int16
      Offset => int64
      ThrottleTime => int32
      
    v2 (supported in 0.10.0 or later)
    ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime
      TopicName => string
      Partition => int32
      ErrorCode => int16
      Offset => int64
      Timestamp => int64
      ThrottleTime => int32
    
    Field Description
    Topic 此响应对应的 Topic
    Partition 此响应对应的 Partition
    ErrorCode 分区对应的错误信息
    Offset 追加到该分区的消息集中的为第一个消息分配的偏移量
    Timestamp 如果该主题使用了 LogAppendTime,消息集中的所有消息都有相同的时间戳
    ThrottleTime 由于配额冲突而阻止请求的持续时间(毫秒)

    获取消息接口(Fetch API)

    用于获取某些主题分区的一个或多个日志块(a chunk of one or more logs),指定了主题、分区和起始偏移量,开始提取并返回消息块。通常,返回消息的偏移量大于等于起始偏移量。但是,对于压缩的消息,返回的消息的偏移量可能小于起始偏移量。这类的消息的数量通常较少,并且调用者必须负责过滤掉这些消息。

    接口使用长轮询模型,如果没有足够的数据立即可用,可以在一段时间内阻塞。

    服务器被允许在消息集末尾返回部分消息,客户端应该处理这种情况。

    Fetch Request

    FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
      ReplicaId => int32
      MaxWaitTime => int32
      MinBytes => int32
      TopicName => string
      Partition => int32
      FetchOffset => int64
      MaxBytes => int32
    
    Field Description
    ReplicaId 发起这个请求的副本节点ID,普通消费者客户端应该始终将其指定为-1
    MaxWaitTime 如果没有足够的数据可发送时,最大阻塞等待时间(毫秒)
    MinBytes 返回响应消息的最小字节数目,必须设置。
    如果设为0,服务器将会立即返回,如果没有新的数据,会返回一个空消息集。
    如果设为1,服务器将在至少一个分区收到一个字节的数据的情况下立即返回,或者等到超时时间达到
    TopicName 获取数据的 Topic
    Partition 获取数据的 Partition
    FetchOffset 获取数据的起始 Offset
    MaxBytes 分区返回消息集所能包含的最大字节数

    Fetch Response

    v0
    FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
      TopicName => string
      Partition => int32
      ErrorCode => int16
      HighwaterMarkOffset => int64
      MessageSetSize => int32
      
    v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
    FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
      ThrottleTime => int32
      TopicName => string
      Partition => int32
      ErrorCode => int16
      HighwaterMarkOffset => int64
      MessageSetSize => int32
    
    Field Description
    ThrottleTime 由于配额冲突而阻止请求的持续时间(毫秒)
    TopicName 此响应对应的 Topic
    Partition 此响应对应的 Partition
    HighwaterMarkOffset 该分区日志结尾处的偏移量,可以用来确定日志结尾后面有多少条消息。
    MessageSetSize 此分区的消息集的大小(字节)
    MessageSet 此分区获取到的消息集

    偏移量接口(Offset API,AKA ListOffset)

    描述了一组主题分区的偏移量有效范围。

    对于 v0 版本,响应包含请求分区的每个段的起始偏移量以及“日志结束偏移量(log end offset)”,也就是将附加到给定分区的下一条消息的偏移量。
    对于 v1 版本(0.10.1.0+),Kafka 支持按消息中使用的时间戳搜索偏移量的时间索引,并对此 API 进行了更改以支持这一点。

    Offset Request

    // v0
    ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
      ReplicaId => int32
      TopicName => string
      Partition => int32
      Time => int64
      MaxNumberOfOffsets => int32
     
      
    // v1 (supported in 0.10.1.0 and later)
    ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
      ReplicaId => int32
      TopicName => string
      Partition => int32
      Time => int64
    
    Field Description
    Time 用来请求一定时间(毫秒)前的所有消息。
    两个特殊取值:
    -1 表示获取最后一个offset
    -2 表示获取最早的有效偏移量

    Offset Response

    // v0
    OffsetResponse => [TopicName [PartitionOffsets]]
      PartitionOffsets => Partition ErrorCode [Offset]
      Partition => int32
      ErrorCode => int16
      Offset => int64
     
      
    // v1
    ListOffsetResponse => [TopicName [PartitionOffsets]]
      PartitionOffsets => Partition ErrorCode Timestamp [Offset]
      Partition => int32
      ErrorCode => int16
      Timestamp => int64
      Offset => int64
    

    偏移量提交接口(Offset Commit API)

    Offset Commit Request

    v0 (supported in 0.8.1 or later)
    OffsetCommitRequest => ConsumerGroupId [TopicName [Partition Offset Metadata]]
      ConsumerGroupId => string
      TopicName => string
      Partition => int32
      Offset => int64
      Metadata => string
     
      
    v1 (supported in 0.8.2 or later)
    OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
      ConsumerGroupId => string
      ConsumerGroupGenerationId => int32
      ConsumerId => string
      TopicName => string
      Partition => int32
      Offset => int64
      TimeStamp => int64
      Metadata => string
     
    v2 (supported in 0.9.0 or later)
    OffsetCommitRequest => ConsumerGroup ConsumerGroupGenerationId ConsumerId RetentionTime [TopicName [Partition Offset Metadata]]
      ConsumerGroupId => string
      ConsumerGroupGenerationId => int32
      ConsumerId => string
      RetentionTime => int64
      TopicName => string
      Partition => int32
      Offset => int64
      Metadata => string
    

    在 v0、v1 版本中,每个分区的时间戳定义为提交时间戳,偏移量协调者将保存消费者所提交的偏移量,直到当前时间超过提交时间加偏移量保留时间(在 broker 配置中);如果时间戳没有设值,那么 broker 会将此值设定为接收到提交偏移量请求的时间。

    在v2版本中,移除了时间戳,但是增加了一个全局保存时间域(KAFKA-1634)代替。

    Offset Commit Response

    v0, v1 and v2:
    OffsetCommitResponse => [TopicName [Partition ErrorCode]]]
      TopicName => string
      Partition => int32
      ErrorCode => int16
    

    偏移量获取接口(Offset Fetch API)

    Offset Fetch Request

    v0 and v1 (supported in 0.8.2 or after):
    OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
      ConsumerGroup => string
      TopicName => string
      Partition => int32
    

    Offset Fetch Response

    v0 and v1 (supported in 0.8.2 or after):
    OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
      TopicName => string
      Partition => int32
      Offset => int64
      Metadata => string
      ErrorCode => int16
    

    另外还有一些管理接口,这里不做介绍。再来看看下面的内容。

    Some Common Philosophical Questions

    有些人问,为什么不使用 HTTP。有许多原因,最主要的是客户端实现可以使用一些更高级的 TCP 特性,同时 HTTP 库在许多编程语言中是令人惊讶的烂。

    还有人问,为什么不支持许多不同的协议。此前的经验是,如果必须在许多协议实现中移植新特性,是很难添加和测试的,并且大多数用户并不在乎支持多个协议这些特性,只是希望在自己选择的语言中实现了良好可靠的客户端。

    另一个问题是,为什么不采用 XMPP,STOMP,AMQP 或现有的协议。这一问题的答案因协议而异,共同的问题是,协议决定了大部分的实现,如果没有对协议的控制权,就不能做一些想要做的事情。也许现在的实现方式比现有的协议更好。

    最后一个问题是,为什么不使用的 Protocol Buffers 或 Thrift 来定义请求消息格式。这些库擅长帮助管理非常多的序列化的消息,然而,这里只有几个种类的消息,而且这些库跨语言的支持程度不同。

    最后,通过对二进制日志格式和传输协议之间映射的管理,让 API 有明确的版本并且更细致地控制兼容性。

    相关文章

      网友评论

          本文标题:Kafka 通信协议(译)

          本文链接:https://www.haomeiwen.com/subject/xjbuxctx.html