一、概述
主要是为了flink进行读取datafile提供读取性能以及整体吞吐量,来进行combine split的操作;并基于此来生成对应的CombineScanTask,进而生成FlinkInputSplit的。
二、实现
// 关键代码
// 生成CombineScanTask用于完成每个FlinkInputSplit
private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
TableScan scan = table
.newScan() // 创建一个新的scan
.caseSensitive(context.caseSensitive()) // 当指定数据列是否需要考虑忽略大小写
.project(context.project()); // 是否进行下推读取
if (context.snapshotId() != null) { // 指定本次读取的snapshot
scan = scan.useSnapshot(context.snapshotId());
}
if (context.asOfTimestamp() != null) { // 指定本次读取的timestamp
scan = scan.asOfTime(context.asOfTimestamp());
}
if (context.startSnapshotId() != null) {
if (context.endSnapshotId() != null) { // 指定本次读取的[startSnapshot, endSnapshot)
scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
} else { // 流式读取 未指定对应的endSnapshot
scan = scan.appendsAfter(context.startSnapshotId());
}
}
if (context.splitSize() != null) { // 指定split时的大小
scan = scan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
}
if (context.splitLookback() != null) { // 指定combine split时的文件个数
scan = scan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
}
if (context.splitOpenFileCost() != null) { // 指定combine split时对应的文件大小
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
}
if (context.filters() != null) { // 指定本次scan的filter内容,便于在进行读取下推 在读取时过滤对应的内容
for (Expression filter : context.filters()) {
scan = scan.filter(filter);
}
}
// 获取到对应的CombineScanTask集
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
return Lists.newArrayList(tasksIterable);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table scan: " + scan, e);
}
}
网友评论