美文网首页
Kafka/RocketMQ存储结构对比

Kafka/RocketMQ存储结构对比

作者: 进击的蚂蚁zzzliu | 来源:发表于2021-06-21 19:04 被阅读0次

    一、Kafka

    存储结构.png

    Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)(如果没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的)。图中的一串数字 0 是该日志段的起始位移值,也就是该日志段中所存的第一条消息的位移值。

    一般情况下,一个 Kafka 主题有很多分区,每个分区就对应一个 Log 对象,在物理磁盘上则对应于一个子目录。比如你创建了一个双分区的主题 test-topic,那么,Kafka 在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1。而在服务器端(Broker),这就是两个 Log 对象。每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位移不同。

    文件内容

    # 1、执行下面命令即可将日志数据文件内容dump出来
    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.log --print-data-log > 00000000000022372103_txt.log
    
    #2、dump出来的具体日志数据内容
    Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.log
    Starting offset: 22372103
    offset: 22372103 position: 0 CreateTime: 1532433067157 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 5d2697c5-d04a-4018-941d-881ac72ed9fd
    offset: 22372104 position: 0 CreateTime: 1532433067159 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 0ecaae7d-aba5-4dd5-90df-597c8b426b47
    offset: 22372105 position: 0 CreateTime: 1532433067159 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 87709dd9-596b-4cf4-80fa-d1609d1f2087
    ......
    ......
    offset: 22372444 position: 16365 CreateTime: 1532433067166 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 8d52ec65-88cf-4afd-adf1-e940ed9a8ff9
    offset: 22372445 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 5f5f6646-d0f5-4ad1-a257-4e3c38c74a92
    offset: 22372446 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 51dd1da4-053e-4507-9ef8-68ef09d18cca
    offset: 22372447 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 80d50a8e-0098-4748-8171-fd22d6af3c9b
    ......
    ......
    offset: 22372785 position: 32730 CreateTime: 1532433067174 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: db80eb79-8250-42e2-ad26-1b6cfccb5c00
    offset: 22372786 position: 32730 CreateTime: 1532433067176 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 51d95ab0-ab0d-4530-b1d1-05eeb9a6ff00
    ......
    ......
    #3、同样地,dump出来的具体偏移量索引内容
    Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.index
    offset: 22372444 position: 16365
    offset: 22372785 position: 32730
    offset: 22373467 position: 65460
    offset: 22373808 position: 81825
    offset: 22374149 position: 98190
    offset: 22374490 position: 114555
    ......
    ......
    #4、dump出来的时间戳索引文件内容
    Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.timeindex
    timestamp: 1532433067174 offset: 22372784
    timestamp: 1532433067191 offset: 22373466
    timestamp: 1532433067206 offset: 22373807
    timestamp: 1532433067214 offset: 22374148
    timestamp: 1532433067222 offset: 22374489
    timestamp: 1532433067230 offset: 22374830
    ......
    ......
    
    1. 消息日志文件(.log)

    消息日志文件就是存储具体的消息内容,默认1G;其主要内容为:

    • offset:唯一确定了同一分区中(不是同一消息日志文件中,也不是同一Broker上)一条Message的逻辑位置,同一个分区下的消息偏移量按照顺序递增
    • position:表示该条Message在磁盘上消息日志文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message;
    • key:消息Key实际数据
    • payload:消息内容实际数据
    2. 偏移量索引文件(.index)

    偏移量索引文件中存储的 offset -> position 的映射关系;

    • 为了减少存储空间的使用,Kafka采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过“index.interval.bytes”设置索引的跨度;
    • 根据指定的偏移量,
      首先使用二分法找到该消息所在的.index文件和.log文件;
      然后在.index文件中再通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的position;
      最后根据该position在.log文件中顺序扫描查找偏移量与指定偏移量相等的消息;

    为了解决传统二分查找算法导致的不必要的Page Fault的问题,Kafka进行了改进,把所有索引项分成两部分:热区和冷区,然后分别在这两个区域内执行二分查找算法;

    3. 时间戳索引文件(.timeindex)

    时间戳索引文件存储 时间戳 -> offset映射关系;

    Kafka零拷贝机制:
    1.Kafka索引都是基于 MappedByteBuffer 的,也就是让用户态和内核态共享内核态的数据缓冲区,此时,数据不需要复制到用户态空间。不过,mmap 虽然避免了不必要的拷贝,但不一定就能保证很高的性能。在不同的操作系统下,mmap 的创建和销毁成本可能是不一样的。很高的创建和销毁开销会抵消 Zero Copy 带来的性能优势。由于这种不确定性,在 Kafka 中,只有索引应用了 mmap,最核心的日志并未使用 mmap 机制。
    2.TransportLayer 是 Kafka 传输层的接口。它的某个实现类使用了
    FileChannel 的 transferTo 方法。该方法底层使用 sendfile 实现了 Zero Copy。对 Kafka而言,如果 I/O 通道使用普通的 PLAINTEXT,那么,Kafka 就可以利用 Zero Copy 特性,直接将页缓存中的数据发送到网卡的 Buffer 中,避免中间的多次拷贝。相反,如果I/O 通道启用了 SSL,那么,Kafka 便无法利用 Zero Copy 特性了。

    二、RocketMQ

    1. 文件结构

    |-- store
        |-- commitlog
            |-- 00000003384434229248
            |-- 00000003385507971072
            |-- 00000003386581712896 
        |-- consumequeue
            |-- TopicA
                |-- 0
                    |-- 00000000002604000000
                    |-- 00000000002610000000
                |-- 1
                    |-- 00000000002610000000
                    |-- 00000000002616000000
            |-- TopicB
                |-- 0
                    |-- 00000000000732000000
                |-- 1
                    |-- 00000000004610000000
                |-- 3
                    |-- 00000000005610000000
                |-- 4
                    |-- 00000000006610000000
        |-- index
            |-- 20210620170423012
    

    RocketMQ主要有三中日志文件:

    • commitlog:存储原始消息,单个文件默认1G,写满后生成新文件;Broker上所有Topic共享commitlog文件;
    • consumequeue:创建Topic时可以指定queue数量,queue平均分配(或者指定具体Broker)到Broker上,每个queue对应一个consumequeue文件;
    • index:存储key -> 消息在CommitLog文件中的物理偏移量 的映射关系,方便通过key或时间区间进行查找;Broker上所有Topic共享index文件;

    2. 消息存储

    消息存储.png

    RocketMQ的混合型存储结构采用了数据(commitlog)和索引(index/consumequeue)部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失,因此Consumer也就肯定有机会去消费这条消息,至于消费的时间可以稍微滞后一些也没有太大的关系。
    这里,RocketMQ的具体做法是,使用Broker端的后台服务线程ReputMessageService不停地分发请求并异步构建ConsumeQueue和IndexFile。

    2.1 commitlog
    commitlog.png

    RocketMQ的commitLog是WAL的一种,通过对文件的顺序追加写入,提高了文件的写入性能;

    2.2 consumequeue
    ConsumeQueue.png
    • consumequeue文件中主要是offset,即在commitLog中的位置信息,通过这个offset偏移量保证消息读取阶段能够定位到消息的物理位置;
    • 单条数据大小为20字节,单个ConsumeQueue文件能够保存30万条数据,每个文件大约占用5.7MB;
    • Topic下每个MessageQueue对应了Broker上多个ConsumeQueue文件,这些ConsumeQueue文件保存了该MessageQueue的所有消息在CommitLog文件中的物理位置,即offset偏移量;
    • ConsumeQueue能够区分不同Topic下的不同MessageQueue的消息,同时能够为消费消息起到一定的缓冲作用(当只有ReputMessageService异步服务线程通过doDispatch异步生成了ConsumeQueue队列的元素后,Consumer端才能进行消费);
    2.3 index
    IndexFile.png

    为了满足根据msgId以及消息key查询消息的需求,每个Broker对应一组indexFile;

    • index文件分为三部分:文件头indexHeader,一系列槽位slots,真正的索引数据index;
    • index文件最大大小为40+50000004+5000000 * 420byte,写完后继续写下一个,indexFile文件名比较特殊,是一串时间戳;
    • indexFile结构与hash表很相似,固定数量的slot组成数组,每个slot对应一条index链,index之间通过链表方式组织在一起。slot的值对应当前slot下最新的那个index的序号,index中存储了当前slot下、当前index的前一个index序号,这就把slot下的所有index链起来了;·
      由于indexHeader,slot,index都是固定大小,所以:
      公式1:第n个slot在indexFile中的起始位置是这样:40+(n-1)4
      公式2: 第s个index在indexFile中的起始位置是这样:40+5000000
      4+(s-1)*20
    • 查询流程:
      key-->计算hash值-->hash值对500万取余算出对应的slot序号-->根据40+(n-1)4(公式1)算出该slot在文件中的位置-->读取slot值,也就是index序号-->根据40+50000004+(s-1)*20(公式2)算出该index在文件中的位置-->读取该index-->将key的hash值与index的keyHash值进行比对;
      -->不满足则根据index中的preIndexNo找到上一个index,继续上一步
      -->满足则根据index中的phyOffset拿到commitLog中的消息

    三、对比

      1. Kafka针对Producer和Consumer使用了同一份存储结构;
        RocketMQ却为Producer和Consumer分别设计了不同的存储结构,Producer对应CommitLog, Consumer对应ConsumeQueue,CommitLog和ConsumeQueue采用“最终一致性”的方案保证一致性;
      1. Kafka每个partition对应一个日志文件,Producer对该日志文件进行“顺序写”,Consumer对该文件进行“顺序读”,这种存储方式,对于每个文件来说是顺序IO,但是当并发的读写多个partition的时候,对应多个文件的顺序IO,表现在文件系统的磁盘层面,还是随机IO。因此当partition或者topic个数过多时,Kafka的性能急剧下降;
        RocketMQ采用了单一的日志文件,即把同一台机器上面所有topic的所有queue的消息,存放在一个文件里面,从而避免了随机的磁盘写入;
      1. Kafka:消息的读写都是基于 FileChannel;索引读写基于 MMAP;
        RocketMQ:读盘基于 MMAP,写盘默认使用 MMAP,可通过修改配置,配置成 FileChannel,原因是可以避免 PageCache 的锁竞争,通过两层架构实现读写分离;
    • 参考《消息中间件—Kafka数据存储(一)

    相关文章

      网友评论

          本文标题:Kafka/RocketMQ存储结构对比

          本文链接:https://www.haomeiwen.com/subject/jixreltx.html