ByteBufferMessageSet Analysis
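Having covered how the producer and the consumer handle compressed messages, we now return to the server side and analyze ByteBufferMessageSet, which stores message data in an underlying ByteBuffer. ByteBufferMessageSet plays a role similar to MemoryRecords. It provides three kinds of functionality:
- Compressing a collection of Messages with a specified compression type. This is mainly used to build ByteBufferMessageSet objects and is done by the ByteBufferMessageSet.create() method.
- Providing iterators that support both deep iteration and shallow iteration.
- Validating messages and assigning offsets.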
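ByteBufferMessageSet.create() implements message compression and offset assignment. The steps are as follows:
- If the incoming Message collection is empty, return an empty ByteBuffer.
- If the messages are not to be compressed, assign each message an offset through OffsetAssigner, write the messages into a ByteBuffer, and return that ByteBuffer. OffsetAssigner stores a range of offset values and returns them one at a time, like an iterator.
- If the messages are to be compressed, first compress the Message collection with the specified codec into a buffer, assigning offsets along the way. Then write the outer (wrapper) message in the compressed-message format, and finally return the ByteBuffer that holds the whole wrapper message.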
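To make the OffsetAssigner behavior described above concrete, here is a minimal Scala sketch. `SimpleOffsetAssigner` and its factory are hypothetical names, not Kafka's actual class. The sketch only assumes that a contiguous range of offsets is reserved up front and handed out one at a time, and that magic-1 inner messages store offsets relative to the first offset in the range.

```scala
// Hypothetical, simplified stand-in for OffsetAssigner: it holds a contiguous
// range of absolute offsets and hands them out one by one, like an iterator.
final class SimpleOffsetAssigner(offsets: IndexedSeq[Long]) {
  private var index = 0

  // Returns the next absolute offset in the reserved range.
  def nextAbsoluteOffset(): Long = {
    val result = offsets(index)
    index += 1
    result
  }

  // Converts an absolute offset into an offset relative to the first one,
  // which is what magic-1 inner messages store.
  def toInnerOffset(offset: Long): Long = offset - offsets.head
}

object SimpleOffsetAssigner {
  // Hypothetical factory: reserve `count` offsets starting at `start`.
  def apply(start: Long, count: Int): SimpleOffsetAssigner =
    new SimpleOffsetAssigner(start until start + count)
}
```

For example, `SimpleOffsetAssigner(100L, 3)` hands out 100, 101 and 102 in turn, and `toInnerOffset(102L)` yields 2.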
The code of ByteBufferMessageSet.create() is as follows:
private def create(offsetAssigner: OffsetAssigner,
                   compressionCodec: CompressionCodec,
                   wrapperMessageTimestamp: Option[Long],
                   timestampType: TimestampType,
                   messages: Message*): ByteBuffer = {
  if (messages.isEmpty) // Case 1: the Message collection is empty
    MessageSet.Empty.buffer
  // Case 2: the Message collection does not need to be compressed
  else if (compressionCodec == NoCompressionCodec) {
    val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
    // assign an offset to each message and write it into the buffer
    for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
    buffer.rewind()
    buffer // return the buffer
  } else { // Case 3: the Message collection must be compressed
    // determine the magic value and timestamp of the wrapper message
    val magicAndTimestamp = wrapperMessageTimestamp match {
      case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
      case None => MessageSet.magicAndLargestTimestamp(messages)
    }
    var offset = -1L
    // MessageWriter keeps the compressed data in an underlying byte array
    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
    messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
      // wrap the output in a stream of the requested compression type
      val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
      try {
        for (message <- messages) { // write each inner message into the compressed stream
          offset = offsetAssigner.nextAbsoluteOffset()
          if (message.magic != magicAndTimestamp.magic)
            throw new IllegalArgumentException("Messages in the message set must have same magic value")
          // Use inner offset if magic value is greater than 0:
          // magic 1 writes a relative offset, magic 0 writes the absolute offset
          if (magicAndTimestamp.magic > Message.MagicValue_V0)
            output.writeLong(offsetAssigner.toInnerOffset(offset))
          else
            output.writeLong(offset)
          output.writeInt(message.size) // write the message size
          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) // write the Message bytes
        }
      } finally {
        output.close()
      }
    }
    val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
    // write the whole wrapper message in message format; the wrapper's offset is the offset of the last inner message
    writeMessage(buffer, messageWriter, offset)
    buffer.rewind()
    buffer
  }
}
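The compressed branch can be modeled with plain JDK streams. The sketch below is a simplification under stated assumptions: GZIP stands in for whatever codec is configured, each inner entry is written as `[offset: Long][size: Int][bytes]` as in the loop above, and the full wrapper message (CRC, magic, attributes, timestamp) is omitted. `CompressedWrapperSketch` and `Wrapper` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.zip.GZIPOutputStream

// Hypothetical model of create()'s compressed branch (not Kafka's code).
object CompressedWrapperSketch {
  // The wrapper's offset plus the compressed bytes that would become its value.
  final case class Wrapper(offset: Long, compressedValue: Array[Byte])

  // Writes each payload as [offset: Long][size: Int][bytes] into a GZIP stream.
  // With magic 1 the stored offset is relative (0, 1, 2, ...); with magic 0 it
  // is absolute. The wrapper takes the absolute offset of the last inner entry.
  def create(baseOffset: Long, magic: Byte, payloads: Seq[Array[Byte]]): Wrapper = {
    require(payloads.nonEmpty, "empty message sets are handled separately")
    val bytes = new ByteArrayOutputStream()
    val output = new DataOutputStream(new GZIPOutputStream(bytes))
    var offset = -1L
    try {
      for ((payload, i) <- payloads.zipWithIndex) {
        offset = baseOffset + i
        output.writeLong(if (magic > 0) offset - baseOffset else offset)
        output.writeInt(payload.length)
        output.write(payload)
      }
    } finally {
      output.close() // finishes the GZIP stream so `bytes` is complete
    }
    Wrapper(offset, bytes.toByteArray)
  }
}
```

With `baseOffset = 100` and two payloads under magic 1, the compressed stream holds relative offsets 0 and 1, while the wrapper carries 101, the absolute offset of the last inner message.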
The shallow and deep iterators provided by ByteBufferMessageSet are very similar, in both implementation and function, to MemoryRecords.RecordsIterator.
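A shallow iterator stops at the wrapper message, whereas a deep iterator decompresses it and yields the inner messages. The sketch below shows the deep side for a simplified GZIP layout of `[offset: Long][size: Int][bytes]` entries; `DeepIterationSketch` and `InnerMessage` are hypothetical names. It assumes that, for magic 1, absolute offsets are recovered as `wrapperOffset - lastInnerOffset + innerOffset`, which follows from the wrapper carrying the absolute offset of its last inner message.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException}
import java.util.zip.GZIPInputStream
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of deep iteration over one compressed wrapper message.
object DeepIterationSketch {
  final case class InnerMessage(offset: Long, payload: Array[Byte])

  // A shallow iterator would return just the wrapper (offset = wrapperOffset).
  // Deep iteration decompresses the wrapper's value and rebuilds absolute offsets.
  def deepMessages(wrapperOffset: Long, magic: Byte, compressedValue: Array[Byte]): List[InnerMessage] = {
    val in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(compressedValue)))
    val raw = ArrayBuffer.empty[(Long, Array[Byte])]
    try {
      var done = false
      while (!done) {
        try {
          val storedOffset = in.readLong()
          val size = in.readInt()
          val payload = new Array[Byte](size)
          in.readFully(payload)
          raw += ((storedOffset, payload))
        } catch {
          case _: EOFException => done = true // end of the decompressed stream
        }
      }
    } finally in.close()

    if (magic == 0 || raw.isEmpty)
      raw.map { case (o, p) => InnerMessage(o, p) }.toList // magic 0: offsets are already absolute
    else {
      val lastInner = raw.last._1
      raw.map { case (o, p) => InnerMessage(wrapperOffset - lastInner + o, p) }.toList
    }
  }
}
```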
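ByteBufferMessageSet.validateMessagesAndAssignOffsets() validates messages and assigns offsets. It covers the following:
- Check the magic value.
- Check the timestamp and the timestamp type.
- For compacted topics, check that every message has a key.
- Reset the timestamp type and timestamp when necessary.
- Assign offsets.
- If the messages' compression type differs from the one configured on the broker, recompress them.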
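The key and timestamp checks in the list above can be sketched as follows. This is a simplification under assumptions: `Msg` and the small timestamp-type ADT are hypothetical stand-ins for Kafka's `Message` and `TimestampType`, `IllegalArgumentException` replaces Kafka's own exception types, and the rule that a producer may not set LogAppendTime reflects our understanding of the 0.10 broker rather than code shown in this section.

```scala
// Hypothetical sketch of the per-message key and timestamp checks.
object ValidationSketch {
  sealed trait TimestampType
  case object CreateTime extends TimestampType
  case object LogAppendTime extends TimestampType

  // Hypothetical minimal view of a message: only the fields the checks need.
  final case class Msg(key: Option[Array[Byte]], timestamp: Long, timestampType: TimestampType)

  // Compacted topics need a key on every message.
  def validateKey(m: Msg, compactedTopic: Boolean): Unit =
    if (compactedTopic && m.key.isEmpty)
      throw new IllegalArgumentException("Compacted topic cannot accept message without key")

  // With CreateTime, the producer's timestamp must lie within maxDiffMs of the
  // broker clock `now`; a producer must not claim LogAppendTime itself (assumed rule).
  def validateTimestamp(m: Msg, now: Long, topicType: TimestampType, maxDiffMs: Long): Unit = {
    if (topicType == CreateTime && math.abs(m.timestamp - now) > maxDiffMs)
      throw new IllegalArgumentException(
        s"Timestamp ${m.timestamp} is out of range [${now - maxDiffMs}, ${now + maxDiffMs}]")
    if (m.timestampType == LogAppendTime)
      throw new IllegalArgumentException("Producer should not set timestamp type to LogAppendTime")
  }
}
```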
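Below is the code of validateMessagesAndAssignOffsets(). The first parameter, offsetCounter, supplies the starting value for offset assignment; the remaining parameters are self-explanatory. The second element of the returned tuple indicates whether the sizes of the messages in the ByteBufferMessageSet may have changed.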
/**
 * Update the offsets for this message set and do further validation on messages including:
 * 1. Messages for compacted topics must have keys
 * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
 *    starting from 0.
 * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
 *
 * This method will convert the messages in the following scenarios:
 * A. Magic value of a message = 0 and messageFormatVersion is 1
 * B. Magic value of a message = 1 and messageFormatVersion is 0
 *
 * If no format conversion or value overwriting is required for messages, this method will perform in-place
 * operations and avoid re-compression.
 *
 * Returns the message set and a boolean indicating whether the message sizes may have changed.
 */
private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
                                                    now: Long,
                                                    sourceCodec: CompressionCodec,
                                                    targetCodec: CompressionCodec,
                                                    compactedTopic: Boolean = false,
                                                    messageFormatVersion: Byte = Message.CurrentMagicValue,
                                                    messageTimestampType: TimestampType,
                                                    messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
  if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // non-compressed case
    // check the magic value: does every message's magic value match messageFormatVersion?
    if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
      // Message format conversion
      // Some message's magic value differs from the requested one, so the messages must be converted.
      // This can change message sizes, so a new ByteBufferMessageSet is created. Offsets are assigned
      // and the CRC32, timestamps, etc. are validated and updated as well.
      (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
        messageFormatVersion), true)
    } else {
      // Do in-place validation, offset assignment and maybe set timestamp.
      // Non-compressed messages with a uniform magic value: sizes cannot change, so this mainly
      // assigns offsets and validates/updates the CRC32, timestamps, etc.
      (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
        messageTimestampDiffMaxMs), false)
    }
  } else { // compressed case
    // Deal with compressed messages.
    // inPlaceAssignment indicates whether the current ByteBufferMessageSet can be reused.
    // We cannot do in-place assignment in any of the following situations:
    // 1. Source and target compression codecs differ, so the messages must be recompressed.
    // 2. The magic value to use is 0, because offsets need to be overwritten with absolute offsets.
    // 3. The magic value to use is above 0, but some fields of inner messages need to be overwritten
    //    (here: inner offsets that are not 0, 1, 2, ...).
    // 4. Message format conversion is needed.
    // Check situations 1 and 2
    var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
    var maxTimestamp = Message.NoTimestamp
    val expectedInnerOffset = new LongRef(0)
    val validatedMessages = new mutable.ArrayBuffer[Message]
    // iterate over the inner messages; this step decompresses them
    this.internalIterator(isShallow = false).foreach { messageAndOffset =>
      val message = messageAndOffset.message
      validateMessageKey(message, compactedTopic) // check the message key
      if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
        // No in place assignment situation 3
        // Validate the timestamp
        validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)
        // Check if we need to overwrite offset: inner offsets must be 0, 1, 2, ...
        if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
          inPlaceAssignment = false
        maxTimestamp = math.max(maxTimestamp, message.timestamp)
      }
      if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
        throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
          s"compression attribute set: $message")
      // No in place assignment situation 4
      if (message.magic != messageFormatVersion)
        inPlaceAssignment = false
      // keep the messages that passed the checks, converted to the target format
      validatedMessages += message.toFormatVersion(messageFormatVersion)
    }
    if (!inPlaceAssignment) { // the current ByteBufferMessageSet cannot be reused
      // Cannot do in place assignment.
      val wrapperMessageTimestamp = {
        if (messageFormatVersion == Message.MagicValue_V0)
          Some(Message.NoTimestamp)
        else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
          Some(maxTimestamp)
        else // Log append time
          Some(now)
      }
      // Create a new ByteBufferMessageSet and recompress; this goes through the create() method shown earlier
      (new ByteBufferMessageSet(compressionCodec = targetCodec,
                                offsetCounter = offsetCounter,
                                wrapperMessageTimestamp = wrapperMessageTimestamp,
                                timestampType = messageTimestampType,
                                messages = validatedMessages: _*), true)
    } else { // reuse the current ByteBufferMessageSet, saving one compression pass
      // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
      // The wrapper's offset becomes the last of the newly assigned offsets
      buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
      // validate the messages
      validatedMessages.foreach(_.ensureValid())
      var crcUpdateNeeded = true
      val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset
      val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
      val timestamp = buffer.getLong(timestampOffset)
      val attributes = buffer.get(attributeOffset)
      if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
        // We don't need to recompute crc if the timestamp is not updated.
        crcUpdateNeeded = false
      else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
        // Set timestamp type and timestamp:
        // update the wrapper message's timestamp and attributes (its CRC32 is recomputed below)
        buffer.putLong(timestampOffset, now)
        buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes))
      }
      if (crcUpdateNeeded) {
        // need to recompute the crc value
        buffer.position(MessageSet.LogOverhead)
        val wrapperMessage = new Message(buffer.slice())
        Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
      }
      buffer.rewind()
      (this, false)
    }
  }
}
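The in-place decision in the compressed branch boils down to a small predicate. The sketch below restates the four situations from the code comments as a pure function, together with the wrapper-offset arithmetic `offsetCounter.addAndGet(n) - 1`. `InPlaceSketch` and `Inner` are hypothetical names, and codecs are modeled as plain strings for brevity.

```scala
// Hypothetical restatement of the inPlaceAssignment logic (not Kafka's code).
object InPlaceSketch {
  // Minimal summary of one inner message seen during deep iteration.
  final case class Inner(magic: Byte, innerOffset: Long)

  // Situations that rule out in-place assignment:
  // 1. codecs differ  2. target format is magic 0
  // 3. inner offsets are not 0, 1, 2, ...  4. some inner message needs format conversion
  def canAssignInPlace(sourceCodec: String, targetCodec: String,
                       formatVersion: Byte, inners: Seq[Inner]): Boolean = {
    var inPlace = sourceCodec == targetCodec && formatVersion > 0 // situations 1 and 2
    var expected = 0L
    for (m <- inners) {
      if (m.magic > 0 && formatVersion > 0) {
        if (m.innerOffset != expected) inPlace = false // situation 3
        expected += 1
      }
      if (m.magic != formatVersion) inPlace = false // situation 4
    }
    inPlace
  }

  // `counter` is the next offset to assign. Reserving n offsets moves it to
  // counter + n; the wrapper gets the last reserved one. Returns (wrapperOffset, newCounter).
  def wrapperOffset(counter: Long, n: Int): (Long, Long) = {
    val newCounter = counter + n
    (newCounter - 1, newCounter)
  }
}
```

For instance, with the counter at 100 and three inner messages, the inner messages logically occupy 100 to 102 and the wrapper offset written at position 0 of the buffer is 102.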
The remaining methods in ByteBufferMessageSet are helpers for the methods described above.
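Finally, back to the question we started with: FileMessageSet.append() appends all of the data in a ByteBufferMessageSet to the log file. For compressed messages, this means that multiple inner messages are stored in the log file as a single outer (wrapper) message. When consumers fetch messages, they also receive the compressed message as is, which is how "end-to-end compression" is achieved.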