美文网首页
Kafka系列4----LogSegment分析

Kafka系列4----LogSegment分析

作者: _六道木 | 来源:发表于2018-04-11 00:03 被阅读109次

    LogSegment代表了一个Segment文件,其中有3个字段,分别代表.log,.index,.timeIndex文件,即log,index和timeIndex

    核心字段

    log:对应.log文件,FileRecords类型,内含有一个FileChannel对象,主要用来操作文件
    index:对应.index文件,OffsetIndex类型,对索引文件的操作进行了一些封装
    timeIndex:对应.timeIndex文件,和OffsetIndex类似
    baseOffset:基础位移,基之前举的例子里的238
    indexIntervalBytes:表示每相隔多少字节就生成一个索引,即以前举的例子里的1kb
    bytesSinceLastIndexEntry:当前segment已经添加了多少个字节的消息,主要用来和indexIntervalBytes比较,判断是否需要生成索引,在生成索引后又重置为0

    接下来看下核心的几个方法

    存储消息

        def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
            if (records.sizeInBytes > 0) {
                val physicalPosition = log.sizeInBytes()
                if (physicalPosition == 0)
                    rollingBasedTimestamp = Some(largestTimestamp)
                //....
                // 写入消息后,返回写入的大小
                val appendedBytes = log.append(records)
                // 更新最大时间戳
                if (largestTimestamp > maxTimestampSoFar) {
                    maxTimestampSoFar = largestTimestamp
                    offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
                }
                // 判断是否需要生成索引,当前批次中,消息总大小为bytesSinceLastIndexEntry
                if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                    // 生成索引
                    index.append(firstOffset, physicalPosition)
                    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
                    // 生成索引后重新计算
                    bytesSinceLastIndexEntry = 0
                }
                // 累加消息大小
                bytesSinceLastIndexEntry += records.sizeInBytes
            }
        }
    

    逻辑很简单,看下注释就OK

    读取消息

    在分析读取消息之前,需要先分析一下OffsetIndex,因为核心之一是通过索引查找消息

    OffsetIndex

    OffsetIndex核心的字段和函数如下:
    _lastOffset:索引文件中最后的offset
    mmap:用来操作索引文件,在其父类中,MappedByteBuffer类型,使用了内存映射,具体看下https://blog.csdn.net/lirx_tech/article/details/51396268
    _entries:索引个数

    lookup函数

    返回小于等于给定offset的OffsetPosition对象(包含offset和position物理位置)

        def lookup(targetOffset: Long): OffsetPosition = {
            maybeLock(lock) {
                val idx = mmap.duplicate//副本
                val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)
                if (slot == -1)// 查找失败,返回
                    OffsetPosition(baseOffset, 0)
                else
                    parseEntry(idx, slot).asInstanceOf[OffsetPosition]
            }
        }
        override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
            OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
        }
        private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
        private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
        
    

    OffsetPosition有两个属性,一个是offset,即代表log文件中第几个消息,另外一个是position,即log文件中的物理位置。
    另外,索引文件中,一个条目的大小为8个字节,即offset和postion分别为4个字节,所以relativeOffset计算方式为n*8,即为当前slot的位置,再加上baseOffset即得到offset,position=offset往后移动4个字节

    indexSlotFor函数

    返回大于等于指定offset对于的slot位置

        protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
            // 无索引
            if (_entries == 0)
                return -1
            // 如果索引中最大的offset小于给定的offset,即查找失败返回-1
            if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
                return -1
    
            // 二分查找,返回大于等于指定offset对于的slot位置
            var lo = 0
            var hi = _entries - 1
            while (lo < hi) {
                val mid = ceil(hi / 2.0 + lo / 2.0).toInt
                val found = parseEntry(idx, mid)
                val compareResult = compareIndexEntry(found, target, searchEntity)
                if (compareResult > 0)
                    hi = mid - 1
                else if (compareResult < 0)
                    lo = mid
                else
                    return mid
            }
            lo
        }
    

    另外的比较简单,后续涉及再进行分析,接下来就是重点了

    read函数

        def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
                 minOneMessage: Boolean = false): FetchDataInfo = {
    
            val logSize = log.sizeInBytes // this may change, need to save a consistent copy
            // 获取起始位置和大小
            val startOffsetAndSize = translateOffset(startOffset)
    
            if (startOffsetAndSize == null)
                return null
            // 起始位置大于等于startOffset
            val startPosition = startOffsetAndSize.position.toInt
            val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
    
            val adjustedMaxSize =
                if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
                else maxSize
    
            // return a log segment but with zero size in the case below
            if (adjustedMaxSize == 0)
                return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
    
            // calculate the length of the message set to read based on whether or not they gave us a maxOffset
            val length = maxOffset match {
                case None =>
                    min((maxPosition - startPosition).toInt, adjustedMaxSize)
                case Some(offset) =>
                    if (offset < startOffset)
                        return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
                    //如果传了maxOffset,那么通过这个值重新查找
                    val mapping = translateOffset(offset, startPosition)
                    val endPosition =
                        if (mapping == null)
                            logSize // the max offset is off the end of the log, use the end of the file
                        else
                            mapping.position
                    min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
            }
    
            FetchDataInfo(offsetMetadata, log.read(startPosition, length),
                firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
        }
    

    先看下这个translateOffset是干嘛的

        private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
            val mapping = index.lookup(offset)// 通过索引获取数据
            log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
        }
        
        public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
            for (FileChannelLogEntry entry : shallowEntriesFrom(startingPosition)) {
                long offset = entry.offset();
                if (offset >= targetOffset)
                    return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
            }
            return null;
        }
    

    第一步:就是上面讲的,先通过索引寻找小于等于给offset的OffsetPostion对象
    第二步:先通过startingPosition这个物理位置开始往后寻找所有消息(当前文件),然后一个个的比对,看看是否大于我们需要找的targetOffset并返回

    总结

    核心的基本分析完毕了,来总结一下:

    建立索引

    1.索引大小8字节
    2.写入索引时包括消息在log中的为第几个消息,以及对应物理offset

    索引查找

    1.通过二分查找获取小于等于给定offset的最大的OffsetPosition对象

    添加消息

    1.先将消息写入.log文件
    2.如果写入的大小累积大于indexIntervalBytes,建立索引

    查找消息

    1.先二分查找获取对应索引,获取到对应的物理offset
    2.拿着物理offset去log文件顺序查找对应消息
    3.返回查找到的消息

    相关文章

      网友评论

          本文标题:Kafka系列4----LogSegment分析

          本文链接:https://www.haomeiwen.com/subject/glvohftx.html