Implementing a Custom Flink Table Source

Author: 无暇的风笛 | Published 2021-06-15 14:05

    I. Overview

    Reference: the official Flink documentation, User-defined Sources & Sinks.

    A dynamic table in Flink is a logical concept. Dynamic tables are the central concept of the Table & SQL API and allow streaming and batch data to be processed in a unified way.

    A table connector generally goes through the following stages:

    1. Metadata

      Executing a CREATE TABLE statement declares the table. This does not modify any physical data in the external system; the metadata is represented as an instance of CatalogTable.

    2. Planning

      During translation and optimization of the table program, the planner resolves the CatalogTable into a DynamicTableSource (for SELECT statements) or a DynamicTableSink (for INSERT statements).

      The main work in this phase is to implement the DynamicTableSourceFactory and DynamicTableSinkFactory: validate the options in the WITH clause and convert the CatalogTable metadata into a DynamicTableSource or DynamicTableSink.

      The factory class must expose a valid factory identifier (the value used for connector = 'xxxxxx') so that Java's SPI can discover and register it as a service (a minimal discovery sketch follows this overview). For background on the SPI mechanism, see: https://www.cnblogs.com/lovesqcc/p/5229353.html

    3. Runtime

      Once logical planning is complete, the planner obtains the runtime implementation classes from the table connector.
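
    The following is a minimal sketch (my addition, not from the original article) of that discovery step: it lists every table factory registered on the classpath through a META-INF/services/org.apache.flink.table.factories.Factory file. The class name ListFactories is illustrative; Flink's planner performs essentially the same ServiceLoader lookup internally (via FactoryUtil) and matches the value of connector = '...' against each factory's factoryIdentifier().

      import java.util.ServiceLoader;
      
      import org.apache.flink.table.factories.Factory;
      
      // Illustrative helper (hypothetical class name): prints the identifier of every
      // table factory discovered through the Java SPI on the current classpath.
      public class ListFactories {
          public static void main(String[] args) {
              for (Factory factory : ServiceLoader.load(Factory.class)) {
                  System.out.println(factory.factoryIdentifier());
              }
          }
      }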

    II. Implementing a Custom Source Connector

    Below we implement a simple custom source connector; the main goal is to walk through the development process end to end.

    The connector's behavior: for each (string-typed) column of the declared table, it randomly emits one of the values listed in that column's fields.<column>.expression option, split by the configured delimiter. For example, a column color with 'fields.color.expression' = 'red,green,blue' and the default delimiter ',' yields rows whose color is one of red, green, or blue.

    1. First, implement the DynamicTableSourceFactory

      package com.github.knaufk.flink.test;
      
      import org.apache.flink.configuration.ConfigOption;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.table.api.TableSchema;
      import org.apache.flink.table.api.ValidationException;
      import org.apache.flink.table.catalog.CatalogTable;
      import org.apache.flink.table.connector.source.DynamicTableSource;
      import org.apache.flink.table.factories.DynamicTableSourceFactory;
      import org.apache.flink.table.factories.FactoryUtil;
      import org.apache.flink.table.utils.TableSchemaUtils;
      
      import java.util.HashSet;
      import java.util.Set;
      
      import static org.apache.flink.configuration.ConfigOptions.key;
      
      public class XiaolongTableSourceFactory implements DynamicTableSourceFactory {
      
          private static final String IDENTIFIER = "xiaolong";
      
          private static final String FIELDS = "fields";
      
          private static final String EXPRESSION = "expression";
      
          private static final ConfigOption<Long> ROWS_PER_SECOND =
                  key("rows-per-second")
                          .longType()
                          .defaultValue(1000L)
                          .withDescription("Rows per second to emit.");
      
          private static final ConfigOption<String> SPLIT_STR =
                  key("split-str")
                          .stringType()
                          .defaultValue(",")
                          .withDescription("the delimited of string.");
      
      
          @Override
          public DynamicTableSource createDynamicTableSource(Context context) {
              CatalogTable catalogTable = context.getCatalogTable();
      
              // Read the options from the WITH clause
              Configuration options = new Configuration();
              context.getCatalogTable().getOptions().forEach(options::setString);
      
              // Get the physical schema of the declared table
              TableSchema schema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
              String[] fieldExpressions = new String[schema.getFieldCount()];
      
              Set<ConfigOption<?>> fieldOptions = new HashSet<>();
      
              for (int i = 0; i < fieldExpressions.length; i++) {
                  String fieldName = schema.getFieldName(i).get();
                  // Each column must have a corresponding fields.<name>.expression option
                  ConfigOption<String> field =
                          key(FIELDS + "." + fieldName + "." + EXPRESSION).stringType().noDefaultValue();
                  fieldExpressions[i] = validateFieldExpression(field, options);
                  fieldOptions.add(field);
              }
              // Validate the collected options
              FactoryUtil.validateFactoryOptions(requiredOptions(), fieldOptions, options);
              return new XiaolongTableSource(fieldExpressions, options.get(ROWS_PER_SECOND), options.get(SPLIT_STR));
          }
      
          /**
           * @return the connector identifier, i.e. the value used for connector = 'xiaolong'
           */
          @Override
          public String factoryIdentifier() {
              return IDENTIFIER;
          }
      
          /**
           * @return the options that must be set; none here, so an empty set is returned
           */
          @Override
          public Set<ConfigOption<?>> requiredOptions() {
              return new HashSet<>();
          }
      
          /**
           * @return the additional options that may be set (rows-per-second and split-str)
           */
          @Override
          public Set<ConfigOption<?>> optionalOptions() {
              Set<ConfigOption<?>> options = new HashSet<>();
              options.add(ROWS_PER_SECOND);
              options.add(SPLIT_STR);
              return options;
          }
      
          
          private String validateFieldExpression(ConfigOption<String> field, Configuration options) {
              String fieldExpression = options.get(field);
              if (fieldExpression == null) {
                  throw new ValidationException(
                          "Every column needs a corresponding expression. No expression found for "
                                  + field.key()
                                  + ".");
              }
              return fieldExpression;
          }
      }
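
      As an aside (my addition, not from the original article), the official User-defined Sources & Sinks documentation also provides FactoryUtil.createTableFactoryHelper for option validation. The sketch below shows how createDynamicTableSource could be written with it instead of the manual checks above; because this connector uses dynamic fields.<name>.expression keys, unknown-key checking is relaxed with validateExcept. It additionally requires an import of org.apache.flink.configuration.ReadableConfig.

          @Override
          public DynamicTableSource createDynamicTableSource(Context context) {
              final FactoryUtil.TableFactoryHelper helper =
                      FactoryUtil.createTableFactoryHelper(this, context);
              // Validate rows-per-second / split-str, but tolerate the dynamic fields.* keys
              helper.validateExcept(FIELDS + ".");
              final ReadableConfig options = helper.getOptions();
      
              TableSchema schema =
                      TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
              String[] fieldExpressions = new String[schema.getFieldCount()];
              for (int i = 0; i < fieldExpressions.length; i++) {
                  String fieldName = schema.getFieldName(i).get();
                  String expression =
                          options.get(
                                  key(FIELDS + "." + fieldName + "." + EXPRESSION)
                                          .stringType()
                                          .noDefaultValue());
                  if (expression == null) {
                      throw new ValidationException(
                              "Every column needs a corresponding expression. No expression found for "
                                      + fieldName + ".");
                  }
                  fieldExpressions[i] = expression;
              }
              return new XiaolongTableSource(
                      fieldExpressions, options.get(ROWS_PER_SECOND), options.get(SPLIT_STR));
          }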
      
      
    2. Implement a DynamicTableSource subclass (here a ScanTableSource)

      package com.github.knaufk.flink.test;
      
      import org.apache.flink.table.connector.ChangelogMode;
      import org.apache.flink.table.connector.source.DynamicTableSource;
      import org.apache.flink.table.connector.source.ScanTableSource;
      import org.apache.flink.table.connector.source.SourceFunctionProvider;
      
      
      public class XiaolongTableSource implements ScanTableSource {
      
          private String[] fieldExpression;
      
          private long numsPerSec;
      
          private String split;
      
          public XiaolongTableSource(String[] fieldExpression, long numsPerSec, String split) {
              this.fieldExpression = fieldExpression;
              this.numsPerSec = numsPerSec;
              this.split = split;
          }
      
      
          @Override
          public ChangelogMode getChangelogMode() {
              // This source is append-only: it emits INSERT rows only
              return ChangelogMode.insertOnly();
          }
      
          @Override
          public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
              // The second argument marks the source as unbounded (a continuous stream)
              return SourceFunctionProvider.of(new XiaolongSource(fieldExpression, numsPerSec, split), false);
          }
      
          @Override
          public DynamicTableSource copy() {
              return new XiaolongTableSource(fieldExpression, numsPerSec, split);
          }
      
          @Override
          public String asSummaryString() {
              return "XiaolongTableSource";
          }
      }
      
      
    3. Implement the SourceFunction

      You could extend a different SourceFunction variant and additionally implement CheckpointedFunction to achieve exactly-once semantics; for simplicity, this example only extends RichSourceFunction (a minimal sketch of the checkpointing hooks follows the code below).

      package com.github.knaufk.flink.test;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
      import org.apache.flink.table.data.GenericRowData;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.data.StringData;
      
      import java.util.Random;
      
      public class XiaolongSource extends RichSourceFunction<RowData> {
      
          private String[] fieldExpressions;
      
          private long numsPerSec;
      
          private volatile boolean cancelled;
      
          private String split;
      
          public XiaolongSource(String[] fieldExpressions, long numsPerSec, String split) {
              this.fieldExpressions = fieldExpressions;
              this.numsPerSec = numsPerSec;
              this.split = split;
          }
      
          @Override
          public void run(SourceContext<RowData> sourceContext) throws Exception {
              // Emit numsPerSec rows, then sleep for one second; repeat until cancelled
              while (!cancelled) {
                  for (long i = 0; i < numsPerSec; i++) {
                      RowData rowData = generateRow();
                      sourceContext.collect(rowData);
                  }
                  Thread.sleep(1000);
              }
          }
      
          @Override
          public void cancel() {
              cancelled = true;
          }
      
          @Override
          public void open(Configuration parameters) throws Exception {
              super.open(parameters);
          }
      
      
          private RowData generateRow() {
              GenericRowData row = new GenericRowData(fieldExpressions.length);
              Random random = new Random();
              for (int i = 0; i < fieldExpressions.length; i++) {
                  // Split the column's expression by the configured delimiter and pick one value at random
                  String[] candidates = fieldExpressions[i].split(split);
                  String value = candidates[random.nextInt(candidates.length)];
                  row.setField(i, StringData.fromString(value));
              }
              return row;
          }
      }
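
      For completeness, here is a minimal sketch (my addition, not from the original article) of the CheckpointedFunction hooks mentioned above. The class name CheckpointedRowSource and the state name emitted-count are illustrative assumptions; for true exactly-once behavior, emitting records and updating the counter should also happen under the checkpoint lock obtained from sourceContext.getCheckpointLock().

      package com.github.knaufk.flink.test;
      
      import org.apache.flink.api.common.state.ListState;
      import org.apache.flink.api.common.state.ListStateDescriptor;
      import org.apache.flink.runtime.state.FunctionInitializationContext;
      import org.apache.flink.runtime.state.FunctionSnapshotContext;
      import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
      import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
      import org.apache.flink.table.data.RowData;
      
      // Illustrative base class: keeps the number of emitted rows in operator state so a
      // concrete source can resume from the last checkpoint after a failure.
      public abstract class CheckpointedRowSource extends RichSourceFunction<RowData>
              implements CheckpointedFunction {
      
          private transient ListState<Long> checkpointedCount;
          protected long emittedCount; // restored from state on recovery
      
          @Override
          public void initializeState(FunctionInitializationContext context) throws Exception {
              checkpointedCount =
                      context.getOperatorStateStore()
                              .getListState(new ListStateDescriptor<>("emitted-count", Long.class));
              if (context.isRestored()) {
                  for (Long count : checkpointedCount.get()) {
                      emittedCount = count;
                  }
              }
          }
      
          @Override
          public void snapshotState(FunctionSnapshotContext context) throws Exception {
              checkpointedCount.clear();
              checkpointedCount.add(emittedCount);
          }
      }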
      
    4. SPI configuration file

      Finally, register the custom source connector by adding its fully qualified class name to META-INF/services/org.apache.flink.table.factories.Factory:

      com.github.knaufk.flink.test.XiaolongTableSourceFactory
      
    5. Package the project as a jar and copy it into Flink's lib directory

    III. Environment Setup

    Zeppelin is used as the test environment here. For the detailed setup steps, see: https://www.jianshu.com/p/091d2490a969

    IV. Testing

    1. Declare the table
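
      The original article shows the declaration as a screenshot. Below is a hedged reconstruction of what a matching declaration could look like, run here through the Java Table API (in Zeppelin the same DDL goes into a Flink SQL paragraph). The table name random_colors and the column names are illustrative.

      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      
      // Hypothetical test driver: declares a table backed by the 'xiaolong' connector.
      public class DeclareTableExample {
          public static void main(String[] args) {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
      
              tEnv.executeSql(
                      "CREATE TABLE random_colors (\n"
                              + "  color STRING,\n"
                              + "  shape STRING\n"
                              + ") WITH (\n"
                              + "  'connector' = 'xiaolong',\n"
                              + "  'rows-per-second' = '5',\n"
                              + "  'split-str' = ',',\n"
                              + "  'fields.color.expression' = 'red,green,blue',\n"
                              + "  'fields.shape.expression' = 'circle,square,triangle'\n"
                              + ")");
          }
      }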


    2. Run the query; streaming rows are produced continuously
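
      Again a hedged sketch (not from the article, which shows this step only as a screenshot): continuing the hypothetical driver above, running a query against the table keeps producing random rows until the job is cancelled.

              // Continuing DeclareTableExample: print the unbounded query result to stdout
              tEnv.executeSql("SELECT color, shape FROM random_colors").print();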

