
FlinkSQL Metadata Validation

Author: 忍者絮 | Published 2020-11-24 20:48

    Since Flink 1.9, a CatalogManager has been introduced to manage Catalogs and CatalogBaseTables. When a DDL statement is executed, the table information is packaged as a CatalogBaseTable and stored in the CatalogManager. Flink also extends Calcite's Schema interface so that Calcite can read the table information held by the CatalogManager during the validate phase.
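
    As a concrete check, here is a minimal sketch, assuming Flink 1.11-era dependencies (the table name t and the datagen connector are illustrative choices so the snippet runs without external systems): it executes a DDL statement and then reads the stored table back through the default catalog.

        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.catalog.CatalogBaseTable;
        import org.apache.flink.table.catalog.ObjectPath;

        public class CatalogLookupDemo {
            public static void main(String[] args) throws Exception {
                TableEnvironment tEnv = TableEnvironment.create(
                        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
                // executeSql parses the DDL and stores the table metadata
                // as a CatalogBaseTable in the CatalogManager
                tEnv.executeSql(
                        "CREATE TABLE t (id BIGINT, proctime AS PROCTIME()) " +
                        "WITH ('connector' = 'datagen')");
                // read the stored metadata back through the default catalog
                CatalogBaseTable table = tEnv.getCatalog("default_catalog").get()
                        .getTable(new ObjectPath("default_database", "t"));
                System.out.println(table.getSchema());
            }
        }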

    Writing the CatalogTable

    By executing the DDL statement below, we can trace how the Blink planner parses it and stores the result in the CatalogManager, focusing on how the computed column **proctime AS PROCTIME()** is parsed.

    CREATE TABLE user_address (
         userId BIGINT,
         addressInfo VARCHAR,
         proctime AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'topic' = 'tp02',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
     )
    

    The call chain that executes createCatalogTable:

    org.apache.flink.table.api.internal.TableEnvironmentImpl#executeSql
        org.apache.flink.table.planner.delegation.ParserImpl#parse
            org.apache.flink.table.planner.operations.SqlToOperationConverter#convert
                org.apache.flink.table.planner.operations.SqlCreateTableConverter#convertCreateTable
                    // parse a CatalogTable out of the SqlCreateTable statement
                    org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
    

    The TableSchema, partition keys, primary key, comment, and other information are extracted from the SqlCreateTable to build a CatalogTableImpl.

        private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
            final TableSchema sourceTableSchema;
            final List<String> sourcePartitionKeys;
            final List<SqlTableLike.SqlTableLikeOption> likeOptions;
            final Map<String, String> sourceProperties;
            // handle CREATE TABLE ... LIKE
            if (sqlCreateTable.getTableLike().isPresent()) {
                SqlTableLike sqlTableLike = sqlCreateTable.getTableLike().get();
                CatalogTable table = lookupLikeSourceTable(sqlTableLike);
                sourceTableSchema = table.getSchema();
                sourcePartitionKeys = table.getPartitionKeys();
                likeOptions = sqlTableLike.getOptions();
                sourceProperties = table.getProperties();
            } else {
                sourceTableSchema = TableSchema.builder().build();
                sourcePartitionKeys = Collections.emptyList();
                likeOptions = Collections.emptyList();
                sourceProperties = Collections.emptyMap();
            }
            // process the SqlTableLike options: INCLUDING ALL, OVERWRITING OPTIONS, EXCLUDING PARTITIONS, etc.
            Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> mergingStrategies =
                mergeTableLikeUtil.computeMergingStrategies(likeOptions);
    
            Map<String, String> mergedOptions = mergeOptions(sqlCreateTable, sourceProperties, mergingStrategies);
            // extract the primary key
            Optional<SqlTableConstraint> primaryKey = sqlCreateTable.getFullConstraints()
                .stream()
                .filter(SqlTableConstraint::isPrimaryKey)
                .findAny();
    
            // build the merged TableSchema
            TableSchema mergedSchema = mergeTableLikeUtil.mergeTables(
                mergingStrategies,
                sourceTableSchema,  // for plain CREATE TABLE (no LIKE), this is an empty schema
                sqlCreateTable.getColumnList().getList(),
                sqlCreateTable.getWatermark().map(Collections::singletonList).orElseGet(Collections::emptyList),
                primaryKey.orElse(null)
            );
    
            // partition keys
            List<String> partitionKeys = mergePartitions(
                sourcePartitionKeys,
                sqlCreateTable.getPartitionKeyList(),
                mergingStrategies
            );
            verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
            // table comment
            String tableComment = sqlCreateTable.getComment()
                .map(comment -> comment.getNlsString().getValue())
                .orElse(null);
    
            return new CatalogTableImpl(mergedSchema,
                partitionKeys,
                mergedOptions,
                tableComment);
        }
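
    As an aside on the LIKE branch above, a hedged sketch (reusing the tEnv from the earlier demo and the kafka-backed user_address table, so it assumes the kafka connector is on the classpath; the derived table name is illustrative): INCLUDING ALL copies the source schema, options, and partitions, while OVERWRITING OPTIONS lets the new WITH clause override individual options.

        tEnv.executeSql(
            "CREATE TABLE user_address_copy (" +
            "  remark STRING" +                 // extra physical column
            ") WITH (" +
            "  'topic' = 'tp03'" +              // overrides the source topic
            ") LIKE user_address (" +
            "  INCLUDING ALL" +
            "  OVERWRITING OPTIONS" +
            ")");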
    

    When the TableSchema is extracted, the Calcite column types are converted into Flink's internal data types. If a computed column such as PROCTIME() is present, the expression is validated; the FlinkSqlOperatorTable class registers all of FlinkSQL's built-in functions.

        /**
         * Function used to access a processing time attribute.
         */
        public static final SqlFunction PROCTIME =
            new CalciteSqlFunction(
                "PROCTIME",
                SqlKind.OTHER_FUNCTION,
                PROCTIME_TYPE_INFERENCE,
                null,
                OperandTypes.NILADIC,
                SqlFunctionCategory.TIMEDATE,
                false
            );
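
    In this definition, OperandTypes.NILADIC declares that PROCTIME takes no arguments, and PROCTIME_TYPE_INFERENCE supplies the return type: the processing-time indicator type (the TimeIndicatorRelDataType seen below during validation).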
    

    The TableColumn generation process:

    • Physical (non-computed) columns are type-converted and stored in the physicalFieldNamesToTypes map.
    • Computed columns generated from expressions are validated for existence, returning the function's RelDataType.
    • Each field's RelDataType is converted to a LogicalType and then to a DataType, from which the TableColumn is built. The differences between the two type systems deserve a closer look.
        private void appendDerivedColumns(
                    Map<FeatureOption, MergingStrategy> mergingStrategies,
                    List<SqlNode> derivedColumns) {
            // convert the physical (non-computed) columns and store them in physicalFieldNamesToTypes
                collectPhysicalFieldsTypes(derivedColumns);
    
                for (SqlNode derivedColumn : derivedColumns) {
                    final SqlTableColumn tableColumn = (SqlTableColumn) derivedColumn;
                    final TableColumn column;
                    if (tableColumn.isGenerated()) {
                        String fieldName = tableColumn.getName().toString();
                        // validate the expression, e.g. check that PROCTIME() is registered in FlinkSqlOperatorTable
                        SqlNode validatedExpr = sqlValidator.validateParameterizedExpression(
                            tableColumn.getExpr().get(),
                            physicalFieldNamesToTypes);
                
                        // the validated return type: for PROCTIME() this is Flink's TimeIndicatorRelDataType extension
                        final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
                        column = TableColumn.of(
                            fieldName,
                            // RelDataType -> LogicalType -> DataType
                            fromLogicalToDataType(toLogicalType(validatedType)),
                            escapeExpressions.apply(validatedExpr));
    
                        computedFieldNamesToTypes.put(fieldName, validatedType);
                    } else {
                        // convert the physical column to Flink's internal data type
                        String name = tableColumn.getName().getSimple();
                        // RelDataType -> LogicalType -> DataType
                        LogicalType logicalType = FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name));
                        column = TableColumn.of(name, TypeConversions.fromLogicalToDataType(logicalType));
                    }
                    columns.put(column.getName(), column);
                }
            }
    

    For the proctime computed column, the type to look at is the one bound when the PROCTIME function is defined (the FlinkSqlOperatorTable snippet above).
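
    As a quick check, this sketch builds that type by hand: on the Flink side, PROCTIME() ultimately resolves to a TIMESTAMP(3) whose TimestampKind is PROCTIME, using the same conversion seen in appendDerivedColumns.

        import org.apache.flink.table.types.DataType;
        import org.apache.flink.table.types.logical.TimestampKind;
        import org.apache.flink.table.types.logical.TimestampType;
        import org.apache.flink.table.types.utils.TypeConversions;

        public class ProctimeTypeDemo {
            public static void main(String[] args) {
                // a nullable TIMESTAMP(3) carrying the processing-time attribute flag
                TimestampType proctimeType = new TimestampType(true, TimestampKind.PROCTIME, 3);
                // LogicalType -> DataType, as in appendDerivedColumns
                DataType dataType = TypeConversions.fromLogicalToDataType(proctimeType);
                System.out.println(dataType); // expected: TIMESTAMP(3) *PROCTIME*
            }
        }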


    Reading Metadata During Validation

    FlinkSchema has three subclasses: CatalogManagerCalciteSchema, CatalogCalciteSchema, and DatabaseCalciteSchema. When Calcite runs validation, the overridden getSubSchema method is called to resolve the Catalog and then the Database, and the Table information is ultimately fetched from the catalogManager.
    The following test case uses a custom Schema to observe how Calcite reads table schema information:

    import com.google.common.collect.Maps;
    import org.apache.calcite.linq4j.tree.Expression;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
    import org.apache.calcite.rel.type.RelProtoDataType;
    import org.apache.calcite.schema.*;
    import org.apache.calcite.schema.impl.AbstractTable;
    import org.apache.calcite.sql.type.BasicSqlType;
    import org.apache.calcite.sql.type.SqlTypeName;

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;

    // this schema exposes two tables, USERS and JOBS
    public class CatalogManagerCalciteSchema implements Schema {
        static Map<String, Table> TABLES = Maps.newHashMap();
    
        static {
            TABLES.put("USERS", new AbstractTable() { //note: add a table
                @Override
                public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
                    RelDataTypeFactory.Builder builder = typeFactory.builder();
                    builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.INTEGER));
                    builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.CHAR));
                    builder.add("AGE", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.INTEGER));
                    return builder.build();
                }
            });
    
            TABLES.put("JOBS", new AbstractTable() {
                @Override
                public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
                    RelDataTypeFactory.Builder builder = typeFactory.builder();
    
                    builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.INTEGER));
                    builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.CHAR));
                    builder.add("COMPANY", new BasicSqlType(new RelDataTypeSystemImpl() {
                    }, SqlTypeName.CHAR));
                    return builder.build();
                }
            });
        }
    
        @Override
        public Table getTable(String name) {
            return TABLES.get(name);
        }
    
        @Override
        public Set<String> getTableNames() {
            return TABLES.keySet();
        }
    
        @Override
        public RelProtoDataType getType(String name) {
            return null;
        }
    
        @Override
        public Set<String> getTypeNames() {
            return Collections.emptySet();
        }
    
        @Override
        public Collection<Function> getFunctions(String name) {
            return Collections.emptyList();
        }
    
        @Override
        public Set<String> getFunctionNames() {
            return Collections.emptySet();
        }
    
        @Override
        public Schema getSubSchema(String name) {
            return null;
        }
    
        @Override
        public Set<String> getSubSchemaNames() {
            return Collections.emptySet();
        }
    
        @Override
        public Expression getExpression(SchemaPlus parentSchema, String name) {
            return null;
        }
    
        @Override
        public boolean isMutable() {
            return false;
        }
    
        @Override
        public Schema snapshot(SchemaVersion version) {
            return this;
        }
    }
    
    
        public static void main(String[] args) throws SqlParseException {
        // this CatalogManagerCalciteSchema is the custom class above, not Flink's internal one
            CalciteSchema rootSchema =
                    CalciteSchemaBuilder.asRootSchema(new CatalogManagerCalciteSchema());
            SchemaPlus schemaPlus = rootSchema.plus();
     
            SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
        // create a CalciteCatalogReader that reads metadata from the root CalciteSchema during validation and rel conversion
            CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader(
                    CalciteSchema.from(schemaPlus),
                    CalciteSchema.from(schemaPlus).path(null),
                    factory,
                    new CalciteConnectionConfigImpl(new Properties()));
                  
            String sql = "select u.id as user_id, u.name as user_name, j.company as user_company, u.age as user_age \n"
                    + "from users u join jobs j on u.name=j.name";
    
            SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
            SqlNode sqlNode = parser.parseStmt();
    
            SqlValidator
                    validator = SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), calciteCatalogReader, factory,
                    SqlConformanceEnum.DEFAULT);
            SqlNode validateSqlNode = validator.validate(sqlNode);
            System.out.println(validateSqlNode);
        }
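
    Note that SqlParser.Config.DEFAULT upper-cases unquoted identifiers, which is why the query can reference users and jobs in lower case while the tables are registered as USERS and JOBS. Running main prints the validated (rewritten) SELECT statement.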
    

    The call chain by which Calcite's validation visits CatalogManagerCalciteSchema: when getSubSchema returns null, there is no sub-schema at that level, so table information is read from the current schema.


    Flink defines a three-level schema and calls getSubSchema to read the Catalog, Database, and Table from the CatalogManager. For the concrete call path, see org.apache.calcite.sql.validate.EmptyScope#resolve_.

    • CatalogManagerCalciteSchema#getSubSchema: uses the catalog part of the qualified table name to look up a CatalogCalciteSchema from the catalogManager.
        @Override
        public Schema getSubSchema(String name) {
            if (catalogManager.schemaExists(name)) {
                return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
            } else {
                return null;
            }
        }
    
    • CatalogCalciteSchema#getSubSchema: uses the database part of the qualified table name to look up a DatabaseCalciteSchema from the catalogManager.
        /**
         * Look up a sub-schema (database) by the given sub-schema name.
         *
         * @param schemaName name of sub-schema to look up
         * @return the sub-schema with a given database name, or null
         */
        @Override
        public Schema getSubSchema(String schemaName) {
            if (catalogManager.schemaExists(catalogName, schemaName)) {
                return new DatabaseCalciteSchema(schemaName, catalogName, catalogManager, isStreamingMode);
            } else {
                return null;
            }
        }
    
    • DatabaseCalciteSchema has no sub-schema; the Table information is fetched directly from the current schema.
        public Table getTable(String tableName) {
            ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
            return catalogManager.getTable(identifier)
                .map(result -> {
                    CatalogBaseTable table = result.getTable();
                    FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier);
                    return new CatalogSchemaTable(
                        identifier,
                        result,
                        statistic,
                        catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new),
                        isStreamingMode);
                })
                .orElse(null);
        }
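
    Putting the three levels together: for a fully qualified name such as default_catalog.default_database.user_address, resolution proceeds root schema -> getSubSchema("default_catalog") -> getSubSchema("default_database") -> getTable("user_address"), and the returned CatalogSchemaTable wraps the CatalogBaseTable that the DDL stored in the CatalogManager.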
    

    Proctime Field Validation

    When FlinkSQL reads the table schema during validation, it converts the computed rowtime/proctime column types into RelDataTypes that Calcite can recognize. First, the computed-column type-conversion code:

    ## CatalogManager
    public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
            CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
            if (temporaryTable != null) {
                TableSchema resolvedSchema = resolveTableSchema(temporaryTable);
                return Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
            } else {
                return getPermanentTable(objectIdentifier);
            }
        }
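
    The resolveTableSchema call above is what triggers the CatalogTableSchemaResolver#resolve method shown next.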
    
    ## org.apache.flink.table.api.internal.CatalogTableSchemaResolver#resolve
    
        /**
         * Resolve the computed column's type for the given schema.
         *
         * @param tableSchema Table schema to derive table field names and data types
         * @return the resolved TableSchema
         */
        public TableSchema resolve(TableSchema tableSchema) {
        final String rowtime;  // assigned from the table's watermark spec, or null if none (assignment elided in this excerpt)
            String[] fieldNames = tableSchema.getFieldNames();
            DataType[] fieldTypes = tableSchema.getFieldDataTypes();
    
            TableSchema.Builder builder = TableSchema.builder();
            for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
                TableColumn tableColumn = tableSchema.getTableColumns().get(i);
                DataType fieldType = fieldTypes[i];
    
                if (tableColumn.isGenerated()) {
                // derive the DataType by resolving the computed column's expression
                    fieldType = resolveExpressionDataType(tableColumn.getExpr().get(), tableSchema);
                    if (isProctime(fieldType)) {
                        if (fieldNames[i].equals(rowtime)) {
                            throw new TableException("Watermark can not be defined for a processing time attribute column.");
                        }
                    }
                }
                    ......
    
                if (tableColumn.isGenerated()) {
                    builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
                } else {
                    builder.field(fieldNames[i], fieldType);
                }
            }
            tableSchema.getWatermarkSpecs().forEach(builder::watermark);
            tableSchema.getPrimaryKey().ifPresent(
                    pk -> builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0])));
            return builder.build();
        }
    
        # org.apache.flink.table.api.internal.CatalogTableSchemaResolver#resolveExpressionDataType
      
      private DataType resolveExpressionDataType(String expr, TableSchema tableSchema) {
            ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
            if (resolvedExpr == null) {
                throw new ValidationException("Could not resolve field expression: " + expr);
            }
            return resolvedExpr.getOutputDataType();
        }
      
      # org.apache.flink.table.planner.delegation.ParserImpl#parseSqlExpression
      
      public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
            SqlExprToRexConverter sqlExprToRexConverter = sqlExprToRexConverterCreator.apply(inputSchema);
            RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression);
            //    [[RelDataType]]  ---->  [[LogicalType]]
            LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
            return new RexNodeExpression(rexNode, TypeConversions.fromLogicalToDataType(logicalType));
        }
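
    convertToRexNode in turn lands in the array variant convertToRexNodes, which does the actual work: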
      
      # org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl#convertToRexNodes
      
      public RexNode[] convertToRexNodes(String[] exprs) {
        // obtain RexNodes by building a query over a temporary table
            String query = String.format(QUERY_FORMAT, String.join(",", exprs));
            SqlNode parsed = planner.parser().parse(query);
            SqlNode validated = planner.validate(parsed);
        // convert the validated SqlNode to a RelNode
            RelNode rel = planner.rel(validated).rel;
            // The plan should in the following tree
            // LogicalProject
            // +- TableScan
            if (rel instanceof LogicalProject
                && rel.getInput(0) != null
                && rel.getInput(0) instanceof TableScan) {
                return ((LogicalProject) rel).getProjects().toArray(new RexNode[0]);
            } else {
                throw new IllegalStateException("The root RelNode should be LogicalProject, but is " + rel.toString());
            }
        }
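
    QUERY_FORMAT is a "SELECT %s FROM <temporary table>" template: the computed-column expressions are spliced into the projection of a query over a temporary table registered from the input TableSchema, which is why the resulting plan must be exactly a LogicalProject over a TableScan.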
    
      
    

    The rough flow of PROCTIME() type extraction (a runnable check follows the list):

    1. If the table contains computed columns, the TableSchema from the DDL statement is registered as a temporary table, e.g. temp_table.
    2. From each computed column's expression, e.g. PROCTIME(), a temporary query such as select PROCTIME() from temp_table is built (PROCTIME() is a Flink built-in function).
    3. The query is validated and converted to a RelNode, from which the row expressions (RexNodes) are extracted.
    4. The RelDataType corresponding to PROCTIME() is extracted from the RexNode and finally converted to a DataType.
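
    A hedged end-to-end check of this flow, reusing the tEnv and the datagen table t from the first sketch: validating a query that calls PROCTIME() surfaces the same proctime type in the result schema.

        // PROCTIME() is resolved against FlinkSqlOperatorTable during validation;
        // the result column is expected to carry the TIMESTAMP(3) *PROCTIME* type
        org.apache.flink.table.api.Table result = tEnv.sqlQuery("SELECT PROCTIME() AS p FROM t");
        System.out.println(result.getSchema());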

    The computed-column handling before [FLINK-18378] is shown below; it treated proctime and rowtime as separate cases.
    Question: the DDL phase has already converted proctime to a DataType, so validation could seemingly take fieldType directly when reading the table schema. Why parse the expression again?

        for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
            TableColumn tableColumn = tableSchema.getTableColumns().get(i);
            DataType fieldType = fieldTypes[i];
            if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
                if (fieldNames[i].equals(rowtime)) {
                    throw new TableException("Watermark can not be defined for a processing time attribute column.");
                }
                // re-mark the computed column's type as a processing-time attribute
                TimestampType originalType = (TimestampType) fieldType.getLogicalType();
                LogicalType proctimeType = new TimestampType(
                        originalType.isNullable(),
                        TimestampKind.PROCTIME,
                        originalType.getPrecision());
                fieldType = TypeConversions.fromLogicalToDataType(proctimeType);
            } else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
                // re-mark the watermark column's type as a rowtime attribute
                TimestampType originalType = (TimestampType) fieldType.getLogicalType();
                LogicalType rowtimeType = new TimestampType(
                        originalType.isNullable(),
                        TimestampKind.ROWTIME,
                        originalType.getPrecision());
                fieldType = TypeConversions.fromLogicalToDataType(rowtimeType);
            }
            if (tableColumn.isGenerated()) {
                builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
            } else {
                builder.field(fieldNames[i], fieldType);
            }
        }
    
