美文网首页
05、Kafka的索引机制

05、Kafka的索引机制

作者: 技术灭霸 | 来源:发表于2020-07-08 21:30 被阅读0次

    Kafka中有三大类索引:位移索引、时间戳索引和已中止事务索引。分别对应了.index、.timeindex、.txnindex文件。

    与之相关的源码如下:

    1. AbstractIndex.scala:抽象类,封装了所有索引的公共操作
    2. OffsetIndex.scala:位移索引,保存了位移值和对应磁盘物理位置的关系
    3. TimeIndex.scala:时间戳索引,保存了时间戳和对应位移值的关系
    4. TransactionIndex.scala:事务索引,启用Kafka事务之后才会出现这个索引

    先来看看AbstractIndex的定义


    AbstractIndex的定义在代码里已经注释了,成员变量里面还有个entrySize。这个变量其实是每个索引项的大小,每个索引项的大小是固定的。

    entrySize

    在OffsetIndex位移索引中是override def entrySize = 8,8个字节。
    在TimeIndex时间戳索引中是override def entrySize = 12,12个字节。

    为什么要这么麻烦,还要存个差值?
    这其实和MySQL InnoDB 为何建议主键不宜过长一样。每个辅助索引都会存储主键的值,主键越长,每条索引项占用的内存就越大,缓存页一次从磁盘获取的索引数就越少,一次查询需要访问磁盘次数就可能变多。而磁盘访问我们都知道,很慢。

    位移索引

    不同索引类型保存不同的<Key , Value>对,对OffsetIndex位移索引而言,Key就是消息的相对位移,Value保存该消息的日志段文件中该消息第一个字节的物理文件位置。

    为何是8 ?

    在OffsetIndex位移索引中是override def entrySize = 8,8个字节。

    相对位移是一个整型,占用4个字节,物理文件位置也是一个整型,同样占用4个字节,因此总共8个字节。

    我们知道,Kafka中的消息位移值是一个长整型,应该占用8个字节才对,在保存OffsetIndex<Key , Value>对,Kafka做了一些优化,每个OffsetIndex对象在创建时,都已经保存了对应日志段对象的起始位移,因此保存与起始位移的差值就够了。

    1. 为了节省空间,一个索引项节省了4字节,想想那些日消息处理数万亿的公司。
    2. 因为内存资源是很宝贵的,索引项越短,内存中能存储的索引项就越多,索引项多了直接命中的概率就高了。

    写入索引项

    写入索引项append方法的实现

     def append(offset: Long, position: Int): Unit = {
        inLock(lock) {
         // 索引文件如果已经写满,直接抛出异常
          require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
        // 要保证待写入的位移offset比当前索引文件中所存的位移值要大
      // 这主要是为了维护索引的单调性
          if (_entries == 0 || offset > _lastOffset) {
            trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
            mmap.putInt(relativeOffset(offset))//向mmap写入相对位移值
            mmap.putInt(position)//向mmap写入物理文件位置
            _entries += 1//更新索引项个数
            _lastOffset = offset//更新当前索引文件最大位移值
          // 确保写入索引项格式符合要求
            require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
          } else {
            throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
              s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
          }
        }
      }
    
    image.png

    时间戳索引

    TimeIndex保存的是<时间戳,相对位移值>,时间戳需要长整型来保存,相对位移值使用Integer来保存。因此TimeIndex单个索引项需要占用12个字节。

    写入索引项

    def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
        inLock(lock) {
          if (!skipFullCheck)
      // 索引文件如果已经写满,直接抛出异常
            require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
        // 这主要是为了维护索引的单调性
          if (_entries != 0 && offset < lastEntry.offset)
            throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
              s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
    // 这主要是为了维护索引的单调性
          if (_entries != 0 && timestamp < lastEntry.timestamp)
            throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
              s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
    
          if (timestamp > lastEntry.timestamp) {
            trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
            mmap.putLong(timestamp)//向mmap写入时间戳
            mmap.putInt(relativeOffset(offset))//向mmap写入相对位移值
            _entries += 1
            _lastEntry = TimestampOffset(timestamp, offset)
            require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
          }
        }
      }
    

    位移索引和时间戳索引的区别是什么?

    image.png

    改进的二分查找

    就Kafka而言,索引是在文件末尾追加的写入的,并且一般写入的数据立马就会被读取。所以数据的热点集中在尾部。并且操作系统基本上都是用页为单位缓存和管理内存的,内存又是有限的,因此会通过类LRU机制淘汰内存。

    看起来LRU非常适合Kafka的场景,但是使用标准的二分查找会有缺页中断的情况,毕竟二分是跳着访问的。

    简单的来讲,假设某索引占page cache 13页,此时数据已经写到了12页。按照kafka访问的特性,此时访问的数据都在第12页,因此二分查找的特性,此时缓存页的访问顺序依次是0,6,9,11,12。因为频繁被访问,所以这几页一定存在page cache中。


    当第12页不断被填充,满了之后会申请新页第13页保存索引项,而按照二分查找的特性,此时缓存页的访问顺序依次是:0,7,10,12。这7和10很久没被访问到了,很可能已经不再缓存中了,然后需要从磁盘上读取数据。注释说:在他们的测试中,这会导致至少会产生从几毫秒跳到1秒的延迟。

    基于以上问题,Kafka使用了改进版的二分查找,改的不是二分查找的内部,而且把所有索引项分为热区和冷区
    这个改进可以让查询热数据部分时,遍历的Page永远是固定的,这样能避免缺页中断。
    看到这里其实我想到了一致性hash,一致性hash相对于普通的hash不就是在node新增的时候缓存的访问固定,或者只需要迁移少部分数据。

    Kafka为什么不采用InnoDB的索引机制

    InnoDB中维护索引的代价比Kafka中的要高。Kafka中当有新的索引文件建立的时候ConcurrentSkipListMap才会更新,而不是每次有数据写入时就会更新,这块的维护量基本可以忽略,B+树中数据有插入、更新、删除的时候都需要更新索引,还会引来“页分裂”等相对耗时的操作。Kafka中的索引文件也是顺序追加文件的操作,和B+树比起来工作量要小很多。

    说到底还是应用场景不同所决定的。MySQL中需要频繁地执行CRUD的操作,CRUD是MySQL的主要工作内容,而为了支撑这个操作需要使用维护量大很多的B+树去支撑。Kafka中的消息一般都是顺序写入磁盘,再到从磁盘顺序读出(不深入探讨page cache等),他的主要工作内容就是:写入+读取,很少有检索查询的操作,换句话说,检索查询只是Kafka的一个辅助功能,不需要为了这个功能而去花费特别太的代价去维护一个高level的索引。前面也说过,Kafka中的这种方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。↵

    相关文章

      网友评论

          本文标题:05、Kafka的索引机制

          本文链接:https://www.haomeiwen.com/subject/wvztcktx.html