The other subclass of MessageSet is ByteBufferMessageSet, and an object of this class is exactly what FileMessageSet.append() takes as its parameter. Why must the parameter of append() be a ByteBufferMessageSet instead of a plain ByteBuffer?
Recall from the discussion of MemoryRecords in Chapter 2 (the producer) that when messages are written into MemoryRecords, a Compressor can be used to compress them in batches before the compressed data is sent to the broker.
Some designs compress each request payload individually before transmission. Although this reduces the amount of data on the wire, it has a problem: common compression algorithms achieve better ratios on larger inputs, and a single request payload is usually not very large, so the resulting compression ratio is poor. Kafka therefore compresses a whole batch of messages into one compressed message. Furthermore, the compressed data sent by the producer is normally stored on the broker still in compressed form, the consumer fetches the messages still compressed, and decompression happens only right before the consumer processes them. This is what is meant by "end-to-end compression".
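The effect of batch size on compression ratio is easy to see with a small, self-contained experiment. The snippet below is only an illustration of the general point, using the JDK's GZIPOutputStream rather than Kafka's Compressor; the message content and the count of 100 are arbitrary choices:
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

public class BatchCompressionDemo {
    static int gzipSize(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.size();
    }

    public static void main(String[] args) throws Exception {
        byte[] message = "{\"user\":\"alice\",\"action\":\"click\",\"page\":\"/home\"}".getBytes("UTF-8");

        // compress each message individually: 100 GZIP headers, no shared context
        int individually = 0;
        for (int i = 0; i < 100; i++)
            individually += gzipSize(message);

        // compress the whole batch at once: one header, repeated content compresses well
        ByteArrayOutputStream batch = new ByteArrayOutputStream();
        for (int i = 0; i < 100; i++)
            batch.write(message);
        int batched = gzipSize(batch.toByteArray());

        System.out.println("compressed one by one: " + individually + " bytes");
        System.out.println("compressed as a batch: " + batched + " bytes");
    }
}
On small, similar payloads like these, the batched variant comes out far smaller, because the 100 copies share a single GZIP header and a common compression context.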
The format of a compressed message is similar to that of an uncompressed message and consists of two layers:

The key of the wrapper (compressed) message is null, so the key is not drawn in the figure above; its value holds the compressed data of multiple inner messages.
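As a textual stand-in for the figure, the following comment block sketches the two-layer layout for the v1 (magic = 1) message format; the field widths are assumptions derived from the fields manipulated by the code below, not an authoritative specification:
// Wrapper (outer) message of a compressed message set -- a sketch of the assumed v1 layout:
//   offset       : 8 bytes -- the producer writes numRecords - 1 here; the broker later
//                             replaces it with the absolute offset of the last inner message
//   size         : 4 bytes -- length of the record that follows
//   CRC32        : 4 bytes -- checksum over everything from the magic byte onwards
//   magic        : 1 byte  -- message format version (1)
//   attributes   : 1 byte  -- the low bits record the compression codec
//   timestamp    : 8 bytes
//   key length   : 4 bytes -- -1, because the wrapper message's key is null
//   value length : 4 bytes
//   value        : compressed bytes of the inner messages, each of which repeats the
//                  offset / size / CRC / ... / key / value structure above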
Creating compressed messages
Let us first look back at the constructor of Compressor introduced in Chapter 2:
public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type;
    this.initPos = buffer.position();

    this.numRecords = 0;
    this.writtenUncompressed = 0;
    this.compressionRate = 1;
    this.maxTimestamp = Record.NO_TIMESTAMP;

    if (type != CompressionType.NONE) {
        // for compressed records, leave space for the header and the shallow message metadata
        // and move the starting position to the value payload offset
        // When compression is used, reserve space at the head of the buffer for the offset and
        // size fields (Records.LOG_OVERHEAD) as well as the CRC32 and other header fields of
        // the wrapper message (Record.RECORD_OVERHEAD)
        buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
    }

    // create the output streams: initialize bufferStream and appendStream according to the compression type
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}
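To put a number on the space the constructor skips, here is a minimal sketch; the byte counts are assumptions based on the v1 layout sketched above (12 bytes for Records.LOG_OVERHEAD and 22 bytes for Record.RECORD_OVERHEAD), not values quoted from the Kafka source:
// Assumed sizes for the v1 (magic = 1) format -- for illustration only.
int logOverhead    = 8 + 4;                        // offset + size of the wrapper log entry
int recordOverhead = 4 + 1 + 1 + 8 + 4 + 4;        // CRC + magic + attributes + timestamp + key length + value length
int reserved       = logOverhead + recordOverhead; // 34 bytes are skipped before the compressed payload begins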
The process of repeatedly writing messages into MemoryRecords via MemoryRecords.append() and compressing them was analyzed earlier. When the MemoryRecords is full, Compressor.close() is called to fill in the offset, size, CRC32 and other header fields of the wrapper message, after which the data is sent to the broker. Compressor.close() is shown below:
public void close() {
    try {
        // all records have been written; close the compressed output stream first
        appendStream.close();
    } catch (IOException e) {
        throw new KafkaException(e);
    }

    if (type != CompressionType.NONE) { // a compression codec is in use
        ByteBuffer buffer = bufferStream.buffer();
        int pos = buffer.position(); // remember the current end position of the buffer
        // write the header, for the end offset write as number of records - 1
        buffer.position(initPos); // move the position back to the head of the buffer
        buffer.putLong(numRecords - 1); // write the offset field: the number of inner messages minus 1
        buffer.putInt(pos - initPos - Records.LOG_OVERHEAD); // write the size field
        // write the shallow message (the crc and value size are not correct yet)
        // write the header of the wrapper message: timestamp, compression type, etc. (MAGIC_VALUE is 1)
        Record.write(buffer, maxTimestamp, null, null, type, 0, -1);
        // compute and fill the value size: the length of the wrapper message's compressed value
        int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
        buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET_V1, valueSize);
        // compute and fill the crc at the beginning of the message
        long crc = Record.computeChecksum(buffer,
            initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
            pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
        Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
        // reset the position
        buffer.position(pos); // restore the buffer's position to the end
        // update the compression ratio
        this.compressionRate = (float) buffer.position() / this.writtenUncompressed;
        TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
            compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
    }
}
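To make the pointer arithmetic in close() concrete, here is a worked sketch with hypothetical numbers (initPos = 0 and a buffer whose position ends up at 134 after compression); the named constants repeat the assumed v1 sizes from above rather than quoting the Kafka source:
int initPos = 0, pos = 134;                  // hypothetical start and end positions
int LOG_OVERHEAD = 12, RECORD_OVERHEAD = 22; // assumed v1 sizes (see the sketch above)
int KEY_OFFSET_V1 = 18, MAGIC_OFFSET = 4, CRC_OFFSET = 0;

int numRecords   = 10;                                             // hypothetical count of inner messages
long offsetField = numRecords - 1;                                 // 9, written as the wrapper's offset field
int size         = pos - initPos - LOG_OVERHEAD;                   // 122, written at position 8
int valueSize    = pos - initPos - LOG_OVERHEAD - RECORD_OVERHEAD; // 100, the compressed payload length
int valueSizePos = initPos + LOG_OVERHEAD + KEY_OFFSET_V1;         // 30, where the value-size field is patched
int crcStart     = initPos + LOG_OVERHEAD + MAGIC_OFFSET;          // 16, CRC covers from the magic byte onwards
int crcLength    = pos - initPos - LOG_OVERHEAD - MAGIC_OFFSET;    // 118 bytes are checksummed
int crcPos       = initPos + LOG_OVERHEAD + CRC_OFFSET;            // 12, where the CRC itself is written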
Does the broker have to decompress the messages in order to assign an offset to each of them? The design works as follows:
1) When the producer creates a compressed message, the offsets it assigns to the inner messages are internal offsets (inner offsets), i.e., the inner messages are numbered 0, 1, 2, ... Readers may refer back to the RecordBatch.tryAppend() method introduced in Chapter 2.
2) When the Kafka broker assigns an offset to the outer message, it uses the number of inner compressed messages recorded in the outer message: the offset assigned to the outer message is the offset of the last inner compressed message, as shown in the figure below:

3) After the consumer fetches and decompresses the compressed message, it can compute the offset of every inner message from the inner messages' relative offsets and the outer message's offset, as in the sketch that follows this list.
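A minimal sketch of this offset reconstruction, using hypothetical numbers (an outer message with offset 3037 containing seven inner messages with relative offsets 0 through 6), which mirrors what RecordsIterator later does with absoluteBaseOffset:
long wrapperOffset = 3037L;                      // offset the broker assigned to the outer message
long[] relativeOffsets = {0, 1, 2, 3, 4, 5, 6};  // inner offsets written by the producer

// the base equals the outer offset minus the relative offset of the last inner message
long absoluteBaseOffset = wrapperOffset - relativeOffsets[relativeOffsets.length - 1]; // 3031

for (long relative : relativeOffsets) {
    long absoluteOffset = absoluteBaseOffset + relative; // yields 3031, 3032, ..., 3037
    System.out.println(absoluteOffset);
}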
Iterating over compressed messages
As mentioned above, the format of the compressed messages fetched by the consumer is exactly the same as the format sent by the producer. Chapter 3 introduced the Fetcher.parseFetchedData() method, which iterates over a MemoryRecords while parsing a CompletedFetch. Let us now look at the implementation of the MemoryRecords iterator, part of which deals with compressed messages.
The iterator of MemoryRecords is the static inner class RecordsIterator, which extends the abstract class AbstractIterator. AbstractIterator implements the Iterator interface and simplifies it: it exposes only a single method, makeNext(), for subclasses to implement, and this method is responsible for producing the next item of the iteration. Developers therefore do not need to know the details of the Iterator interface; they only need to focus on how to create the next item.
AbstractIterator uses the next field to hold the next item of the iteration and the state field to record the iterator's current state. state is of the enum type State, whose values have the following meanings:
- READY: the iterator has the next item ready.
- NOT_READY: the next item is not ready yet; maybeComputeNext() must be called.
- DONE: the iteration has finished.
- FAILED: the iterator ran into an exception during iteration.
The code of AbstractIterator is as follows:
public abstract class AbstractIterator<T> implements Iterator<T> {

    private static enum State {
        READY, NOT_READY, DONE, FAILED
    };

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE: // iteration has finished, return false
                return false;
            case READY: // the next item is ready, return true
                return true;
            default: // NOT_READY: call maybeComputeNext() to produce the next item
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY; // reset the state field to NOT_READY
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next; // return the next item
    }

    // remove() is not supported; calling it throws an exception
    @Override
    public void remove() {
        throw new UnsupportedOperationException("Removal not supported");
    }

    public T peek() {
        if (!hasNext())
            throw new NoSuchElementException();
        return next;
    }

    // a subclass can call allDone() inside makeNext() to end the whole iteration
    protected T allDone() {
        state = State.DONE;
        return null;
    }

    // implemented by subclasses; returns the next item of the iteration
    protected abstract T makeNext();

    private Boolean maybeComputeNext() {
        state = State.FAILED; // if makeNext() throws, the iterator stays in this state
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else { // makeNext() produced the next item
            state = State.READY;
            return true;
        }
    }
}
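As a quick illustration of this template pattern, here is a hypothetical subclass (not part of Kafka) that iterates over the integers in [0, limit); it only has to implement makeNext() and call allDone() to terminate:
public class RangeIterator extends AbstractIterator<Integer> {
    private final int limit;
    private int current = 0;

    public RangeIterator(int limit) {
        this.limit = limit;
    }

    @Override
    protected Integer makeNext() {
        if (current >= limit)
            return allDone(); // marks the state as DONE; hasNext() will now return false
        return current++;     // becomes the item returned by the next call to next()
    }
}
Iterating new RangeIterator(3) with hasNext()/next() yields 0, 1, 2 and then stops; MemoryRecords.RecordsIterator plugs into the same hasNext()/next() machinery through its own makeNext().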
To handle both compressed and uncompressed messages, MemoryRecords.RecordsIterator iterates on two levels, distinguished by the shallow parameter. When shallow is true, only the current (outer) level of messages is iterated, whether or not they are compressed; we call this a "shallow iterator". When shallow is false, the iterator does not stop at the current level: for each compressed outer message it also creates an inner iterator (itself a MemoryRecords.RecordsIterator) to iterate over the nested compressed messages; we call this a "deep iterator".
The key fields of MemoryRecords.RecordsIterator are as follows:
- buffer: points to the buffer field of the MemoryRecords; the messages in it may be compressed or uncompressed.
- stream: the input stream used to read the buffer. When compressed messages are being iterated, it is the corresponding decompressing input stream.
- type: the compression type.
- shallow: indicates whether this RecordsIterator is a deep or a shallow iterator.
- innerIter: the inner iterator used to iterate compressed messages.
- logEntries: a collection of type ArrayDeque<LogEntry> holding the compressed (inner) messages that the inner iterator will iterate over; in the outer iterator this field is always null. LogEntry wraps a message together with its offset.
- absoluteBaseOffset: used when the inner iterator iterates compressed messages; it records the offset of the first inner message, from which the offset of every inner message is computed. In the outer iterator this field is always -1.
MemoryRecords.RecordsIterator has two constructors: a public one, which serves as the external API and creates the outer iterator, and a private one, which creates the inner iterator.
public RecordsIterator(ByteBuffer buffer, boolean shallow) {
    this.type = CompressionType.NONE;  // the outer messages themselves are not compressed
    this.buffer = buffer;              // points to MemoryRecords.buffer
    this.shallow = shallow;            // whether this is a shallow iterator
    this.stream = new DataInputStream(new ByteBufferInputStream(buffer)); // input stream
    this.logEntries = null;
    this.absoluteBaseOffset = -1;
}
// Private constructor for inner iterator.
private RecordsIterator(LogEntry entry) {
    this.type = entry.record().compressionType(); // compression type of the inner messages
    this.buffer = entry.record().value();
    this.shallow = true;
    // create an input stream that decompresses with the given compression type
    this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
    long wrapperRecordOffset = entry.offset(); // offset of the outer (wrapper) message

    // If relative offset is used, we need to decompress the entire message first to compute
    // the absolute offset.
    if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
        this.logEntries = new ArrayDeque<>();
        long wrapperRecordTimestamp = entry.record().timestamp();
        while (true) { // decompress all inner messages and add them to the logEntries queue
            try {
                // for inner messages, getNextEntryFromStream() reads and decompresses a message;
                // for outer or uncompressed messages it simply reads the message
                LogEntry logEntry = getNextEntryFromStream();
                Record recordWithTimestamp = new Record(logEntry.record().buffer(),
                                                        wrapperRecordTimestamp,
                                                        entry.record().timestampType());
                // add the inner message to the logEntries queue
                logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
            } catch (EOFException e) {
                break;
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        }
        // compute absoluteBaseOffset
        this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
    } else {
        this.logEntries = null;
        this.absoluteBaseOffset = -1;
    }
}
The implementation of makeNext() and its helper methods is shown below; afterwards an example is used to walk through the entire deep-iteration process:
/*
 * Read the next record from the buffer.
 *
 * Note that in the compressed message set, each message value size is set as the size of the un-compressed
 * version of the message value, so when we do de-compression allocating an array of the specified size for
 * reading compressed value data is sufficient.
 */
@Override
protected LogEntry makeNext() {
    if (innerDone()) { // check whether the deep iteration has finished (or has not started yet)
        try {
            LogEntry entry = getNextEntry(); // fetch the next message
            // No more record to return.
            if (entry == null) // no more messages; call allDone() to end the iteration
                return allDone();

            // Convert offset to absolute offset if needed.
            // In the inner iterator, compute the absolute offset of each message.
            if (absoluteBaseOffset >= 0) {
                long absoluteOffset = absoluteBaseOffset + entry.offset();
                entry = new LogEntry(absoluteOffset, entry.record());
            }

            // decide whether to go shallow or deep iteration if it is compressed
            // based on the compression type and the shallow flag, decide whether to create an inner iterator
            CompressionType compression = entry.record().compressionType();
            if (compression == CompressionType.NONE || shallow) {
                return entry;
            } else {
                // init the inner iterator with the value payload of the message,
                // which will de-compress the payload to a set of messages;
                // since we assume nested compression is not allowed, the deep iterator
                // would not try to further decompress underlying messages
                // There will be at least one element in the inner iterator, so we don't
                // need to call hasNext() here.
                // For every outer compressed message, a new inner iterator is created
                // to iterate over its inner messages.
                innerIter = new RecordsIterator(entry);
                return innerIter.next(); // return the first inner message
            }
        } catch (EOFException e) {
            return allDone();
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    } else {
        return innerIter.next();
    }
}
// implementation of getNextEntry()
private LogEntry getNextEntry() throws IOException {
    if (logEntries != null)
        return getNextEntryFromEntryList();
    else
        return getNextEntryFromStream();
}

private LogEntry getNextEntryFromEntryList() {
    return logEntries.isEmpty() ? null : logEntries.remove();
}

private LogEntry getNextEntryFromStream() throws IOException {
    // read the offset
    long offset = stream.readLong();
    // read record size
    int size = stream.readInt();
    if (size < 0)
        throw new IllegalStateException("Record with size " + size);

    // read the record, if compression is used we cannot depend on size
    // and hence has to do extra copy
    ByteBuffer rec;
    if (type == CompressionType.NONE) { // handling of uncompressed messages
        rec = buffer.slice();
        int newPos = buffer.position() + size;
        if (newPos > buffer.limit())
            return null;
        buffer.position(newPos); // advance the buffer's position
        rec.limit(size);
    } else { // handling of compressed messages
        byte[] recordBuffer = new byte[size];
        // read the message from the stream; this decompresses the data
        stream.readFully(recordBuffer, 0, size);
        rec = ByteBuffer.wrap(recordBuffer);
    }
    return new LogEntry(offset, new Record(rec));
}

private boolean innerDone() {
    return innerIter == null || !innerIter.hasNext();
}
}
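To tie the two modes together, here is a minimal usage sketch. It is an illustration only: it assumes the caller can see MemoryRecords.RecordsIterator and LogEntry as shown in this section, and that the ByteBuffer passed in holds compressed messages:
static void dump(ByteBuffer records) {
    // Shallow iteration: only the outer (wrapper) messages are returned, still compressed.
    Iterator<LogEntry> shallowIt = new MemoryRecords.RecordsIterator(records.duplicate(), true);
    while (shallowIt.hasNext()) {
        LogEntry wrapper = shallowIt.next();
        System.out.println("wrapper offset = " + wrapper.offset()
                + ", codec = " + wrapper.record().compressionType());
    }

    // Deep iteration: every compressed outer message is decompressed and each inner message
    // is returned with its absolute offset reconstructed via absoluteBaseOffset.
    Iterator<LogEntry> deepIt = new MemoryRecords.RecordsIterator(records.duplicate(), false);
    while (deepIt.hasNext()) {
        LogEntry inner = deepIt.next();
        System.out.println("message offset = " + inner.offset());
    }
}
Passing duplicates of the buffer keeps the two iterations from interfering with each other's position.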
The figure below shows the two different iteration paths; we will use it to walk through the whole process. First, a MemoryRecords.RecordsIterator is created through the public constructor as the outer iterator and its next() method is called. At this point the state field is NOT_READY, so makeNext() is invoked to prepare the next item. makeNext() first checks whether a deep iteration is in progress (innerDone()); since deep iteration has not started yet, getNextEntryFromStream() is used to read the message with offset 3031, which is step 1 in the figure. The compression type of message 3031 is then examined; assuming it is GZIP-compressed, the private constructor is used to create another MemoryRecords.RecordsIterator as the inner (deep) iterator. During its construction the corresponding decompressing input stream is created and getNextEntryFromStream() is called repeatedly to decompress the outer message 3031, turning the nested compressed messages into the logEntries queue. The inner iterator's next() is then called; because there is no third level of nesting and logEntries is not empty, a message is taken from logEntries and returned, which is step 2 in the figure. In the following iterations the deep iteration is not yet finished, so messages are returned directly from the logEntries queue; steps 3 through 7 in the figure repeat this process. Once the deep iteration completes, getNextEntryFromStream() is called again to read the message with offset 3037, which is step 8 in the figure. Subsequent iterations repeat the process described above.
