一、概述
FlinkSource适用于通过Flink方式来读取iceberg的table,类似TableScan(并且每个Scan都会关联一个ScanContext)。
FlinkSource能够读取有界静态数据,同时也是支持增量数据和流式读取。
- 批式读取
1、不包含startSnapshotId(OPTIONS中指定)
2、若是指定了startSnapshotId并未指定endSnapshotId - 流式读取
1、使用startSnapshotId=-1并且未指定endSnapshotId
二、实现
1、默认情况下FlinkSource是不允许构造的(私有化构造器)
2、通过Builder设计模式将复杂对象的构建与其表示分离;
// ============ 省略部分代码 ============== //
// flink stream环境
private StreamExecutionEnvironment env;
// iceberg table
private Table table;
// 用于loader iceberg表
private TableLoader tableLoader;
// iceberg的schema描述
private TableSchema projectedSchema;
// 进行icebeg 表读取的配置参数
private ReadableConfig readableConfig = new Configuration();
// 构建scan上下文(很关键);能够进行filter/limit/properties设置/snapshotid指定/是否区分大小写
// datafile split/数据回放等
private final ScanContext.Builder contextBuilder = ScanContext.builder();
// ============ 省略部分代码 ============== //
关键代码
// ============== buildFormat =============== //
public FlinkInputFormat buildFormat() {
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
// 对应的iceberg表的schema
Schema icebergSchema;
// 用于iceberg的读取(实例化的时候注意是可序列化的[写入的时候是串行的,读写可以同时存在])
FileIO io;
// iceberg datafile文件的加解密的(实例化的时候也是需要注意是可序列化的)
EncryptionManager encryption;
if (table == null) { // 加载iceberg表
// load required fields by table loader.
// 表加载器来完成内容的加载
tableLoader.open();
try (TableLoader loader = tableLoader) {
table = loader.loadTable();
icebergSchema = table.schema();
io = table.io();
encryption = table.encryption();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
icebergSchema = table.schema();
io = table.io();
encryption = table.encryption();
}
if (projectedSchema == null) {
contextBuilder.project(icebergSchema);
} else {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
}
// 最终构建一个RichInputFormat
return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
}
构建DataStream
public DataStream<RowData> build() {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
// 1、构建source读取的input format
FlinkInputFormat format = buildFormat();
// 2、构建读取scan
ScanContext context = contextBuilder.build();
// 3、用于iceberg与flink的类型转换
TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
// 4、构建datastream
if (!context.isStreaming()) { // 4.1 批模式
// 指定读取时的并行度
int parallelism = inferParallelism(format, context);
// 创建datastream
return env.createInput(format, typeInfo).setParallelism(parallelism);
} else { // 4.2 流模式
// 根据指定monitor周期进行数据读取
StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
String readerOperatorName = String.format("Iceberg table (%s) reader", table);
// 按照指定的monitor进行数据读取
return env.addSource(function, monitorFunctionName)
.transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
}
}
关于批模式读取时的并行度设置
// 用于flink批模式下读取时的parallelism的大小设置
// parallelism来自当前读取的split的个数 以及 手动指定的table.exec.resource.default-parallelism两个选项中最小的那个值
int inferParallelism(FlinkInputFormat format, ScanContext context) {
int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); // 手动指定的table.exec.resource.default-parallelism
if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) { // 是否开启根据split个数来推断当前读取的并行度个数;若是未指定的时候,则使用手动指定
int maxInferParallelism = readableConfig.get(FlinkConfigOptions
.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX); // 默认指定读取时的并行度:100
Preconditions.checkState(
maxInferParallelism >= 1,
FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
int splitNum;
try {
FlinkInputSplit[] splits = format.createInputSplits(0); // 对datafiles读取进行split的个数
splitNum = splits.length;
} catch (IOException e) {
throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
}
parallelism = Math.min(splitNum, maxInferParallelism); // 取split和默认MAX_parallelism的最小值
}
if (context.limit() > 0) { // 还需要注意limit参数的设置防止parallelism设置过大
int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
parallelism = Math.min(parallelism, limit);
}
// parallelism must be positive.
parallelism = Math.max(1, parallelism);
return parallelism;
}
三、关于FlinkSource的批读取
相对比较简单通过Stream Execution Environment来创建InputFormat
四、关于FlinkSource的流读取
首先指定monitor function定时获取对应的datafiles内容,并执行split,再将生成的FlinkInputSplit下发给下游的task进行处理;
网友评论