美文网首页
Kafka源码分析-Server-日志存储(2)-ByteBuf

Kafka源码分析-Server-日志存储(2)-ByteBuf

作者: 陈阳001 | 来源:发表于2019-02-12 21:15 被阅读0次

MessageSet的另一个子类是ByteBufferMessageSet,FileMessageSet.append()方法的参数就是这个类的对象,为什么必须append()方法的参数是ByteBufferMessageSet,而不直接是ByteBuffer呢?
回顾一下,在第二章生产者时介绍了MemoryRecords时提到,向MemoryRecords写入消息时,可以使用Compressor对消息批量进行压缩,然后将压缩后的消息发送给服务端。
在有些设计中,将每个请求的负载单独压缩后再进行传输,这种设计虽然可以减少传输的数据量,但是存在一个问题,我们常见的压缩算法是数据量越大压缩率越高,一般情况下,每个请求的负载不会特别大,这就导致了压缩率比较低。而且在一般情况下,生产者发送的压缩数据在服务端也是以保持压缩状态进行存储的,消费者从服务端获取的也是压缩消息,消费者在处理压缩消息之前才会解压消息,这就实现了“端到端的压缩”。
压缩后的消息格式和非压缩的消息格式类似,分两层:


压缩后的消息格式.png

解压消息的key为null,所以上图没有画出key这部分;value中保存的是多条消息压缩数据。

创建压缩消息

先回头看看第二章介绍的Compressor的构造方法的源码:

public Compressor(ByteBuffer buffer, CompressionType type) {
        this.type = type;
        this.initPos = buffer.position();

        this.numRecords = 0;
        this.writtenUncompressed = 0;
        this.compressionRate = 1;
        this.maxTimestamp = Record.NO_TIMESTAMP;

        if (type != CompressionType.NONE) {
            // for compressed records, leave space for the header and the shallow message metadata
            // and move the starting position to the value payload offset
            //如果进行消息压缩,则在Buffer的首部提前空出offset和size空间(Records,LOG_OVERHEAD)
            //以及CRC32等字段的空间(Record.RECORD_OVERHEAD)
            buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
        }

        // create the stream根据压缩算法类型,初始化bufferStream和appendStream
        bufferStream = new ByteBufferOutputStream(buffer);
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
    }

通过MemoryRecords.append()方法不断写入消息并压缩的过程前面已经分析过了。当MemoryRecords写满,会调用Compressor.close()方法,完成offset,size,CRC32等字段的写入,之后发送给服务端。Compressor.close()方法如下:

public void close() {
        try {
            appendStream.close();//要压缩的已经写完了,先关闭压缩输出流
        } catch (IOException e) {
            throw new KafkaException(e);
        }

        if (type != CompressionType.NONE) {//使用压缩算法
            ByteBuffer buffer = bufferStream.buffer();
            int pos = buffer.position();//保存buffer尾部位置
            // write the header, for the end offset write as number of records - 1
            buffer.position(initPos);//移动position指针到buffer头部
            buffer.putLong(numRecords - 1);//写入offset,写入的是消息数量
            buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);//写入size
            // write the shallow message (the crc and value size are not correct yet)
            //写入压缩消息的相关信息,如,时间戳,压缩算法等,MAGIC_VALUE为1
            Record.write(buffer, maxTimestamp, null, null, type, 0, -1);
            // compute the fill the value size 计算并写入整个压缩消息value部分的长度
            int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
            buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET_V1, valueSize);
            
            // compute and fill the crc at the beginning of the message
            long crc = Record.computeChecksum(buffer,
                initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
                pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
            Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
            // reset the position
            buffer.position(pos);//还原buffer的position指针

            // update the compression ratio
            this.compressionRate = (float) buffer.position() / this.writtenUncompressed;
            TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
                compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
        }
    }

服务端为每个消息分配offset,要对消息解压缩吗?这里是这样设计的:
1)当生产者产生创建压缩信息的时候,对压缩消息设置的offset是内部的offset(inner offset),即分配给每个消息的offset是0,1,2......请读者回顾第二章介绍的RecordBatch.tryAppend()方法。
2)在Kafka服务端为消息分配offset时,会根据外层消息中记录的内层压缩消息的个数为外层消息分配offset,为外层消息分配的offset是内层压缩消息中最后一个消息的offset值,如下图:


压缩消息存储中offset的设计.png

3)当消费者获取压缩消息后进行解压缩,就可以根据内部消息的,相对的offset和外层消息的offset计算出每个消息的offset值了。

迭代压缩消息

