美文网首页
Kafka-14.实现-日志

Kafka-14.实现-日志

作者: 悠扬前奏 | 来源:发表于2019-06-21 22:09 被阅读0次

    有两个分区的名为“my_topic”的主题的日志包含两个目录(即my_topic_0和my_topic_1),其中填充了包含该主题的消息的数据文件。日志文件的格式是一系列“日志条目”;每个日志条目是一个4字节整型变量N,存储消息长度,后跟N个消息字节。每条消息由64位整数偏移量给出消息在这个分去中所有发送到这个主题的消息的流中开始的字节位置。每个日志文件都以其包含的第一条消息的便宜量命名,因此创建的第一个文件都将是00000000000.kafka,并且每个附加文件将具有一个整数名称,大约是从前一个文件中的S个字节,其中S是配置中给出的最大日志文件的大小。

    record的确切二进制格式被版本化并维护为标准接口,因此record批次可以在生产者,broker,和客户端之间传输,而无需进行重新复制或转换。上一节包含了有关磁盘上对record进行格式化的详细信息。

    使用消息偏移量作为消息ID是不常见的。我们最初的想法是使用生产者生成的GUID,并在每个broker上维护从GUID到偏移的映射。但由于消费者必须为每个服务器维护一个ID,因此GUID的全局唯一性不提供任何价值。此外,保持从随机id到偏移的映射的复杂性需要heavy的索引结构,其必须与磁盘同步,基本上需要完全持久的随机访问数据结构。 因此,为了简化查找结构,我们决定使用一个简单的per-partition原子计数器,它可以由分区id和节点id组成来唯一的标识消息;这使得查找的结构更加简单,尽管仍然可能针对每个消费者请求进行多次搜索。但是,一旦我们确定了一个计数器,直接使用偏移的跳转看起来就很自然了——毕竟这之后都是在分区中单调地增加整型。由于偏移量是从消费者API隐藏的,因此这个决定最终是一个实现细节,我们采用了更加有效的方法。

    Kafka 日志实现

    写入

    日志允许串行追加始终去到最后一个文件。当文件达到可配置的大小(例如1GB)时,改文件将转移到一个新文件中。该日志有两个配置参数:M,它给出了在强制操作系统把文件flush到硬盘之前写入的消息数,以及S,它给出了强制刷新的秒数。这提供了在系统崩溃时最多丢失M个消息或S秒数据的持久性保证。

    读取

    通过给出消息的64位逻辑偏移量和S字节的最大块大小来完成读取。这将返回包含着唉S字节缓冲区中的消息的迭代器。S旨在比任何单个消息都大,但是如果消息异常的大,则可以多次重试读取,每次将缓冲区大小加倍,直到消息被成功读取。可以指定最大消息和缓冲区大小,以使服务器拒绝大于某个大小的消息,并在需要读取的最大值上为客户端提供绑定以获得完整的消息。读缓冲区很可能以部分消息结束,这很容易通过大小分隔来实现。

    从偏移量读取数据的实际过程需要首先定位存储数据的日志段文件,从全局偏移量计算文件特定的偏移量,然后从该文件偏移量中读取。搜索值针对每个文件维护的内存范围的简单二进制搜索变体来完成的。

    日志提供了获取最近编写信息的功能,以允许客户端“立即”开始订阅。在消费者未能在其SLA-specified的天数内使用其数据的情况下,这很有用。在这种情况下,当客户端尝试使用不存在的偏移量时,会给出OutOfRangeException,并且可以自行重置或根据用例进行失败。

    以下是发送给消费者的结果格式:

    MessageSetSend (fetch result)
     
    total length     : 4 bytes
    error code       : 2 bytes
    message 1        : x bytes
    ...
    message n        : x bytes
    1
    2
    3
    4
    5
    6
    7
    MultiMessageSetSend (multiFetch result)
     
    total length       : 4 bytes
    error code         : 2 bytes
    messageSetSend 1
    ...
    messageSetSend n
    

    相关文章

      网友评论

          本文标题:Kafka-14.实现-日志

          本文链接:https://www.haomeiwen.com/subject/xmsuqctx.html