美文网首页FlinkFlink学习指南
Flink 源码之 SQL TableSource 和 Tabl

Flink 源码之 SQL TableSource 和 Tabl

作者: AlienPaul | 来源:发表于2021-06-30 16:46 被阅读0次

    Flink源码分析系列文档目录

    请点击:Flink 源码分析系列文档目录

    前言

    Flink SQL可以将多种数据源或数据落地端映射为table,使用起来非常方便。本篇以Flink自带的datagen类型表数据源和print类型表落地端为例,为大家分析TableSourceTableSink的代码实现逻辑。

    TableSource

    DynamicTableSourceFactory

    Flink使用SPI机制加载Factory(DynamicTableSourceFactoryDynamicTableSinkFactory同属Factory)。在flink-table-api-java-bridge项目的resources/META-INF/services目录我们可以找到org.apache.flink.table.factories.Factory文件,内容为:

    org.apache.flink.table.factories.DataGenTableSourceFactory
    org.apache.flink.table.factories.BlackHoleTableSinkFactory
    org.apache.flink.table.factories.PrintTableSinkFactory
    

    Flink启动的时候会根据这个文件,去加载这3个实现类。可以看出我们接下来要分析的datagen和print就在其中。

    接下来我们看下TableSource需要实现的接口DynamicTableSourceFactory,它有几个重要的方法:

    指定TableSource配置属性的方法:

    • requiredOptions方法:返回table必须的属性配置
    • optionalOptions方法:返回table可选的属性配置

    factoryIdentifier方法:指定该tableSource的类型是什么

    只看方法名称大家可能没体会到它们的作用。这里结合create table SQL语句来说明。

    CREATE TABLE datagen (
      ...
    ) WITH (
     'connector' = 'datagen',
    
     -- optional options --
    
     'rows-per-second'='5',
     ...
    )
    

    可以看出我们使用SQL创建表,需要指定连接器connector,connector是和我们自定义的TableSourceTableSinkfactoryIdentifier方法的返回值关联的。

    下面的'rows-per-second'='5'为table的一个option(选项或属性),用于为TableSourceTableSink传递参数。其中必须指定的参数通过requiredOptions方法指定,可选的参数通过optionalOptions方法指定。

    table的option为ConfigOption<T>类型,使用如下方式构造(使用专门的builder):

    public static final ConfigOption<Long> NUMBER_OF_ROWS =
      key("number-of-rows")
          .longType()
          .defaultValue(UNLIMITED_ROWS)
          .withDescription("Total number of rows to emit. By default, the source is unbounded.");
    

    最后还有一个createDynamicTableSource方法,负责返回创建的自定义TableSource

    接下来我们以DataGenTableSourceFactory为例,分析DynamicTableSourceFactory的使用。

    官网创建datagen类型table的方式如下:

    CREATE TABLE datagen (
     f_sequence INT,
     f_random INT,
     f_random_str STRING,
     ts AS localtimestamp,
     WATERMARK FOR ts AS ts
    ) WITH (
     'connector' = 'datagen',
    
     -- optional options --
    
     'rows-per-second'='5',
    
     'fields.f_sequence.kind'='sequence',
     'fields.f_sequence.start'='1',
     'fields.f_sequence.end'='1000',
    
     'fields.f_random.min'='1',
     'fields.f_random.max'='1000',
    
     'fields.f_random_str.length'='10'
    )
    

    连接器参数如下表:

    参数 是否必选 默认值 数据类型 描述
    connector 必须 (none) String 指定要使用的连接器,这里是 'datagen'。
    rows-per-second 可选 10000 Long 每秒生成的行数,用以控制数据发出速率。
    fields.#.kind 可选 random String 指定 '#' 字段的生成器。可以是 'sequence' 或 'random'。
    fields.#.min 可选 (Minimum value of type) (Type of field) 随机生成器的最小值,适用于数字类型。
    fields.#.max 可选 (Maximum value of type) (Type of field) 随机生成器的最大值,适用于数字类型。
    fields.#.length 可选 100 Integer 随机生成器生成字符的长度,适用于 char、varchar、string。
    fields.#.start 可选 (none) (Type of field) 序列生成器的起始值。
    fields.#.end 可选 (none) (Type of field) 序列生成器的结束值。

    具体使用方式参见Flink官网:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/datagen/

    接下来是代码分析过程。DataGenTableSourceFactory源代码如下:

    // 定义connector类型为datagen
    public static final String IDENTIFIER = "datagen";
    public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
    
    // 定义属性,含义为每秒生成多少行数据
    public static final ConfigOption<Long> ROWS_PER_SECOND =
            key("rows-per-second")
                    .longType()
                    .defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
                    .withDescription("Rows per second to control the emit rate.");
    
    // 总共生成多少行数据,默认为无限生成
    public static final ConfigOption<Long> NUMBER_OF_ROWS =
            key("number-of-rows")
                    .longType()
                    .noDefaultValue()
                    .withDescription(
                            "Total number of rows to emit. By default, the source is unbounded.");
    
    public static final String FIELDS = "fields";
    public static final String KIND = "kind";
    public static final String START = "start";
    public static final String END = "end";
    public static final String MIN = "min";
    public static final String MAX = "max";
    public static final String LENGTH = "length";
    
    public static final String SEQUENCE = "sequence";
    public static final String RANDOM = "random";
    
    @Override
    public String factoryIdentifier() {
        // 返回TableSource对应的connector类型
        return IDENTIFIER;
    }
    
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        // 必选属性,datagen没有必选属性
        return new HashSet<>();
    }
    
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        // 可选属性
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(ROWS_PER_SECOND);
        options.add(NUMBER_OF_ROWS);
        return options;
    }
    
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // 创建一个空白的Configuration
        Configuration options = new Configuration();
        // 获取table的所有配置选项填入Configuration
        context.getCatalogTable().getOptions().forEach(options::setString);
    
        // 获取表的物理字段,计算字段和元数据字段会被过滤掉
        TableSchema schema =
                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        // 创建DataGenerator数组,每一字段对应一个DataGenerator
        DataGenerator<?>[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
        // 所有的可选选项
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
    
        // 遍历所有字段
        for (int i = 0; i < fieldGenerators.length; i++) {
            // 获取字段名称
            String name = schema.getFieldNames()[i];
            // 获取字段的数据类型
            DataType type = schema.getFieldDataTypes()[i];
    
            // 创建kind配置选项,默认值RANDOM
            ConfigOption<String> kind =
                    key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM);
            
            // 创建各个字段对应的生成器,RANDOM或者Sequence
            DataGeneratorContainer container =
                    createContainer(name, type, options.get(kind), options);
            fieldGenerators[i] = container.getGenerator();
    
            // 添加属性到可选选项中
            optionalOptions.add(kind);
            optionalOptions.addAll(container.getOptions());
        }
    
        // 校验所有的配置值是否合法
        FactoryUtil.validateFactoryOptions(requiredOptions(), optionalOptions, options);
    
        // 创建consumedOptionKeys,获取所有已填写的选项
        Set<String> consumedOptionKeys = new HashSet<>();
        consumedOptionKeys.add(CONNECTOR.key());
        consumedOptionKeys.add(ROWS_PER_SECOND.key());
        consumedOptionKeys.add(NUMBER_OF_ROWS.key());
        optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
        
        // 校验是否有未使用的配置项
        FactoryUtil.validateUnconsumedKeys(
                factoryIdentifier(), options.keySet(), consumedOptionKeys);
    
        // 获取table名称
        String name = context.getObjectIdentifier().toString();
        // 创建一个DataGenTableSource
        return new DataGenTableSource(
                fieldGenerators,
                name,
                schema,
                options.get(ROWS_PER_SECOND),
                options.get(NUMBER_OF_ROWS));
    }
    
    // 根据字段名,字段类型等创建出字段内容生成器
    private DataGeneratorContainer createContainer(
            String name, DataType type, String kind, ReadableConfig options) {
        switch (kind) {
            case RANDOM:
                return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options));
            case SEQUENCE:
                return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options));
            default:
                throw new ValidationException("Unsupported generator kind: " + kind);
        }
    }
    

    DynamicTableSource

    DynamicTableSource 负责从外部系统创建出一个动态表。该接口包含有如下两个子接口:

    • ScanTableSource
    • LookupTableSource

    这两个接口的方法基本相同,主要区别为使用时需要全量扫描数据源还是根据key查询数据源。他们具有如下主要方法:

    • copy方法:在生成执行计划阶段需要创建DynamicTableSink的一个部分,该方法包含创建副本的逻辑,要求为深拷贝。
    • asSummaryString方法:用于在日志或者是控制台中,打印该TableSink的文字描述。
    • getChangelogMode:返回Sink支持的变更模式,planner可提供建议但是最终决定在于Sink,如果planner和sink支持的模式冲突,则抛出异常。支持的模式有INSERT_ONLY,UPSERT和ALL。

    还有一个方法是创建DynamicTableSource。这个方法在上面两个接口中的名字不同。其中:

    • ScanTableSource接口对应getScanRuntimeProvider方法。系统运行时需要扫描外部系统中所有的数据行。
    • LookupTableSource接口对应getLookupRuntimeProvider方法。系统运行时需要根据一个或多个key查找外部系统中的数据行。

    上面例子中的datagen使用的是ScanTableSource类型。

    接着datagen这个例子。DataGenTableSource代码和分析如下:

    @Internal
    public class DataGenTableSource implements ScanTableSource {
    
        private final DataGenerator<?>[] fieldGenerators;
        private final String tableName;
        private final TableSchema schema;
        private final long rowsPerSecond;
        private final Long numberOfRows;
    
        public DataGenTableSource(
                DataGenerator<?>[] fieldGenerators,
                String tableName,
                TableSchema schema,
                long rowsPerSecond,
                Long numberOfRows) {
            this.fieldGenerators = fieldGenerators;
            this.tableName = tableName;
            this.schema = schema;
            this.rowsPerSecond = rowsPerSecond;
            this.numberOfRows = numberOfRows;
        }
    
        @Override
        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
            // 返回tableSource
            boolean isBounded = numberOfRows != null;
            return SourceFunctionProvider.of(createSource(), isBounded);
        }
    
        @VisibleForTesting
        public DataGeneratorSource<RowData> createSource() {
            // 构建tableSource的逻辑在此
            // 创建一个RowData生成器,包含每个字段的名称和数据类型,以及配置参数
            return new DataGeneratorSource<>(
                    new RowDataGenerator(fieldGenerators, schema.getFieldNames()),
                    rowsPerSecond,
                    numberOfRows);
        }
    
        @Override
        public DynamicTableSource copy() {
            return new DataGenTableSource(
                    fieldGenerators, tableName, schema, rowsPerSecond, numberOfRows);
        }
    
        @Override
        public String asSummaryString() {
            return "DataGenTableSource";
        }
    
        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }
    }
    

    最后我们分析DataGeneratorSource的逻辑,它其实是一个RichParallelSourceFunction。和RichSourceFunction不同的是,RichParallelSourceFunction会同时运行多个实例,数量和配置的并行度一致。

    下面分析它的open方法和run方法:

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    
        // 如果配置了行数限制
        if (numberOfRows != null) {
            // 获取任务并行度
            final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
            // 获取任务ID
            final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
    
            // 计算每个DataGenTableSource实例生成的数据行数
            final int baseSize = (int) (numberOfRows / stepSize);
            toOutput = (numberOfRows % stepSize > taskIdx) ? baseSize + 1 : baseSize;
        }
    }
    
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        // 计算每秒钟需要产生的数据行数
        double taskRowsPerSecond =
                (double) rowsPerSecond / getRuntimeContext().getNumberOfParallelSubtasks();
        long nextReadTime = System.currentTimeMillis();
    
        // 死循环
        while (isRunning) {
            // 每次批量生成taskRowsPerSecond条数据
            for (int i = 0; i < taskRowsPerSecond; i++) {
                if (isRunning
                        && generator.hasNext()
                        && (numberOfRows == null || outputSoFar < toOutput)) {
                    synchronized (ctx.getCheckpointLock()) {
                        outputSoFar++;
                        // 调用生成器生成数据
                        ctx.collect(this.generator.next());
                    }
                } else {
                    return;
                }
            }
    
            // 下批数据生成1秒后进行
            nextReadTime += 1000;
            // 由于生成数据存在耗时,这里计算生成完数据后,还需要等多久够1秒钟
            long toWaitMs = nextReadTime - System.currentTimeMillis();
            // 线程睡眠toWaitMs毫秒
            while (toWaitMs > 0) {
                Thread.sleep(toWaitMs);
                toWaitMs = nextReadTime - System.currentTimeMillis();
            }
        }
    }
    

    到这里TableSource就介绍完了。

    TableSink

    DynamicTableSinkFactory

    DynamicTableSinkFactory的主要方法和DynamicTableSourceFactory几乎完全一致。我们直接从实例分析。

    以print为例,它的使用非常简单,SQL如下所示:

    CREATE TABLE print_table (
     f0 INT,
     f1 INT,
     f2 STRING,
     f3 DOUBLE
    ) WITH (
     'connector' = 'print'
    )
    

    配置参数如下:

    参数 是否必选 默认值 数据类型 描述
    connector 必选 (none) String 指定要使用的连接器,此处应为 'print'
    print-identifier 可选 (none) String 配置一个标识符作为输出数据的前缀。
    standard-error 可选 false Boolean 如果 format 需要打印为标准错误而不是标准输出,则为 True 。
    sink.parallelism 可选 (none) Integer 为 Print sink operator 定义并行度。默认情况下,并行度由框架决定,和链在一起的上游 operator 一致。

    print类型table sink的作用为把insert到这个table的数据直接print到控制台。详细内容参见:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/print/

    下面开始源代码分析。PrintTableSinkFactory的代码如下所示:

    public static final String IDENTIFIER = "print";
    
    // 创建配置选项
    public static final ConfigOption<String> PRINT_IDENTIFIER =
            key("print-identifier")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Message that identify print and is prefixed to the output of the value.");
    
    public static final ConfigOption<Boolean> STANDARD_ERROR =
            key("standard-error")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "True, if the format should print to standard error instead of standard out.");
    
    @Override
    public String factoryIdentifier() {
        // 返回print,当设置connector为print时,使用这个DynamicTableSourceFactory
        return IDENTIFIER;
    }
    
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        // 必须的属性,返回空集合表示没有必须的属性
        return new HashSet<>();
    }
    
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        // 这个方法返回可选的属性
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PRINT_IDENTIFIER);
        options.add(STANDARD_ERROR);
        options.add(FactoryUtil.SINK_PARALLELISM);
        return options;
    }
    
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // 该方法返回DynamicTableSink实例
        // 主要逻辑为校验属性值,然后读取这些属性,构建出PrintSink
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new PrintSink(
                context.getCatalogTable().getSchema().toPhysicalRowDataType(),
                options.get(PRINT_IDENTIFIER),
                options.get(STANDARD_ERROR),
                options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null));
    }
    

    DynamicTableSink

    它具有的方法和DynamicTableSource基本一致,只有一个方法不同:getSinkRuntimeProvider方法。这个方法是sink的关键,返回一个SinkRuntimeProvider。这个类包含如何将表中数据落地的逻辑。

    和上面例子相同,我们贴出PrintSink的源代码展开分析。

    private static class PrintSink implements DynamicTableSink {
    
        // 这里保存了table中各个column的数据类型
        private final DataType type;
        private final String printIdentifier;
        private final boolean stdErr;
        private final @Nullable Integer parallelism;
    
        private PrintSink(
                DataType type, String printIdentifier, boolean stdErr, Integer parallelism) {
            this.type = type;
            this.printIdentifier = printIdentifier;
            this.stdErr = stdErr;
            this.parallelism = parallelism;
        }
    
        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }
    
        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            // 类型转换器,负责转换Flink内部数据类型到POJO
            DataStructureConverter converter = context.createDataStructureConverter(type);
            return SinkFunctionProvider.of(
                    new RowDataPrintFunction(converter, printIdentifier, stdErr), parallelism);
        }
    
        @Override
        public DynamicTableSink copy() {
            return new PrintSink(type, printIdentifier, stdErr, parallelism);
        }
    
        @Override
        public String asSummaryString() {
            return "Print to " + (stdErr ? "System.err" : "System.out");
        }
    }
    

    最后我们看下RowDataPrintFunction,分析它是如何将数据打印到控制台的。它实际上是一个RichSinkFunction

    private static class RowDataPrintFunction extends RichSinkFunction<RowData> {
    
        private static final long serialVersionUID = 1L;
    
        private final DataStructureConverter converter;
        private final PrintSinkOutputWriter<String> writer;
    
        private RowDataPrintFunction(
                DataStructureConverter converter, String printIdentifier, boolean stdErr) {
            this.converter = converter;
            // 创建出PrintSinkOutputWriter,输出到标准输出或者是标准错误
            this.writer = new PrintSinkOutputWriter<>(printIdentifier, stdErr);
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
            writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
        }
    
        @Override
        public void invoke(RowData value, Context context) {
            // 将table一行数据转换为Object(POJO)类型
            Object data = converter.toExternal(value);
            assert data != null;
            // 输出数据到PrintSinkOutputWriter
            writer.write(data.toString());
        }
    }
    

    到这里print类型connector源代码已分析完毕。

    本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

    相关文章

      网友评论

        本文标题:Flink 源码之 SQL TableSource 和 Tabl

        本文链接:https://www.haomeiwen.com/subject/xnvrultx.html