美文网首页
paimon sink 源码之 PrimaryKeyFileSt

paimon sink 源码之 PrimaryKeyFileSt

作者: loukey_j | 来源:发表于2024-05-12 00:11 被阅读0次
    • 接上篇paimon sink 源码之 StoreSinkWriteImpl
    • 回顾 wirte 调用链 StoreSinkWriteImpl#wirte ->TableWriteImpl.writeAndReturn(rowData)->KeyValueFileStoreWrite.write()
    • 回顾 prepareCommit 调用链 StoreSinkWriteImpl#prepareCommit->TableWriteImpl.prepareCommit->KeyValueFileStoreWrite.prepareCommit

    KeyValueFileStoreWrite#构造

    org.apache.paimon.flink.sink.StoreSinkWriteImpl#newTableWrite 构建函数构建中调用 table.newWrite 构建 write
    org.apache.paimon.table.PrimaryKeyFileStoreTable#newWrite(String commitUser, ManifestCacheFilter manifestFilter){
       ... ...
       store().newWrite(commitUser, manifestFilter) //先调用 table 的 store 方法构建出 KeyValueFileStore 然在 newWrite 构建出 KeyValueFileStoreWrite
      ... ... 
    }
    
    //先看 store 方法再看 newWrite 方法
    public KeyValueFileStore store() {
            if (lazyStore == null) {
                RowType rowType = tableSchema.logicalRowType();
                Options conf = Options.fromMap(tableSchema.options());
                CoreOptions options = new CoreOptions(conf);
                KeyValueFieldsExtractor extractor =
                        PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;
    
                MergeFunctionFactory<KeyValue> mfFactory =
                        PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
               //merge-engine:
               //   |--deduplicate 去重保留最后一条 默认是 deduplicate
               //   |--partial-update 局部更新字段 多流拼接场景?
               //   |--aggregation 聚合 类似 pv? uv? 来一条新数据和底表数据做聚合
               //   |--first-row  去重保留最新一条
               //   |--看着就像 hudi 的 payload 。 但是 paimon 表达更简答有着丰富的 function 来个案例 
              // 详情可以去官网查看 https://paimon.apache.org/docs/master/primary-key-table/merge-engine/#aggregation
               //   CREATE TABLE t (
                         //   k INT,
                        //    a INT,
                        //    b INT,
                        //    c INT,
                        //    d INT,
                        //   PRIMARY KEY (k) NOT ENFORCED
               //   ) WITH (
                    //   'merge-engine'='partial-update',                          // 'merge-engine'='aggregation'
                    //   'fields.a.sequence-group' = 'b',                          // 'fields.a.aggregate-function'='sum'
                    //   'fields.b.aggregate-function' = 'first_value',
                    //   'fields.c.sequence-group' = 'd',
                    //   'fields.d.aggregate-function' = 'sum'
                    // );
                // changelog-producer 配置成 lookup 或者 deletion-vectors.enabled=true 就需要 lookup  DV 为啥要 lookUP?? 等到 MergeFunction 调用的时候再揭秘把
                if (options.needLookup()) {
                    mfFactory =
                            LookupMergeFunction.wrap(
                                    mfFactory, new RowType(extractor.keyFields(tableSchema)), rowType);
                }
    
                lazyStore =
                        new KeyValueFileStore(
                                fileIO(), //在第一篇 https://www.jianshu.com/p/f8a518d9f6ff 混了个眼熟,先不管
                                schemaManager(), // 管理 schema 获取最新 schema 等
                                tableSchema,
                                tableSchema.crossPartitionUpdate(), //是否跨分区更新, 有分区有主键并且分区键不全是主键就可以跨分区更新
                                options,
                                tableSchema.logicalPartitionType(),
                                PrimaryKeyTableUtils.addKeyNamePrefix(
                                        tableSchema.logicalBucketKeyType()),
                                new RowType(extractor.keyFields(tableSchema)),
                                rowType,
                                extractor,
                                mfFactory,
                                name(), // 根据数据路径截取出来的名字 路径的最后一级目录名字
                                catalogEnvironment);
            }
            // KeyValueFileStore 构造的这么多属性基本上用来构建 writer、reader 用的。
            return lazyStore;
        }
    
    //  KeyValueFileStore  newWrite 构建出 KeyValueFileStoreWrite
    
    public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
            IndexMaintainer.Factory<KeyValue> indexFactory = null;
            if (bucketMode() == BucketMode.DYNAMIC) {
                indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
            }
            DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null;
            if (options.deletionVectorsEnabled()) {
                deletionVectorsMaintainerFactory =
                        new DeletionVectorsMaintainer.Factory(newIndexFileHandler());
            }
            return new KeyValueFileStoreWrite(
                    fileIO,                       // 来自 KeyValueFileStore
                    schemaManager,    // 来自 KeyValueFileStore
                    schema,                  // 来自 KeyValueFileStore
                    commitUser,            // 来自 KeyValueFileStore
                    keyType,                 // 来自 KeyValueFileStore
                    valueType,               // 来自 KeyValueFileStore
                    keyComparatorSupplier,  // 用来比较 key 的大小
                    () -> UserDefinedSeqComparator.create(valueType, options), // 用来比较 sequence.field 大小 类似 hudi 的 preCombine filed
                    valueEqualiserSupplier, // 用来比较 value 是否相等??
                    mfFactory,           // mergeFunction  来自 KeyValueFileStore
                    pathFactory(),     // 文件路径  path,空分区时的分区命名 partition.default-name 默认是 __DEFAULT_PARTITION__ 和 hive是一样的,file.format 默认是 orc
                    format2PathFactory(), //file.format.per.level 不同层可以设置不同的文件格式,这里所指的层应该是  LSM 的层
                    snapshotManager(),  // snapshot 相关的工具类
                    newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), // filestore 读取器 带上了一个 manifestFilter  用来构建 WriterContainer
                    indexFactory, //  如果是 BucketMode.DYNAMIC 就有 用来构建 WriterContainer
                    deletionVectorsMaintainerFactory, // 如果开启了deletion-vectors.enabled 就有 用来构建 WriterContainer
                    options,
                    keyValueFieldsExtractor,
                    tableName);
        }
    

    KeyValueFileStoreWrite#write

    WriterContainer 创建

    //partition 从 row 里面抽取出来的分区 bucket 是根据 bucket 和  
    public void write(BinaryRow partition, int bucket, T data) throws Exception {
             //创建  WriterContainer 每个分区的每个 bucket 维护一个 write
            WriterContainer<T> container = getWriterWrapper(partition, bucket);
            //写数据
            container.writer.write(data);
            if (container.indexMaintainer != null) {
                container.indexMaintainer.notifyNewRecord(data);
            }
        }
    
    // 创建 WriterContainer
    public WriterContainer<T> createWriterContainer(
                BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
            // ... ...
           // 从 snapshotManager 获取最近的 snaphot, 从 ../db/table/snapshot 目录获取
           // snapshot 有一个 LATEST 文件 和 一些 snaphot-n 文件,可以直接解析  LATEST 获取最近的  snaphot
           // 也可以从 snaphot-n 文件名上解析出最大的 n 当做最近的 snaphot
           //  snaphot 记录了 baseManifestList,deltaManifestList 的文件名,解析这两个文件可以拿到  Manifest
          // Manifest 记录一次提交的所有文件集合。
            Long latestSnapshotId = snapshotManager.latestSnapshotId();
            List<DataFileMeta> restoreFiles = new ArrayList<>();
            if (!ignorePreviousFiles && latestSnapshotId != null) {
                 // 当前是要创建 Writer 这个writer 是要去写某个分区的某个 bucket, 
               // 所以这里也会把 partition, bucket 传进入进行 Manifest 过滤,得到 相应 partition, bucket 的 DataFile
                restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, bucket);
            }
            // indexFactory 从上面看 如果是 BucketMode.DYNAMIC 时会有 indexFactory, 为啥  GLOBAL_DYNAMIC 没有 indexFactory ???
            IndexMaintainer<T> indexMaintainer =
                    indexFactory == null
                            ? null
                            : indexFactory.createOrRestore(
                                    ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
           // 开启 DV 才有
            DeletionVectorsMaintainer deletionVectorsMaintainer =
                    deletionVectorsMaintainerFactory == null
                            ? null
                            : deletionVectorsMaintainerFactory.createOrRestore(
                                    ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
            RecordWriter<T> writer =
                    createWriter(   //下面再看 createWriter
                            partition.copy(),
                            bucket,
                            restoreFiles,
                            null,
                            compactExecutor(), // 创建 compact 线程
                            deletionVectorsMaintainer);
            //创建完 write 之后给这个 write 分配 memoryPool
           // write-buffer-size 转换为排序的磁盘文件之前要在内存中构建的数据量。 256 mb
           // page-size 64k
           // 初始化 writer 的 writeBuffer
            notifyNewWriter(writer);
            return new WriterContainer<>(
                    writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId);
        }
    

    WriterContainer#MergeTreeWriter 创建

    protected MergeTreeWriter createWriter(
                BinaryRow partition,  // 当前 write 的分区
                int bucket,                 // 当前 write 的桶
                List<DataFileMeta> restoreFiles,  //分区和桶下对应的 DataFile
                @Nullable CommitIncrement restoreIncrement, // null
                ExecutorService compactExecutor,   // compact 线程
                @Nullable DeletionVectorsMaintainer dvMaintainer) {
     
            KeyValueFileWriterFactory writerFactory =
                    writerFactoryBuilder.build(partition, bucket, options); // 用来创建 KeyValueDataFileWriter
            Comparator<InternalRow> keyComparator = keyComparatorSupplier.get(); // key 排序比较器
           // merge tree 的 levels 用来存储每层的  文件 level0 存的是 DataFiles  其他 level 存的是 SortedRun, SortedRun 里面也是存的 DataFiles
           // level0 的DataFile 的排序顺序先按 maxSequenceNumber 从大到小,然后再按名字从小到大
            Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
           // 合并参数和策略
            UniversalCompaction universalCompaction =
                    new UniversalCompaction(
                              // compaction.max-size-amplification-percent 默认 200 
                            options.maxSizeAmplificationPercent(),
                           // compaction.size-ratio 默认为 1
                            options.sortedRunSizeRatio(),
                            // num-sorted-run.compaction-trigger 默认为 5
                            options.numSortedRunCompactionTrigger(),
                             // compaction.optimization-interval
                            options.optimizedCompactionInterval());
            CompactStrategy compactStrategy =
                    // changelog-producer 配置成 lookup 或者 deletion-vectors.enabled=true 就需要 lookup
                    options.needLookup()
                            ? new ForceUpLevel0Compaction(universalCompaction)
                            : universalCompaction;
            // compactManager 用来创建 CompactRewriter 当达到触发条件时会把合并单元 封装成一个 callable CompactTask 给到 compactExecutor 里面去执行 后面再看
            CompactManager compactManager =
                    createCompactManager(
                            partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
             // 写数据用的 writer 
            return new MergeTreeWriter(
                    bufferSpillable(), // 是否配置了 write-buffer-spillable 是否能够溢写
                    options.writeBufferSpillDiskSize(),  //溢写的 大小 write-buffer-spill.max-disk-size 默认 Long.MAX_VALUE
                   // local-sort.max-num-file-handles 默认 128 外部合并排序的最大扇入。 它限制文件句柄的数量。 如果太小,可能会导致中间合并。 但如果太大,会导致同时打开的文件过多,消耗内存并导致随机读取。
                    options.localSortMaxNumFileHandles(),
                   // 溢写压缩算法 spill-compression 默认 lz4.  lz4, lzo and zstd are supported
                    options.spillCompression(),
                    ioManager,
                    compactManager,
                   // 每次提交记录都会递增 SequenceNumber
                    getMaxSequenceNumber(restoreFiles),
                    keyComparator,
                    mfFactory.create(), // MergeFunction
                    writerFactory,
                   // commit 之前是否强制 合并  commit.force-compact
                    options.commitForceCompact(),
                    // changelog-producer
                    options.changelogProducer(),
                    restoreIncrement, // null
                     // 根据用户指定的  sequence.field 字段构建 seq 的比较器
                    UserDefinedSeqComparator.create(valueType, options));
        }
    

    MergeTreeWriter#write

     public void write(KeyValue kv) throws Exception {
            // 从 DataFile 里面获得最大的 sequenceNumber 之后每次 write 写一条数据都会加 1
            long sequenceNumber = newSequenceNumber(); 
           //执行写入 写入 buffer 重点可以看看 buffer 是什么
            boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
           // 如果 写不下了就会先 flush 在写
            if (!success) {
                flushWriteBuffer(false, false);
                success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
                if (!success) {
                    throw new RuntimeException("Mem table is too small to hold a single element.");
                }
            }
        }
    

    关于 KeyValueFileStoreWrite#write 总结如下

    clas  KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> extends AbstractFileStoreWrite<T>  implements FileStoreWrite<T>
    private MemoryPoolFactory writeBufferPool;
    1. writeBufferPool 来自于构建 KeyValueFileStoreWrite 后调用 withMemoryPool 方法,然后 new MemoryPoolFactory
    2. MemoryPoolFactory 里面放的是 MemorySegmentPool memoryPool 分为 flink 管理内存和堆内存
    3. MemoryPoolFactory 可以理解为对 MemorySegmentPool 的一种管理 ,维护了一个shubTask 中所有 writes 使用的 memory 
    
    // 核心方法创建 MergeTreeWriter 以及 MergeTreeWriter 写时用的 writeBufffer
    public WriterContainer<T> createWriterContainer{
    
    ...
    
    RecordWriter<T> writer = createWriter(...) // 创建 MergeTreeWriter 
    
    1. 给这个 write 创建一个单独的 OwnerMemoryPool  OwnerMemoryPool 实际是操作 MemoryPoolFactory writeBufferPool 里面的 MemorySegmentPool innerPool
    
    2. 个这个 write 分配 memoryPool 实际就是创建 MergeTreeWriter 的 WriteBuffer writeBuffer
    3.这个 writeBufffer 实际是 new SortBufferWriteBuffer ;SortBufferWriteBuffer 还有 MemorySegmentPool 和  SortBuffer; SortBuffer  对于是否开启 spill 分 BinaryInMemorySortBuffer 和 BinaryExternalSortBuffer;MemorySegmentPool 是给 SortBuffer 用的
    ....
    
    }
    
    
    
    // 核心方法写数据,写数据就是往 MergeTreeWriter 的 WriteBuffer 里面 put 和必要的 WriteBuffer flush
    
     public void write(BinaryRow partition, int bucket, T data) throws Exception {
    
            WriterContainer<T> container = getWriterWrapper(partition, bucket);  //实际是调用上面的  createWriterContainer 创建出 MergeTreeWriter  container 只是包了一层
    
            然后调用 MergeTreeWriter 的 write 方法 往下看就是往 MergeTreeWriter 的 WriteBuffer 里面 put
    
            container.writer.write(data);
    
            if (container.indexMaintainer != null) {
    
                container.indexMaintainer.notifyNewRecord(data);
    
            }
    
        }
    
    • 上面提到了 MemoryPoolFactory、MemorySegmentPool、MergeTreeWriter、WriteBuffer、SortBuffer

    MemoryPoolFactory

    class MemoryPoolFactory {
    private final MemorySegmentPool innerPool; //这个是根据配置是否使用托管内存生成的
    private final int totalPages;
    private Iterable<MemoryOwner> owners; //MemoryOwner 其实就是 MergeTreeWriter, MergeTreeWriter 实现了 MemoryOwner
    private final long totalBufferSize;
    private long bufferPreemptCount;
    
    //内部类
    private class OwnerMemoryPool implements MemorySegmentPool {
       // 操作的是外部类的 innerPool
    }
    }
    

    MemorySegmentPool

    MemorySegmentPool
    • 上面提到了 MergeTreeWriter 在看看 MergeTreeWriter

    MergeTreeWriter#write 总结如下

    MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner
    //又有一层 writeBuffer
    private WriteBuffer writeBuffer
    public void write(KeyValue kv) throws Exception {
            long sequenceNumber = newSequenceNumber();
            // 往writeBuffer 写数据 写时可能会 flush
            boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
            if (!success) {
                flushWriteBuffer(false, false);
                success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
                if (!success) {
                    throw new RuntimeException("Mem table is too small to hold a single element.");
                }
            }
        }
    

    WriteBuffer

    SortBufferWriteBuffer implements WriteBuffer
    仅追加用于存储键值的写入器缓冲区。当它已满时,它将被刷新到磁盘和
    形成数据文件。
    MemorySegmentPool memoryPool // MemorySegmentPool 一直传递
    private final KeyValueSerializer serializer;
    private final SortBuffer buffer;   // 又有一层 buffer
    

    SortBuffer

    // SortBuffer 的两种实现,一种是基于纯内存的一种是基于可以 spill 的 是否 spill 由配置决定
    BinaryInMemorySortBuffer extends BinaryIndexedSortable implements SortBuffer
    protected final MemorySegmentPool memorySegmentPool //MemorySegmentPool 最终使用的地方
    
    //spill 有基于内存的 SortBuffer 的引用
    BinaryExternalSortBuffer implements SortBuffer
     private final BinaryInMemorySortBuffer inMemorySortBuffer
    

    FINAL

    • 此篇有点混乱,后面搞得更熟悉的时候再来整理。

    相关文章

      网友评论

          本文标题:paimon sink 源码之 PrimaryKeyFileSt

          本文链接:https://www.haomeiwen.com/subject/xwljfjtx.html