- 将Message集合按照指定的压缩类型进行压缩,此功能主要用于构建ByteBufferMessageSet对象,通过ByteBufferMessageSet.create()方法完成。
- 提供迭代器,实现深层迭代和浅层迭代两种迭代方式。
- 提供了消息验证和offset分配的功能。
在ByteBufferMessageSet.create()方法中实现了消息的压缩以及offset分配,步骤如下:

- 如果传入的Message集合为空,则返回空ByteBuffer。
- 如果要求不对消息进行压缩,则通过OffsetAssigner分配每个消息的offset,在将消息写入到ByteBuffer后,返回ByteBuffer。OffsetAssigner的功能是存储一串offset值,并像迭代器那样逐个返回。
- 如果要求对消息进行压缩,则先将Message集合按照指定的压缩方式进行压缩并放入缓冲区,同时也会完成offset分配,然后按照压缩消息的格式写入外层消息,最后将整个外层消息所在的ByteBuffer返回。
// Builds the ByteBuffer backing a message set from the given messages, assigning an offset to each
// message via `offsetAssigner` and optionally compressing them with `compressionCodec`.
//
// - offsetAssigner: supplies the absolute offset for each message through nextAbsoluteOffset().
// - compressionCodec: NoCompressionCodec selects the plain path; any other codec selects the wrapper path.
// - wrapperMessageTimestamp: when defined, used as the wrapper timestamp together with the magic of the
//   first message; otherwise the value comes from MessageSet.magicAndLargestTimestamp(messages).
// - timestampType: forwarded to the MessageWriter for the wrapper message.
// - messages: the messages to write.
//
// Throws IllegalArgumentException when, on the compressed path, a message's magic value differs from the
// magic value chosen for the wrapper.
//
// NOTE(review): this excerpt appears to be abridged — some statements and closing braces present in a
// complete method are not shown here (e.g. the body of the empty-input branch). Consult the full source
// before relying on the exact control flow.
private def create(offsetAssigner: OffsetAssigner,
compressionCodec: CompressionCodec,
wrapperMessageTimestamp: Option[Long],
timestampType: TimestampType,
messages: Message*): ByteBuffer = {
if (messages.isEmpty)// Case 1: the Message collection is empty
else if (compressionCodec == NoCompressionCodec) {// Case 2: no compression requested
// Allocate exactly the size reported by MessageSet.messageSetSize for the uncompressed messages.
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())// assign an offset to each message and write it into the buffer
} else {// Case 3: the Message collection must be compressed
// Pick the magic value and timestamp that the wrapper message will carry.
val magicAndTimestamp = wrapperMessageTimestamp match {
case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
case None => MessageSet.magicAndLargestTimestamp(messages)
// Tracks the most recently assigned absolute offset; after the loop it is the offset of the last message.
var offset = -1L
// Constructor argument is half of the message set size, clamped to the range [1024, 1 << 16]
// (presumably an initial buffer/segment size — confirm against MessageWriter).
val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
// Wrap the writer's output stream in the codec-specific compression stream.
val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
try {
for (message <- messages) {// iterate over the messages and write each one into the compressed inner stream
offset = offsetAssigner.nextAbsoluteOffset()
// All inner messages must share the wrapper's magic value.
if (message.magic != magicAndTimestamp.magic)
throw new IllegalArgumentException("Messages in the message set must have same magic value")
// Use inner offset if magic value is greater than 0
if (magicAndTimestamp.magic > Message.MagicValue_V0)
output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)// write the Message bytes
} finally {
// The outer buffer holds the wrapper message produced by messageWriter plus MessageSet.LogOverhead bytes.
val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
// The wrapper message is written with `offset`, i.e. the last offset assigned in the loop above.
writeMessage(buffer, messageWriter, offset)
- 检查Magic Value。
- 检查时间戳和时间戳类型。
- 对于开启了日志压缩(log compaction)的Topic,需要检查消息是否有key。
- 可以重新设定时间戳类型和时间戳。
- 进行offset分配。
- 如果消息压缩类型和Broker指定的压缩类型不一致,需要进行重新压缩。
/**
 * Update the offsets for this message set and do further validation on messages including:
 * 1. Messages for compacted topics must have keys
 * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
 *    starting from 0.
 * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
 *
 * This method will convert the messages in the following scenarios:
 * A. Magic value of a message = 0 and messageFormatVersion is 1
 * B. Magic value of a message = 1 and messageFormatVersion is 0
 *
 * If no format conversion or value overwriting is required for messages, this method will perform in-place
 * operations and avoid re-compression.
 *
 * Returns the message set and a boolean indicating whether the message sizes may have changed.
 */
// Validates the messages in this set and assigns offsets from `offsetCounter`, returning the resulting
// message set together with a flag: the branches that build a new set return `true`, the in-place
// branches return `false`.
//
// - offsetCounter: counter from which offsets are taken (advanced via addAndGet on the in-place path).
// - now: written as the wrapper timestamp when messageTimestampType is LOG_APPEND_TIME.
// - sourceCodec / targetCodec: the codec the messages arrived with and the codec to store them with.
// - compactedTopic: forwarded to validateMessageKey for every message.
// - messageFormatVersion: the magic value the stored messages must have.
// - messageTimestampType / messageTimestampDiffMaxMs: forwarded to timestamp validation.
//
// NOTE(review): this excerpt appears to be abridged — several closing braces and branch bodies
// (e.g. the branches that compute wrapperMessageTimestamp) are not shown. Consult the full source
// before relying on the exact control flow.
private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
compactedTopic: Boolean = false,
messageFormatVersion: Byte = Message.CurrentMagicValue,
messageTimestampType: TimestampType,
messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {// Non-compressed case
// check the magic value: verify that the magic value of every Message matches the specified magic value
if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
// Message format conversion
// If some Message has a magic value that differs from the specified one, the messages must be
// converted to a uniform format. This may change message lengths, so a new ByteBufferMessageSet
// has to be created. Offsets are assigned at the same time, and the CRC32, timestamp, etc. are
// validated and updated.
(convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
messageFormatVersion), true)
} else {
// Do in-place validation, offset assignment and maybe set timestamp
// Handles non-compressed messages whose magic values are already uniform; since the magic value is
// fixed, the length does not change. Mainly assigns offsets and validates/updates the CRC32,
// timestamp, etc.
(validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
messageTimestampDiffMaxMs), false)
} else {// Compressed case
// Deal with compressed messages
// We cannot do in place assignment in one of the following situations (inPlaceAssignment indicates whether
// the current ByteBufferMessageSet object can be reused directly; it cannot be reused in these four cases):
// 1. Source and target compression codec are different, so the messages must be re-compressed.
// 2. When magic value to use is 0 because offsets need to be overwritten with absolute offsets.
// 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten (e.g. the timestamp).
// 4. Message format conversion is needed.
// No in place assignment situation 1 and 2
var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
// Largest timestamp seen among the inner messages; starts at the Message.NoTimestamp sentinel.
var maxTimestamp = Message.NoTimestamp
// Inner offsets are expected to count up from 0, one per inner message.
val expectedInnerOffset = new LongRef(0)
val validatedMessages = new mutable.ArrayBuffer[Message]
// Deep iteration (isShallow = false) visits the inner messages.
this.internalIterator(isShallow = false).foreach { messageAndOffset =>
val message = messageAndOffset.message
validateMessageKey(message, compactedTopic)// validate the message key
if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
// No in place assignment situation 3
// Validate the timestamp
validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)// validate the timestamp
// Check if we need to overwrite offset (situation 3): verify that the inner offset has the expected value
if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
inPlaceAssignment = false
maxTimestamp = math.max(maxTimestamp, message.timestamp)
// An inner message of a compressed wrapper must not itself carry a compression codec.
if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
s"compression attribute set: $message")
// No in place assignment situation 4
if (message.magic != messageFormatVersion)
inPlaceAssignment = false
// Collect the messages that passed the checks above, converted to the target format version
validatedMessages += message.toFormatVersion(messageFormatVersion)
if (!inPlaceAssignment) {// The current ByteBufferMessageSet object cannot be reused
// Cannot do in place assignment.
val wrapperMessageTimestamp = {
if (messageFormatVersion == Message.MagicValue_V0)
else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
else // Log append time
// Build a new message set from the validated messages using the target codec.
(new ByteBufferMessageSet(compressionCodec = targetCodec,
offsetCounter = offsetCounter,
wrapperMessageTimestamp = wrapperMessageTimestamp,
timestampType = messageTimestampType,
messages = validatedMessages: _*), true)
} else {// Reuse the current ByteBufferMessageSet object, which saves one compression pass
// Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
// Advance the counter by the number of validated messages and store (new counter value - 1) at
// position 0 of the buffer.
buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
// validate the messages
var crcUpdateNeeded = true
// Positions of the wrapper message's timestamp and attributes fields, located after MessageSet.LogOverhead.
val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset
val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
val timestamp = buffer.getLong(timestampOffset)
val attributes = buffer.get(attributeOffset)
if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
// We don't need to recompute crc if the timestamp is not updated.
crcUpdateNeeded = false
else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
// Set timestamp type and timestamp
buffer.putLong(timestampOffset, now)
buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes))
if (crcUpdateNeeded) {
// need to recompute the crc value
val wrapperMessage = new Message(buffer.slice())
Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
// Same underlying object is returned; the flag is false on this in-place path.
(this, false)