美文网首页
paimon sink 源码之 StoreSinkWriteIm

paimon sink 源码之 StoreSinkWriteIm

作者: loukey_j | 来源:发表于2024-05-08 19:18 被阅读0次

    StoreSinkWriteImpl#构造方法

    1. FileStoreTable table : PrimaryKeyFileStoreTable 或者 AppendOnlyFileStoreTable, paimon sink 源码 之 paimon table 创建
    2. String commitUser: 是 UUID
    3. StoreSinkWriteState state: paimon sink 源码之 RowDataStoreWriteOperator
      这篇有讲他是从状态中恢复出来的 listState 的一个包装里面核心 是一个 map 数据结构,map 里面装的啥在 StoreSinkWriteImpl 里面也没有提到,因为 StoreSinkWriteImpl 的 这个 listState 是空的。map 里面存的啥 要看 StoreSinkWriteImpl 的子类
    4. IOManager ioManager: 来自于 flink runtime, paimon 中利用 flink IOManager 获取 spill 路径用来构建 paimon 自己的 IOManager ,paimon 自己的 IOManager 先不看
    5. boolean ignorePreviousFiles:是 overwritePartition 场景才使用所以可以认为是 false
    6. boolean waitCompaction 逻辑如下:非 writeOnly 情况下 && (开启了 DV || 在 changlogProducer 是 lookup 时开启了 changelog-producer.lookup-wait) waitCompaction 就为 true 否则就是 false


      waitCompaction
    7. MemorySegmentPool memoryPool : 开了 sink.use-managed-memory-allocator 就会有基于 managed-memory 的内存池
    8. 根据第一步的 FileStoreTable table 创建 TableWriteImpl write。 所以从现在开始 write 里面开始套 write 了。琢磨下来总共是 套了 7 层 write 如下图


      image.png

    StoreSinkWriteImpl#write(org.apache.paimon.data.InternalRow)

    1. StoreSinkWriteImpl#write --> TableWriteImpl.writeAndReturn(rowData)
    public SinkRecord writeAndReturn(InternalRow row) throws Exception {
           //如果配置了 ignore-delete 则舍弃撤回流, 
            if (ignoreDelete && row.getRowKind().isRetract()) {
                return null;
            }
           //进行一次包装 SinkRecord 包含 主键、分区、bucket、和原本的 InternalRow 把一些重要属性先抽取出来方便使用
            SinkRecord record = toSinkRecord(row);
            write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
            return record;
        }
    

    StoreSinkWriteImpl#prepareCommit

    CommitMessage committable = write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)
    

    StoreSinkWriteImpl 有 write 、prepareCommit 的方法都是调用 里面 write 变量的 write 和 prepareCommit 方法
    那直接看 TableWriteImpl 的 write 、prepareCommit 方法
    在 StoreSinkWriteImpl#构造方法 第 8 步是构造 TableWriteImpl 的地方,所以先看下他时如何构建的然后再看他的 的 write 、prepareCommit 方法

    TableWriteImpl#构造方法

    • 在里面 StoreSinkWriteImpl 构造函数的第 8 步会通过 newTableWrite 创建 TableWriteImpl
    // StoreSinkWriteImpl 初始化 TableWriteImpl
    private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
    TableWriteImpl<?> tableWrite =
                    table.newWrite( //不同的  table 创建不同的 write 等下在看
                                    commitUser,
                                    (part, bucket) -> // 这里竟然用了 state 过滤器,[在上篇](https://www.jianshu.com/p/8153c43a4170) 有详细讲这个过滤器的可能用处,这里只是一个函数,要看函数的调用放过才知道具体的  (part, bucket) 是怎么来的
                                            state.stateValueFilter().filter(table.name(), part, bucket)
                            )
                            .withIOManager(paimonIOManager) // 设置其他属性
                            .withIgnorePreviousFiles(ignorePreviousFiles) // false
                            .withExecutionMode(isStreamingMode) // true
                            .withBucketMode(table.bucketMode()); 
    
            if (metricGroup != null) {
                tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
            }
    
            if (memoryPoolFactory != null) {
                return tableWrite.withMemoryPoolFactory(memoryPoolFactory); // mamaged memory
            } else {
                return tableWrite.withMemoryPool( // 堆内存
                        memoryPool != null
                                ? memoryPool
                                : new HeapMemorySegmentPool(
                                        table.coreOptions().writeBufferSize(),
                                        table.coreOptions().pageSize()));
            }
    }
    
    // 从上面函数来看首先是通过 table 创建出了 TableWriteImpl 然后设置了 各种属性, 
    // 接下来看 table 是怎么 newWrite 的。主键表为例 如下
    
     public TableWriteImpl<KeyValue> newWrite(
                String commitUser, ManifestCacheFilter manifestFilter) {
            TableSchema schema = schema();
            CoreOptions options = store().options();
           //rowKindGenerator 用来解析 '+I', '-U', '+U' or '-D' , 
           //如果有指定 rowkind.field 那就从数据中抽取这个字段来获取 rowKind 
           //否则直接获取 row 的 rowkind 这个是 flink 层面加上的,而 rowkind.field 是数据层面的。
           //暂时没有 get 到数据里面带 rowkind.field 的场景。
            RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
           //主键表写的数据格式是 KeyValue, append 表写的格式直接是 InternalRow
            KeyValue kv = new KeyValue();
          //调用 TableWriteImpl 构造函数
            return new TableWriteImpl<>(
                     // TableWriteImpl 里面又嵌套了一个 write 对于主键表是 KeyValueFileStoreWrite 
                     // append 表是 AppendOnlyFileStoreWrite 
                     // manifestFilter 是那个 state 过滤器。 ok 这里命名了 manifestFilter,  
                     // manifest 应该是 Paiom 的元数据,文件列表信息,一个 Paimon 表会有很多文件列表信息,
                     // 可能对某个 task 只需要和自己 partition bucket 相关的 manifest
                     // (以上个人猜测,边看边猜,后面到 KeyValueFileStoreWrite 再细看)
                    store().newWrite(commitUser, manifestFilter), 
                    createRowKeyExtractor(), //用来读取数据的主键的
                    record -> { //又是一个函数用来定义如何把 输入的 InternalRow 转化成 KeyValue, 然后后面用 KeyValue 结构去写
                        InternalRow row = record.row(); // 原始 row
                        RowKind rowKind =  // 抽取 rowkind
                                rowKindGenerator == null
                                        ? row.getRowKind()
                                        : rowKindGenerator.generate(row);
                         // 构建 KeyValue 放了主键,sequence, rowKind , row 本身
                        return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
                    },
                   //获取是否有配置 ignore-delete ,如果配置了则会对 rowKind 为撤回类型的过滤掉 这个在上面 StoreSinkWriteImpl#write 里面也有提到
                    CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
        }
    
    //到这里我们看了 TableWriteImpl 构造的地方了再看 TableWriteImpl 的构造函数
    
    // TableWriteImpl 自身构造函数
    public TableWriteImpl(
                FileStoreWrite<T> write, // 来自于 table.store().newWrite() 主键表是 KeyValueFileStoreWrite
                KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor, //主键和bucket 的抽取器给一个InternalRow 之后用来方便对 key bucket 等的抽取
                RecordExtractor<T> recordExtractor, //数据转化器 write 的不同 数据转化也不同,KeyValueFileStoreWrite 需要把 InternalRow 转化成 KeyValue, append 表 不需要转化
                boolean ignoreDelete) { //是否忽略撤回数据
            this.write = write;
            this.keyAndBucketExtractor = keyAndBucketExtractor;
            this.recordExtractor = recordExtractor;
            this.ignoreDelete = ignoreDelete;
        }
    

    FINAL

    • 此篇讲述了 StoreSinkWriteImpl 的构造和 write,prepareCommit 方法实际 write,prepareCommit 并么有做什么他是调用里面的 write 的write 和 prepareCommit 方法,write 里面又嵌套了 wirte, 对此又往下看了两层
    • 从 RowDataStoreWriteOperator 来看 StoreSinkWriteImpl 是第一层 write
    • StoreSinkWriteImpl 中的 TableWriteImpl 是第二层 write
      • 接着讲述了 StoreSinkWriteImpl 的构建过程 他的 write,prepareCommit 又是调用第 3 层的 write
    • StoreSinkWriteImpl 中的 FileStoreWrite 是第三成的 write
      • FileStoreWrite 对于主键表对应的是 KeyValueFileStoreWrite, 对于append 表是 AppendOnlyFileStoreWrite
    • KeyValueFileStoreWrite 里面还有 write. 后面再看, AppendOnlyFileStoreWrite 就先不看了,下文将接着讲述 KeyValueFileStoreWrite
    • 回顾 wirte 调用链 StoreSinkWriteImpl#wirte ->TableWriteImpl.writeAndReturn(rowData)->KeyValueFileStoreWrite.write()
    • 回顾 prepareCommit 调用链 StoreSinkWriteImpl#prepareCommit->TableWriteImpl.prepareCommit->KeyValueFileStoreWrite.prepareCommit

    相关文章

      网友评论

          本文标题:paimon sink 源码之 StoreSinkWriteIm

          本文链接:https://www.haomeiwen.com/subject/jgrdfjtx.html