上文提到,消费者获取的压缩消息合适和生产者发送的格式是一模一样的。在第三章介绍Fetch.parseFetchData()方法解析CompleteFetch时,对MemoryRecords进行迭代。下面看下MemoryRecords的迭代器的相关实现,其中有涉及到压缩消息相关的实现。
MemoryRecords的迭代器是静态内部类RecordIterator,它继承了AbstractorIterator抽象类。AbstractorIterator实现了Iterator接口,它对Iterator进行了实现和简化。AbstractorIterator只暴露一个makeNext()方法供子类去实现,这个方法主要负责用于创建下一个迭代项。这样,开发人员就不用了解Iterator接口的细节,只关注如何创建下一个迭代项就可以了。
AbstractorIterator中使用next字段指向迭代的下一项,使用state字段标识当前迭代器的状态,state字段是State枚举类型,其取值和含义如下所述:

  • READY:迭代器已经准备好迭代下一项。
  • NOT_READY:迭代器未准备好迭代下一项,需要调用maybeCompleteNext()。
  • DONE:当前迭代已经结束。
  • FAILED:当前迭代器在迭代过程中出现异常。
    AbstractorIterator的代码如下:
public abstract class AbstractIterator<T> implements Iterator<T> {

    private static enum State {
        READY, NOT_READY, DONE, FAILED
    };

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE://迭代结束,返回false.
                return false;
            case READY://迭代准备好了,返回true
                return true;
            default://NOT_READY状态,需要调用maybeComputeNext()方法获取next项
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;//将state字段重置为NOT_READY状态
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;//返回next
    }
    //不支持remove操作,调用报异常。
    @Override
    public void remove() {
        throw new UnsupportedOperationException("Removal not supported");
    }

    public T peek() {
        if (!hasNext())
            throw new NoSuchElementException();
        return next;
    }
    //子类在实现makeNext()方法中可以通过调用allDone()方法结束整个迭代
    protected T allDone() {
        state = State.DONE;
        return null;
    }
    //子类实现,返回下一个迭代项
    protected abstract T makeNext();

    private Boolean maybeComputeNext() {
        state = State.FAILED;//若在子类的实现makeNext()中抛出异常,则state会处于这个状态
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else {//在makeNext()方法中完成了next项的构造
            state = State.READY;
            return true;
        }
    }
}

为了同时能够处理压缩消息和非压缩消息,MemoryRecords.RecordsIterator分为两层迭代,使用shallow参数标区分。当shallow为true时,认为消息是非压缩消息,只迭代当前这一层消息,我们称为“浅层迭代”(shallow Iterator);当shallow为false时,不只迭代当前层消息,还会创建Inner Iterator(也是MemoryRecords.RecordsIterator对象)迭代嵌套的压缩消息,我们称之为“深层迭代”(deep Iterator)。
MemoryRecords.RecordsIterator的关键字段如下:

  • buffer:指向MemoryRecords的buffer字段。其中消息可以是压缩的,也可以是非压缩的。
  • stream:读取buffer的输入流。如果迭代压缩消息,则是对应的解压缩输入流。
  • type:压缩类型。
  • shallow:标识当前RecordsIterator是深层迭代器还是浅层迭代器。
  • innerIter:迭代压缩消息的Inner Iterator。
  • logEntries:ArrayDeque<LogEntry>类型集合。Inner Iterator需要迭代的压缩消息集合。Outer Iterator的此字段始终为null,其中LogEntry封装了消息及其offset。
  • absoluteBaseOffset:在Inner Iterator 迭代压缩消息时使用,用于记录压缩消息中第一个消息的offset,并根据此字段计算每个消息的offset。Outer Iterator的此字段始终为-1。
    MemoryRecords.RecordsIterator的构造方法也有两个:一个是public的,提供外部的API,用于创建Outer Iterator;另一个是private的,用于创建Inner Iterator。
public RecordsIterator(ByteBuffer buffer, boolean shallow) {
            this.type = CompressionType.NONE;//外层消息是非压缩的
            this.buffer = buffer;//指向MemoryRecords.buffer字段
            this.shallow = shallow;//标识是否为深层迭代器
            this.stream = new DataInputStream(new ByteBufferInputStream(buffer));//输入流
            this.logEntries = null;
            this.absoluteBaseOffset = -1;
        }

        // Private constructor for inner iterator.
        // Inner Iterator的构造方法
        private RecordsIterator(LogEntry entry) {
            this.type = entry.record().compressionType();//指定内存压缩消息的压缩类型。
            this.buffer = entry.record().value();
            this.shallow = true;
            //创建指定压缩类型的输入流
            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
            long wrapperRecordOffset = entry.offset();//外层消息的offset
            // If relative offset is used, we need to decompress the entire message first to compute
            // the absolute offset.
            if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
                this.logEntries = new ArrayDeque<>();
                long wrapperRecordTimestamp = entry.record().timestamp();
                while (true) {//在这层循环中,将内层消息全部解压出来并添加到logEntries集合中
                    try {
                        //对于内层消息,getNextEntryFromStream()方法是读取并解压缩消息
                        //对于外层消息或非压缩消息,则仅仅是读取消息
                        LogEntry logEntry = getNextEntryFromStream();
                        Record recordWithTimestamp = new Record(logEntry.record().buffer(),
                                                                wrapperRecordTimestamp,
                                                                entry.record().timestampType());
                        //添加到logEntries集合中
                        logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
                    } catch (EOFException e) {
                        break;
                    } catch (IOException e) {
                        throw new KafkaException(e);
                    }
                }
                //计算absoluteBaseOffset
                this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
            } else {
                this.logEntries = null;
                this.absoluteBaseOffset = -1;
            }
        }

