从上篇中可以看到了 MemorySegmentPool,MemoryPoolFactory, 各种 buffer ,概念比较多再来重新梳理下整个 writer 的构建过程同时也关注下 MemorySegmentPool, buffer, 看看 MemorySegmentPool,MemoryPoolFactory 是算子级别( subtask 的所有 writer 共用 MemorySegmentPool)的 还是 writer 级别(每个 write 独享一个 MemorySegmentPool)的
-
RowDataStoreWriteOperator 有成员变量 MemorySegmentPool memoryPool 、StoreSinkWrite write
1.1 memoryPool 在 setup 方法时 根据是否 配置了 sink.use-managed-memory-allocator 会创建 FlinkMemorySegmentPool, 如果没有配置则为空,所以默认是空的, MemorySegmentPool 主要有两个实现一个是基于managed memory 的 FlinkMemorySegmentPool 一个是基于内存的 HeapMemorySegmentPool
1.2 initializeState 时创建了 StoreSinkWrite 创建 StoreSinkWrite 会把 memoryPool 传入进去。StoreSinkWrite 会根据是否要合并分为 StoreSinkWriteImpl 和 GlobalFullCompactionSinkWrite 两种实现, 同时 GlobalFullCompactionSinkWrite 是继承自 StoreSinkWriteImpl -
StoreSinkWriteImpl 有成员变量 MemorySegmentPool memoryPool、MemoryPoolFactory memoryPoolFactory、TableWriteImpl<?> write
2.1 memoryPool 从 RowDataStoreWriteOperator 传下来的可能为空 对于 配置了 sink.use-managed-memory-allocator 会有值
2.2 memoryPoolFactory 是空
2.3 write 是 StoreSinkWriteImpl 构造函数里面创建的最终是通过 FileStoreTable 的 newWrite 方法创建 TableWriteImpl,创建完 TableWriteImpl 后会紧接着通过 withMemoryPool 设置 memoryPool,对于 memoryPool 为空的场景则会初始化一个 HeapMemorySegmentPool, 到此 memoryPool 就都有值了
2.4 创建完 TableWriteImpl 后会紧接着通过 withMemoryPoolFactory 设置 memoryPoolFactory, 因为 memoryPoolFactory 是空的所以忽略 -
TableWriteImpl 有属性 FileStoreWrite<T> write
3.1 他是在构建 TableWriteImpl 时创建的,对于主键表创建的是 KeyValueFileStoreWrite
3.2 在 TableWriteImpl 调用 withMemoryPool 设置 memoryPool 实际还是调用的 KeyValueFileStoreWrite withMemoryPool 这样 memoryPool 不为空了也传导到了 KeyValueFileStoreWrite
3.3 KeyValueFileStoreWrite 执行 withMemoryPool 时其实是 new MemoryPoolFactory(memoryPool) 然后调用
withMemoryPoolFactory 把这个 MemoryPoolFactory 赋值给 KeyValueFileStoreWrite 的 writeBufferPool 属性
3.4 这样 KeyValueFileStoreWrite 就有 MemoryPoolFactory, MemoryPoolFactory 里面放的是 memoryPool
此时 MemoryPoolFactory 里面的 memoryPool 叫做 innerPool
3.5 MemoryPoolFactory 里面一个 owners 其实对应的 KeyValueFileStoreWrite 的 writers,同一个 subTask 针对不同的 分区和 bucket 都会创建一个 write 所以 KeyValueFileStoreWrite 里面有多个 writer
3.6 所以到这里大概可以猜测出 KeyValueFileStoreWrite 是一个 subTask 一个, MemoryPoolFactory 也是一个,又因为 MemoryPoolFactory 里面只有一个 innerPool 看上去也是 subTask 一个 innerPool -
KeyValueFileStoreWrite 有属性 MemoryPoolFactory writeBufferPool; 有一个 createWriter 方法
4.1 writeBufferPool 来自于 KeyValueFileStoreWrite 新建的 MemoryPoolFactory
4.2 createWriter 方法针对每个分区和 bucket 创建一个 MergeTreeWriter, 创建完 MergeTreeWriter 之后会给这个 MergeTreeWriter 创建一个 OwnerMemoryPool, OwnerMemoryPool 和 MergeTreeWriter 是 一一 对应的,但是操作的还是 MemoryPoolFactory 的 innerPool。之后还会初始化 MergeTreeWriter 的 writeBuffer
此时 writeBuffer 是 new SortBufferWriteBuffer , 构造 SortBufferWriteBuffer 会把 OwnerMemoryPool 传进去 -
MergeTreeWriter 有属性 WriteBuffer writeBuffer
5.1 在 KeyValueFileStoreWrite 创建完成 MergeTreeWriter 后会创建一个 OwnerMemoryPool, writeBuffer 里面会与这个 OwnerMemoryPool
5.2 writeBuffer 是 SortBufferWriteBuffer -
SortBufferWriteBuffer 有属性 SortBuffer buffer
6.1 SortBuffer 根据是否 spill 会生成 BinaryExternalSortBuffer 或者 BinaryInMemorySortBuffer
6.2 SortBuffer 里面会用到 memoryPool 也就是 OwnerMemoryPool
画个图总结如下
![](https://img.haomeiwen.com/i15838615/b5359e461f72ddeb.png)
网友评论