Kafka Source Code Analysis - Server - Log Storage (3) - ByteBuf

Author: 陈阳001 | Published 2019-02-16 22:54

    ByteBufferMessageSet Analysis

    Having covered how producers and consumers handle compressed messages, we now return to the server side and start analyzing ByteBufferMessageSet, which uses a ByteBuffer underneath to store message data. ByteBufferMessageSet plays a role similar to that of MemoryRecords and provides three kinds of functionality:

    • Compressing a collection of Messages with a specified compression type. This is mainly used when building a ByteBufferMessageSet object and is done by ByteBufferMessageSet.create().
    • Providing iterators that support both deep iteration and shallow iteration.
    • Providing message validation and offset assignment.
      ByteBufferMessageSet.create() implements message compression and offset assignment in the following steps:
    • If the given Message collection is empty, an empty ByteBuffer is returned.
    • If compression is not required, an OffsetAssigner assigns an offset to each message, the messages are written into a ByteBuffer, and that ByteBuffer is returned. OffsetAssigner stores a sequence of offset values and hands them out one by one, much like an iterator (a minimal sketch of it follows this list).
    • If compression is required, the Message collection is first compressed with the specified codec into a buffer while offsets are assigned, the result is then written out as a wrapper (outer) message in the compressed-message format, and finally the ByteBuffer containing the whole wrapper message is returned.
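      The OffsetAssigner used above is only a small helper. The following is a minimal sketch of the idea, simplified for illustration rather than copied verbatim from the Kafka source; it shows only the two methods that create() relies on, nextAbsoluteOffset() and toInnerOffset():

    // Hands out a pre-computed sequence of absolute offsets one at a time, and converts an
    // absolute offset into an inner (relative) offset based on the first offset of the sequence.
    private class OffsetAssigner(offsets: Seq[Long]) {
      private var index = 0

      // Return the next absolute offset and advance the internal position.
      def nextAbsoluteOffset(): Long = {
        val result = offsets(index)
        index += 1
        result
      }

      // Convert an absolute offset into an offset relative to the first assigned offset.
      def toInnerOffset(offset: Long): Long = offset - offsets.head
    }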
      The code of ByteBufferMessageSet.create() is as follows:
    private def create(offsetAssigner: OffsetAssigner,
                         compressionCodec: CompressionCodec,
                         wrapperMessageTimestamp: Option[Long],
                         timestampType: TimestampType,
                         messages: Message*): ByteBuffer = {
        if (messages.isEmpty) // Case 1: the Message collection is empty
          MessageSet.Empty.buffer
        // Case 2: the Message collection does not need to be compressed
        else if (compressionCodec == NoCompressionCodec) {
          val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
          for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset()) // assign an offset to each message and write it into the buffer
          buffer.rewind()
          buffer // return the buffer
        } else { // Case 3: the Message collection needs to be compressed
          // determine the magic value and the timestamp
          val magicAndTimestamp = wrapperMessageTimestamp match {
            case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
            case None => MessageSet.magicAndLargestTimestamp(messages)
          }
          var offset = -1L
          // the compressed data is buffered in an underlying byte array
          val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
          messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
            // create an output stream for the specified compression type
            val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
            try {
              for (message <- messages) { // write each inner (to-be-compressed) message
                offset = offsetAssigner.nextAbsoluteOffset()
                // with magic value 1 a relative offset is written; with magic value 0 the absolute offset is written
                if (message.magic != magicAndTimestamp.magic)
                  throw new IllegalArgumentException("Messages in the message set must have same magic value")
                // Use inner offset if magic value is greater than 0
                if (magicAndTimestamp.magic > Message.MagicValue_V0)
                  output.writeLong(offsetAssigner.toInnerOffset(offset))
                else
                  output.writeLong(offset)
                output.writeInt(message.size) // write the size
                output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) // write the Message data
              }
            } finally {
              output.close()
            }
          }
          val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
          // write the whole wrapper message in the message-set format; the wrapper's offset is the offset of the last inner message
          writeMessage(buffer, messageWriter, offset)
          buffer.rewind()
          buffer
        }
    }
    

    The shallow and deep iterators provided by ByteBufferMessageSet are very close, in both implementation and functionality, to the MemoryRecords iterators, in particular MemoryRecords.RecordsIterator.
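    As a rough usage sketch (the method name follows the internalIterator call that appears in validateMessagesAndAssignOffsets() below; the exact signature and visibility may differ, and messageSet stands for an arbitrary ByteBufferMessageSet instance):

    // Shallow iteration: yields the wrapper (outer) messages as they are, without decompressing them.
    messageSet.internalIterator(isShallow = true).foreach { messageAndOffset =>
      println(s"outer offset=${messageAndOffset.offset}, codec=${messageAndOffset.message.compressionCodec}")
    }

    // Deep iteration: decompresses every wrapper message and yields each inner message with its offset.
    messageSet.internalIterator(isShallow = false).foreach { messageAndOffset =>
      println(s"inner offset=${messageAndOffset.offset}, size=${messageAndOffset.message.size}")
    }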
    ByteBufferMessageSet.validateMessagesAndAssignOffsets() validates messages and assigns offsets. It covers the following aspects:

    • checking the magic value;
    • checking the timestamp and the timestamp type;
    • for compacted topics, checking that each message has a key;
    • optionally overwriting the timestamp type and timestamp;
    • assigning offsets;
    • re-compressing the messages if their compression type differs from the one configured on the broker.
      The code of validateMessagesAndAssignOffsets() is shown below. Its first parameter is the starting value for offset assignment; the other parameters are self-explanatory. The Boolean in the returned tuple indicates whether the sizes of the Messages in the ByteBufferMessageSet may have changed.
    /**
       * Update the offsets for this message set and do further validation on messages including:
       * 1. Messages for compacted topics must have keys
       * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
       *    starting from 0.
       * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
       *
       * This method will convert the messages in the following scenarios:
       * A. Magic value of a message = 0 and messageFormatVersion is 1
       * B. Magic value of a message = 1 and messageFormatVersion is 0
       *
       * If no format conversion or value overwriting is required for messages, this method will perform in-place
       * operations and avoid re-compression.
       *
       * Returns the message set and a boolean indicating whether the message sizes may have changed.
       */
      private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
                                                          now: Long,
                                                          sourceCodec: CompressionCodec,
                                                          targetCodec: CompressionCodec,
                                                          compactedTopic: Boolean = false,
                                                          messageFormatVersion: Byte = Message.CurrentMagicValue,
                                                          messageTimestampType: TimestampType,
                                                          messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
    if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // handle the uncompressed case
      // check the magic value: verify that every Message's magic value matches the specified one
          if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
            // Message format conversion
        // If any Message's magic value differs from the specified one, the messages must be converted
        // to a single format; this may change their length, so a new ByteBufferMessageSet is created.
        // Offsets are also assigned, and the CRC32, timestamp and other fields are validated and updated.
            (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
              messageFormatVersion), true)
          } else {
            // Do in-place validation, offset assignment and maybe set timestamp
        // Uncompressed messages whose magic values are already uniform: since the format is fixed,
        // the length does not change. Offsets are assigned and the CRC32, timestamp and other fields
        // are validated and updated in place.
            (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
              messageTimestampDiffMaxMs), false)
          }
    } else { // handle the compressed case
          // Deal with compressed messages
      // We cannot do in place assignment in one of the following situations (inPlaceAssignment indicates
      // whether the current ByteBufferMessageSet object can be reused as-is; it cannot be reused in four cases):
      // 1. Source and target compression codec are different, i.e. the messages' compression type does not match the one configured on this broker, so they must be re-compressed.
      // 2. When magic value to use is 0, because offsets need to be overwritten with absolute offsets.
      // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten, e.g. the timestamp.
      // 4. Message format conversion is needed.
    
      // No in place assignment situation 1 and 2: check cases 1 and 2
          var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
    
          var maxTimestamp = Message.NoTimestamp
          val expectedInnerOffset = new LongRef(0)
          val validatedMessages = new mutable.ArrayBuffer[Message]
      // iterate over the inner compressed messages; this step decompresses them
          this.internalIterator(isShallow = false).foreach { messageAndOffset =>
            val message = messageAndOffset.message
        validateMessageKey(message, compactedTopic) // validate the message key
    
            if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
              // No in place assignment situation 3
              // Validate the timestamp
          validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs) // validate the timestamp
          // Check if we need to overwrite offset: case 3, verify that the inner offsets are as expected
              if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
                inPlaceAssignment = false
              maxTimestamp = math.max(maxTimestamp, message.timestamp)
            }
    
            if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
              throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
                s"compression attribute set: $message")
    
        // No in place assignment situation 4: check case 4
            if (message.magic != messageFormatVersion)
              inPlaceAssignment = false
        // collect the Messages that passed the checks and conversion above
            validatedMessages += message.toFormatVersion(messageFormatVersion)
          }
    
      if (!inPlaceAssignment) { // the current ByteBufferMessageSet object cannot be reused
            // Cannot do in place assignment.
            val wrapperMessageTimestamp = {
              if (messageFormatVersion == Message.MagicValue_V0)
                Some(Message.NoTimestamp)
              else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
                Some(maxTimestamp)
              else // Log append time
                Some(now)
            }
        // create a new ByteBufferMessageSet object and re-compress; this invokes the create() method discussed above
            (new ByteBufferMessageSet(compressionCodec = targetCodec,
                                      offsetCounter = offsetCounter,
                                      wrapperMessageTimestamp = wrapperMessageTimestamp,
                                      timestampType = messageTimestampType,
                                      messages = validatedMessages: _*), true)
      } else { // reuse the current ByteBufferMessageSet object, saving one compression pass
            // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
            buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
            // validate the messages
            validatedMessages.foreach(_.ensureValid())
    
            var crcUpdateNeeded = true
            val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset
            val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
            val timestamp = buffer.getLong(timestampOffset)
            val attributes = buffer.get(attributeOffset)
            if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
              // We don't need to recompute crc if the timestamp is not updated.
              crcUpdateNeeded = false
            else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
              // Set timestamp type and timestamp
          // update the wrapper message's timestamp, attributes and CRC32
              buffer.putLong(timestampOffset, now)
              buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes))
            }
    
            if (crcUpdateNeeded) {
              // need to recompute the crc value
              buffer.position(MessageSet.LogOverhead)
              val wrapperMessage = new Message(buffer.slice())
              Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
            }
            buffer.rewind()
            (this, false)
          }
        }
      }
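    As a small worked example of the in-place branch above (the numbers are purely illustrative): if offsetCounter currently holds 100 and the wrapper contains three validated inner messages, offsetCounter.addAndGet(3) advances the counter to 103 and returns 103, so the first 8 bytes of the buffer, i.e. the wrapper's offset field, are overwritten with 103 - 1 = 102, the absolute offset of the last inner message; the inner messages implicitly occupy offsets 100 to 102.

    // Illustrative arithmetic only, mirroring buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
    var nextOffset = 100L                 // offsetCounter.value before assignment
    val innerMessageCount = 3             // validatedMessages.size
    nextOffset += innerMessageCount       // addAndGet(3) leaves the counter at 103 and returns 103
    val wrapperOffset = nextOffset - 1    // 102: written into the wrapper's offset field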
    

    The remaining methods of ByteBufferMessageSet exist to support the methods described above.
    Finally, we come back to the question raised at the beginning: FileMessageSet.append() appends all the data of a ByteBufferMessageSet to the log file, so a batch of compressed messages lives in the log file as a single wrapper entry. When a consumer fetches messages, it also receives them in this compressed form, which is how "end-to-end compression" is achieved.
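    To make the wrapper entry's on-disk form concrete, the sketch below shows its layout under the pre-0.11 message-set format, where every entry carries a 12-byte header (an 8-byte offset plus a 4-byte size, i.e. MessageSet.LogOverhead); writeWrapperEntry() is a hypothetical helper written purely for illustration:

    import java.nio.ByteBuffer

    // Illustrative layout of a wrapper (outer) entry: [offset: 8 bytes][size: 4 bytes][message bytes].
    // The offset field holds the absolute offset of the last inner message.
    def writeWrapperEntry(lastInnerOffset: Long, wrapperMessageBytes: Array[Byte]): ByteBuffer = {
      val logOverhead = 8 + 4 // corresponds to MessageSet.LogOverhead
      val buffer = ByteBuffer.allocate(logOverhead + wrapperMessageBytes.length)
      buffer.putLong(lastInnerOffset)           // offset of the last inner message
      buffer.putInt(wrapperMessageBytes.length) // size of the wrapper message that follows
      buffer.put(wrapperMessageBytes)           // the wrapper message, whose value is the compressed inner messages
      buffer.rewind()
      buffer
    }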