makeNext()方法的相关实现代码如下,会通过一个示例介绍深层次迭代的整个过程:

 /*
         * Read the next record from the buffer.
         * 
         * Note that in the compressed message set, each message value size is set as the size of the un-compressed
         * version of the message value, so when we do de-compression allocating an array of the specified size for
         * reading compressed value data is sufficient.
         */
        @Override
        protected LogEntry makeNext() {
            if (innerDone()) {//检测当前深层迭代是否已经完成,或是深层次迭代还未开始
                try {
                    LogEntry entry = getNextEntry();//获取消息
                    // No more record to return.
                    if (entry == null)//获取不到消息,调用allDone()方法结束迭代
                        return allDone();

                    // Convert offset to absolute offset if needed.在Inner Iterator中计算每个消息的absoluteOffset
                    if (absoluteBaseOffset >= 0) {
                        long absoluteOffset = absoluteBaseOffset + entry.offset();
                        entry = new LogEntry(absoluteOffset, entry.record());
                    }

                    // decide whether to go shallow or deep iteration if it is compressed 
                    // 根据压缩类型和shallow参数决定是否创建Inner Iterator
                    CompressionType compression = entry.record().compressionType();
                    if (compression == CompressionType.NONE || shallow) {
                        return entry;
                    } else {
                        // init the inner iterator with the value payload of the message,
                        // which will de-compress the payload to a set of messages;
                        // since we assume nested compression is not allowed, the deep iterator
                        // would not try to further decompress underlying messages
                        // There will be at least one element in the inner iterator, so we don't
                        // need to call hasNext() here.
                        // 创建Inner Iterator,没迭代一个外层消息,创建一个Inner Iterator用来迭代其中的内层消息
                        innerIter = new RecordsIterator(entry);
                        return innerIter.next();//迭代内层消息
                    }
                } catch (EOFException e) {
                    return allDone();
                } catch (IOException e) {
                    throw new KafkaException(e);
                }
            } else {
                return innerIter.next();
            }
        }
        //getNextEntry()方法的实现
        private LogEntry getNextEntry() throws IOException {
            if (logEntries != null)
                return getNextEntryFromEntryList();
            else
                return getNextEntryFromStream();
        }

        private LogEntry getNextEntryFromEntryList() {
            return logEntries.isEmpty() ? null : logEntries.remove();
        }

        //
        private LogEntry getNextEntryFromStream() throws IOException {
            // read the offset
            long offset = stream.readLong();//读取offset
            // read record size
            int size = stream.readInt();//读取消息长度
            if (size < 0)
                throw new IllegalStateException("Record with size " + size);
            // read the record, if compression is used we cannot depend on size
            // and hence has to do extra copy
            ByteBuffer rec;
            if (type == CompressionType.NONE) {//未压缩消息的处理
                rec = buffer.slice();
                int newPos = buffer.position() + size;
                if (newPos > buffer.limit())
                    return null;
                buffer.position(newPos);//修改buffer的position
                rec.limit(size);
            } else {//处理压缩消息
                byte[] recordBuffer = new byte[size];
                //从stream中读取消息,此过程会解压消息
                stream.readFully(recordBuffer, 0, size);
                rec = ByteBuffer.wrap(recordBuffer);
            }
            return new LogEntry(offset, new Record(rec));
        }

        private boolean innerDone() {
            return innerIter == null || !innerIter.hasNext();
        }
    }

下图展示了两种不同的迭代过程,为了更好的理解整个迭代过程,以下图为例进行说明。首先通过public构造方法创建MemoryRecords.RecordsIterator对象作为浅层迭代器并调用next()方法,此时state字段为NOT_READY,调用makeNext()方法准备迭代项。在makeNext()方法中会判断深层迭代是否完成(即innerDone()方法),当前未开始深层迭代则使用getNextEntryFromStream()方法获取offset为3031的消息,如下图的步骤1。之后检测3031消息的压缩格式,假设采用GZIP的压缩格式,则通过private构造方法创建MemoryRecords.RecordsIterator对象作为深层迭代器,在构造过程中会创建对应的解压输入流,然后调用getNextEntryFromStream()方法解压offset为3031的外层消息,其中嵌套的压缩消息形成logEntries队列。然后调用深层次的next()方法,因为不存在第三层迭代且logEntries不为空,则从logEntries获取消息并返回,对应下图步骤2。后续迭代中深层次迭代未完成,则直接从logEntries集合中返回消息,系统中步骤3-7都会重复此过程。当深层迭代完成后,调用getNextEntryFromStream()方法获取offset为3037的消息,如下图步骤8。后续迭代过程与上述过程重复。


日志文件解析的深层迭代和浅层迭代.png

相关文章

网友评论

      本文标题:Kafka源码分析-Server-日志存储(2)-ByteBuf

      本文链接:https://www.haomeiwen.com/subject/mikleqtx.html