美文网首页
Kafka源码分析-Server-日志存储(1)

Kafka源码分析-Server-日志存储(1)

作者: 陈阳001 | 来源:发表于2019-02-11 20:13 被阅读0次

    基本概念

    Kafka使用日志文件的方式保存生产者发送的消息。每条消息都有一个offset值来表示它在分区的偏移量,这个offset值是逻辑值,并不是消息实际的存放物理地址。offset类似数据库表的主键,主键唯一确定了数据库表中的一条记录,offset唯一确定了分区的一条消息。Kafka存储机制在逻辑上如下图:


    日志存储结构.png

    为了提高写入的性能,同一个分区中的消息是顺序写入的,这就避免了随机写入带来的性能问题。一个topic可以有n个分区,每个分区也有多个副本。当一个分区的副本(无论是Leader副本还是Follower副本)被划分到某个Broker上时,Kafka就要在此Broker上为此分区建立相应的Log,生产者发送的消息会存储在Log里,然后被消费者拉取消费。
    Kafka中存储的数据都是海量的,为了避免日志文件太大,Log并不是直接对应磁盘上的一个日志文件,而是对应磁盘上的一个目录,目录的命名规则是<topic_name>_<partition_id>,Log和分区直接的关系是一一对应的,对应分区的全部消息都存储在这个目录中。
    Kafka通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,,日志文件用于记录消息,索引文件保存消息的索引。日志文档到一个阈值时,就会创建新的日志文件继续写入后续的消息和索引信息。日志文件的文件名的命名规则是[baseOffset].log,baseOffset是日志文件中的第一条消息的offset。下面是Log的结构:


    Log结构.png
    为了提高查询消息的效率,每个日志文件都对应一个索引文件,这个索引文件并没有为每条消息都建索引,而是使用稀疏索引方式为日志文件中的部分消息建立了索引。下面的图展示了所有文件和日志文件的关系:
    log的索引.png

    FileMessageSet

    Kafka使用FileMessageSet管理日志文件,它对应磁盘上一个真正的日志文件。FileMessageSet继承了MessageSet抽象类,如下图:


    FileMessageSet类图.png

    MessageSet中保存的数据格式有三个部分:8个字节的offset值,4个字节的size表示message data的大小,这两个结合成为LogOverhead, message data保存了消息的数据,逻辑上对应一个Message对象:


    FileMessageSet定义的消息结构.png
    Kafka使用Message类表示消息,Message类使用ByteBuffer保存数据,格式和各个部分的含义如下:
    Message类定义的保存数据.png
    • CRC32:4个字节,消息的校验码。
    • magic:1个字节,魔数标识,与消息的格式相关,取值为0或1。当magic为0时,消息的offset使用绝对offset且消息格式中没有timestamp部分;当magic为1时,消息的offset使用相对的offset且消息格式中有timestamp部分。所以,magic不同,消息的长度不同。
    • attributes:1个字节,消息的属性。其中0~2位表示消息使用的压缩类型,0表示gzip压缩,2表示snappy压缩,3表示lz4压缩。第3位表示时间戳类型,0表示创建时间,1表示追加时间。
    • timestamp:时间戳,含义有attributes的第3位确定。
    • key length:消息key的长度。
    • key:消息的key。
    • value length:消息value的长度。
    • value: 消息的value。
      MessageSet抽象类中定义了两个比较关键的方法:
    /** Write the messages in this set to the given channel starting at the given offset byte. 
        * Less than the complete amount may be written, but no more than maxSize can be. The number
        * of bytes written is returned
        * 将当前MessageSet中的消息写入到Channel中
        * */
      def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
    /**
       * Provides an iterator over the message/offset pairs in this set
        * 提供迭代器,顺序读取MessageSet中的消息
       */
      def iterator: Iterator[MessageAndOffset]
    

    这两个方法说明MessageSet具有顺序写入消息和顺序读取的特性。后面介绍FileMessageSet和ByteBufferMessageSet时会说明这两个方法的实现。

    分析FileMessageSet实现类

    核心字段:
    *file: java.io.File类型,指向磁盘上对应的日志文件。
    *channel:FileChannel类型,用于读写对应的日志文件。
    *start和end:MessageSet对象除了表示一个完整的日志文件,还可以表示日志文件分片(Slice),start和end分别表示分片的起始位置和结束位置。文件分配的相关概念可以找资料了解下。

    • isSlice:Boolean类型,表示当前FileMessageSet是日志文件的分片。
    • _size:FileMessageSet大小,单位是字节。如果FileMessageSet是日志文件的分片,就表示分片的大小(即end-start的值);如果不是分片,则表示整个日志文件的大小。因为会有多个Handler线程并发向一个分区写入消息,所有的_size是AtomicInteger类型。
      FileMessageSet中有多个重载的构造方法,这里选择一个比较重要的构造方法来介绍。此构造方法会创建一个非分片的FileMessageSet对象。在Window NTFS文件系统或老版本的Linux文件系统上,进行文件的预分配会提高后续写操作的性能,为此FileMessageSet提供了preallocate的选项,决定是否开启预分配的功能。我们也可以通过FileMessageSet构造方法的mutable参数决定是否创建只读的FileMessageSet。
     * Create a file message set with no slicing, and with initFileSize and preallocate.
       * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
       * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
       * If it's new file and preallocate is true, end will be set to 0.  Otherwise set to Int.MaxValue.
       */
      def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
          this(file,
            //如果使用preallocate进行预分配,end会初始化为零
            channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
            start = 0,
            end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
            isSlice = false)
    /**
       * Open a channel for the given file
       * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
       * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
       * @param file File path
       * @param mutable mutable
       * @param fileAlreadyExists File already exists or not
       * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
       * @param preallocate Pre allocate file or not, gotten from configuration.
        * FileMessageSet.openChannel()方法的具体实现。
       */
      def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
        if (mutable) {//根据mutable参数创建的FileChannel是否可写
          if (fileAlreadyExists)
            new RandomAccessFile(file, "rw").getChannel()
          else {
            if (preallocate) {//进行文件预分配
              val randomAccessFile = new RandomAccessFile(file, "rw")
              randomAccessFile.setLength(initFileSize)
              randomAccessFile.getChannel()
            }
            else
              new RandomAccessFile(file, "rw").getChannel()//创建可读可写的FileChannel
          }
        }
        else
          new FileInputStream(file).getChannel()//创建只读的FileChannel
      }
    

    在FileMessageSet对象初始化的过程中,会移动FileChannel的position指针,原因是为了每次写入的消息都在日志文件的尾部,避免重启服务后的写入操作覆盖之前的操作。对应新创建的且进行了预分配空间的日志文件,其end会初始化为0,所以也是从文件的起始写入数据的。

    /* if this is not a slice, update the file pointer to the end of the file
      * 将position移动到最后一个字节,之后从此position开始写消息,这样防止重启后覆盖之前的操作
      *
      * */
      if (!isSlice)
        /* set the file position to the last byte in the file */
        channel.position(math.min(channel.size.toInt, end))
    

    介绍完FileMessageSet的构造过程,下面介绍其读写过程。FileMessageSet.append()方法实现了写日志文件的功能,其参数必须是ByteBufferMessageSet对象,下面是FileMessageSet.append()方法的代码:

    /**
       * Append these messages to the message set
       */
      def append(messages: ByteBufferMessageSet) {
        val written = messages.writeFullyTo(channel)//写文件
        _size.getAndAdd(written)//修改FileMessageSet的大小
      }
    /** Write the messages in this set to the given channel 
        * 下面是 ByteBufferMessageSet.writeFullyTo()方法
        * */
      def writeFullyTo(channel: GatheringByteChannel): Int = {
        buffer.mark()
        var written = 0
        while (written < sizeInBytes)//将ByteBufferMessageSet中的数据全部写入文件
          written += channel.write(buffer)
        buffer.reset()
        written
      }
    

    查找指定消息的功能在FileMessageSet.searchFor()方法中实现。searchFor()方法的逻辑是从指定的startPosition开始逐条遍历FileMessageSet中的消息,并将每个消息的offset和targetOffset,最后返回查找到的offset。在遍历过程中不会将消息的key和value读取到内存,只是只读取LogOverhead(即offset和size),并通过size定位到下一条消息的开始位置。FileMessageSet.searchFor()方法代码如下:

    /**
       * Search forward for the file position of the last offset that is greater than or equal to the target offset
       * and return its physical position. If no such offsets are found, return null.
       * @param targetOffset The offset to search for.
       * @param startingPosition The starting position in the file to begin searching from.
       */
      def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
        var position = startingPosition //起始位置
        //创建用于读取 LogOverhead(即offset和size)的ByteBuffer(长度12)
        val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
        val size = sizeInBytes()//当前FileMessageSet的大小,单位是字节
        //从position开始逐条消息遍历
        while(position + MessageSet.LogOverhead < size) {
          buffer.rewind()//重置ByteBuffer的position指针,准备读入数据
          //读取LogOverhead。这里会确保startingPosition位于一个消息的开头,否则
          //读取到的并不是 LogOverhead,这个条件的保证会在后面提到
          channel.read(buffer, position)
          if(buffer.hasRemaining)//未读取到12个字节的LogOverhead,抛出异常
            throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
                                            .format(targetOffset, startingPosition, file.getAbsolutePath))
          buffer.rewind()//重置ByteBuffer的position指针,准备从ByteBuffer中读取数据
          val offset = buffer.getLong()//读取消息的offset,8个字节
          if(offset >= targetOffset)//判断是否符合退出条件
            return OffsetPosition(offset, position)//得到消息的位置
          val messageSize = buffer.getInt()//获取消息的size,4个字节
          if(messageSize < Message.MinMessageOverhead)
            throw new IllegalStateException("Invalid message size: " + messageSize)
          //移动Position,准备读取下个消息
          position += MessageSet.LogOverhead + messageSize
        }
        null//找不到offset大于等于targetOffset,则返回Null
      }
    

    FileMessageSet.writeTo()方法是将FileMessageSet中的数据写入指定的其他Channel中,这里先了解此方法的功能,具体实现会在后面介绍“零拷贝”的时候一起介绍。FileMessageSet.read*()方法是从FileMessageSet中读取数据,可以将FileMessageSet中的数据读入到别的ByteBuffer中返回,也可以按照指定位置和长度形成分片的FileMessageSet对象返回。FileMessageSet.delete()方法是将整个日志文件删除。
    FileMessageSet还有一个truncateTo()方法,主要负责将日志文件截断到targetSize大小。此方法在后面介绍分区中Leader副本切换时还会提到。下面是truncateTo()方法的具体实现:

    /**
       * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
       * given size falls on a valid message boundary.
       * In some versions of the JDK truncating to the same size as the file message set will cause an
       * update of the files mtime, so truncate is only performed if the targetSize is smaller than the
       * size of the underlying FileChannel.
       * It is expected that no other threads will do writes to the log when this function is called.
       * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
       * @return The number of bytes truncated off
       */
      def truncateTo(targetSize: Int): Int = {
        val originalSize = sizeInBytes
        if(targetSize > originalSize || targetSize < 0)//检测targetSize的有效性
          throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
                                   " size of this log segment is " + originalSize + " bytes.")
        if (targetSize < channel.size.toInt) {
          channel.truncate(targetSize)//裁剪文件
          channel.position(targetSize)//移动position
          _size.set(targetSize)//修改_size
        }
        originalSize - targetSize//返回剪裁掉的字节数
      }
    

    FileMessageSet还实现了iterator()方法,返回一个迭代器。FileMessageSet迭代器读取消息的逻辑是:先读取消息的LogOverhead部分,然后按照size分配合适的ByteBuffer,再读取message data部分,最后将message data和offset封装成MessageOffset对象返回。迭代器的实现和searchFor()方法类似。

    相关文章

      网友评论

          本文标题:Kafka源码分析-Server-日志存储(1)

          本文链接:https://www.haomeiwen.com/subject/vahceqtx.html