Flink Source Code: SQL Execution Flow


Author: AlienPaul | Published 2021-08-03 09:45

    Flink Source Code Analysis Series: Table of Contents

    See: Flink Source Code Analysis Series: Table of Contents

    Preface

    This post analyzes how Flink executes SQL. The execution can be summarized in the following steps:

    • Parsing. Calcite's parser parses the SQL text into a syntax tree (SqlNode).
    • Validation. The SQL is checked for validity against the validation rules, and the syntax tree is rewritten where necessary.
    • Conversion. The SqlNode is converted into a relational tree, which is then wrapped in a Flink Operation.
    • Execution. The Operation produced in the previous step is translated into an execution plan.

    Entry Point

    TableEnvironmentImpl.sqlQuery is the entry point for SQL execution. It accepts a SQL statement as a string and returns a Table object, which can be used for further SQL queries or transformations.

    @Override
    public Table sqlQuery(String query) {
        // parse the SQL query with the parser
        List<Operation> operations = parser.parse(query);
    
        // more than one operation means multiple SQL statements were given, which is not supported here
        if (operations.size() != 1) {
            throw new ValidationException(
                "Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
        }
    
        // take the single parsed operation
        Operation operation = operations.get(0);
    
        // check the statement type: only query statements are supported
        if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
            // build a Table object from the operation
            return createTable((QueryOperation) operation);
        } else {
            throw new ValidationException(
                "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
                    "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
        }
    }
    

    Parsing

    Let's expand on the parser.parse(query) call above. parser is actually a ParserImpl, which encapsulates the SQL parsing logic.

    ParserImpl's parse method:

    @Override
    public List<Operation> parse(String statement) {
        // obtain the Calcite parser
        CalciteParser parser = calciteParserSupplier.get();
        // FlinkPlannerImpl serves as the validator
        FlinkPlannerImpl planner = validatorSupplier.get();
        
        // some special statements, e.g. SET key=value, are not supported by CalciteParser
        // to avoid introducing too many keywords into Calcite, a set of extended parsers
        // handles these special statements before CalciteParser gets a chance
        Optional<Operation> command = EXTENDED_PARSER.parse(statement);
        if (command.isPresent()) {
            return Collections.singletonList(command.get());
        }
        
        
        // parse the SQL statement into a syntax tree
        SqlNode parsed = parser.parse(statement);
    
        // convert the parsed syntax tree into an Operation
        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
            .orElseThrow(() -> new TableException(
                "Unsupported SQL query! parse() only accepts SQL queries of type " +
                    "SELECT, UNION, INTERSECT, EXCEPT, VALUES, ORDER_BY or INSERT;" +
                    "and SQL DDLs of type " +
                    "CREATE TABLE"));
        return Collections.singletonList(operation);
    }
    

    ExtendedParser

    ExtendedParser lets Flink SQL support additional, specialized syntax without complicating CalciteParser (no Calcite modifications or new keywords are needed).

    ExtendedParser contains the following parse strategies:

    private static final List<ExtendedParseStrategy> PARSE_STRATEGIES =
            Arrays.asList(
                    ClearOperationParseStrategy.INSTANCE,
                    HelpOperationParseStrategy.INSTANCE,
                    QuitOperationParseStrategy.INSTANCE,
                    ResetOperationParseStrategy.INSTANCE,
                    SetOperationParseStrategy.INSTANCE);
    

    These five strategies correspond to:

    • The CLEAR statement, which clears the output
    • The HELP statement, which prints help information
    • The EXIT or QUIT statement, which exits the execution environment
    • The RESET statement, which resets a variable to its default value
    • The SET statement, which sets the value of a variable
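    Conceptually, ExtendedParser just walks this strategy list and lets the first matching strategy convert the statement. The following is a minimal, self-contained sketch of that dispatch; the class and strategy names here are simplified stand-ins rather than the real Flink types, and operations are reduced to plain strings:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

// Simplified sketch of ExtendedParser's first-match-wins dispatch.
// The real strategies return Flink Operation instances; strings stand in here.
public class ExtendedParserSketch {

    interface ParseStrategy {
        boolean match(String statement);
        String convert(String statement); // stands in for Operation
    }

    // A regex-backed strategy, mirroring AbstractRegexParseStrategy
    static ParseStrategy regexStrategy(String regex, String name) {
        Pattern p = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
        return new ParseStrategy() {
            public boolean match(String s) { return p.matcher(s.trim()).matches(); }
            public String convert(String s) { return name; }
        };
    }

    static final List<ParseStrategy> STRATEGIES = Arrays.asList(
        regexStrategy("CLEAR;?", "ClearOperation"),
        regexStrategy("HELP;?", "HelpOperation"),
        regexStrategy("(EXIT|QUIT);?", "QuitOperation"));

    // Try each strategy in order; the first one that matches wins.
    static Optional<String> parse(String statement) {
        for (ParseStrategy s : STRATEGIES) {
            if (s.match(statement)) {
                return Optional.of(s.convert(statement));
            }
        }
        return Optional.empty(); // fall through to the Calcite parser
    }
}
```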

    Below we take the relatively complex SetOperationParseStrategy as an example of how Flink parses a SET statement into an Operation.

    SetOperationParseStrategy extends AbstractRegexParseStrategy, which contains the logic for matching a statement against a regular expression.

    // default pattern flags: case-insensitive, and '.' also matches line terminators
    protected static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
    
    // if a statement matches this pattern, this strategy is used to parse it
    protected Pattern pattern;
    
    protected AbstractRegexParseStrategy(Pattern pattern) {
        this.pattern = pattern;
    }
    
    // checks whether a statement matches the pattern
    @Override
    public boolean match(String statement) {
        return pattern.matcher(statement.trim()).matches();
    }
    

    Next we analyze SetOperationParseStrategy. The pattern for the SET statement is as follows:

    protected SetOperationParseStrategy() {
        super(
                Pattern.compile(
                        "SET(\\s+(?<key>[^'\\s]+)\\s*=\\s*('(?<quotedVal>[^']*)'|(?<val>\\S+)))?",
                        DEFAULT_PATTERN_FLAGS));
    }
    

    From the regular expression we can see that a SET statement has the form SET key='quotedVal' or SET key=val.
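    We can check this behavior directly against the same regular expression. The sketch below copies the pattern verbatim from SetOperationParseStrategy into a standalone helper; the SetPatternDemo class and its parse method are illustrative, not part of Flink:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone check of the SET statement regex from SetOperationParseStrategy.
public class SetPatternDemo {

    static final Pattern SET_PATTERN = Pattern.compile(
        "SET(\\s+(?<key>[^'\\s]+)\\s*=\\s*('(?<quotedVal>[^']*)'|(?<val>\\S+)))?",
        Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    // Returns "key=value" if the statement carries a key/value pair,
    // an empty string for a bare SET, and an empty Optional for non-SET input.
    static Optional<String> parse(String statement) {
        Matcher m = SET_PATTERN.matcher(statement.trim());
        if (!m.matches()) {
            return Optional.empty();
        }
        if (m.group("key") == null) {
            return Optional.of(""); // bare "SET": no key/value pair
        }
        String value = m.group("quotedVal") != null ? m.group("quotedVal") : m.group("val");
        return Optional.of(m.group("key") + "=" + value);
    }
}
```

    Note that the quotes around quotedVal are stripped by the named group itself, which is why the real convert method can use the group contents directly.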

    The conversion of a SET statement into an Operation happens in the convert method:

    @Override
    public Operation convert(String statement) {
        // match the statement against the pattern
        Matcher matcher = pattern.matcher(statement.trim());
        // list holding the extracted operands
        final List<String> operands = new ArrayList<>();
        if (matcher.find()) {
            if (matcher.group("key") != null) {
                // extract the key and its quotedVal or val, and add them to the operand list
                operands.add(matcher.group("key"));
                operands.add(
                        matcher.group("quotedVal") != null
                                ? matcher.group("quotedVal")
                                : matcher.group("val"));
            }
        }
    
        // only the bare word SET was given: operands is empty, create an empty SetOperation
        if (operands.isEmpty()) {
            return new SetOperation();
        } else if (operands.size() == 2) {
            // two operands means a key and a val (or quotedVal) were parsed: create a SetOperation with them
            return new SetOperation(operands.get(0), operands.get(1));
        } else {
            // any other case is impossible; throw an exception
            throw new TableException(
                    String.format(
                            "Failed to convert the statement to SET operation: %s.", statement));
        }
    }
    

    CalciteParser

    Standard SQL statements are not handled by the ExtendedParser from the previous section; parsing them is the job of CalciteParser.

    public SqlNode parse(String sql) {
        try {
            // create the SQL parser
            SqlParser parser = SqlParser.create(sql, config);
            // parse the statement
            return parser.parseStmt();
        } catch (SqlParseException e) {
            throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);
        }
    }
    

    The parseStmt method parses the SQL into a syntax tree. For example, select * from demo is parsed into a SqlSelect object.

    Following parser.parseStmt down the call chain (all of these methods live in Calcite core):

    • parseStmt
    • parseQuery
    • parseSqlStmtEof

    At the bottom, a parser generated by JavaCC does the actual SQL parsing. That process is quite involved and is not analyzed here.

    Validation and Conversion

    SqlToOperationConverter.convert validates the SQL statement and converts it into an Operation.

    public static Optional<Operation> convert(
            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
        // validate the parsed SQL syntax tree
        final SqlNode validated = flinkPlanner.validate(sqlNode);
        
        // create the SqlToOperationConverter, which is responsible for the conversion
        SqlToOperationConverter converter =
                new SqlToOperationConverter(flinkPlanner, catalogManager);
        
        // dispatch on the type of the SqlNode to the appropriate conversion logic
        if (validated instanceof SqlCreateCatalog) {
            return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
        } else if (validated instanceof SqlDropCatalog) {
            return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated));
        } else if (validated instanceof SqlLoadModule) {
            return Optional.of(converter.convertLoadModule((SqlLoadModule) validated));
        } else if (validated instanceof SqlShowCatalogs) {
            return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated));
        } else if (validated instanceof SqlShowCurrentCatalog) {
            return Optional.of(
                    converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));
        } else if (validated instanceof SqlShowModules) {
            return Optional.of(converter.convertShowModules((SqlShowModules) validated));
        } else if (validated instanceof SqlUnloadModule) {
            return Optional.of(converter.convertUnloadModule((SqlUnloadModule) validated));
        } else if (validated instanceof SqlUseCatalog) {
            return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));
        } else if (validated instanceof SqlUseModules) {
            return Optional.of(converter.convertUseModules((SqlUseModules) validated));
        } else if (validated instanceof SqlCreateDatabase) {
            return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated));
        } else if (validated instanceof SqlDropDatabase) {
            return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));
        } else if (validated instanceof SqlAlterDatabase) {
            return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));
        } else if (validated instanceof SqlShowDatabases) {
            return Optional.of(converter.convertShowDatabases((SqlShowDatabases) validated));
        } else if (validated instanceof SqlShowCurrentDatabase) {
            return Optional.of(
                    converter.convertShowCurrentDatabase((SqlShowCurrentDatabase) validated));
        } else if (validated instanceof SqlUseDatabase) {
            return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated));
        } else if (validated instanceof SqlCreateTable) {
            return Optional.of(
                    converter.createTableConverter.convertCreateTable((SqlCreateTable) validated));
        } else if (validated instanceof SqlDropTable) {
            return Optional.of(converter.convertDropTable((SqlDropTable) validated));
        } else if (validated instanceof SqlAlterTable) {
            return Optional.of(converter.convertAlterTable((SqlAlterTable) validated));
        } else if (validated instanceof SqlShowTables) {
            return Optional.of(converter.convertShowTables((SqlShowTables) validated));
        } else if (validated instanceof SqlCreateView) {
            return Optional.of(converter.convertCreateView((SqlCreateView) validated));
        } else if (validated instanceof SqlDropView) {
            return Optional.of(converter.convertDropView((SqlDropView) validated));
        } else if (validated instanceof SqlAlterView) {
            return Optional.of(converter.convertAlterView((SqlAlterView) validated));
        } else if (validated instanceof SqlShowViews) {
            return Optional.of(converter.convertShowViews((SqlShowViews) validated));
        } else if (validated instanceof SqlCreateFunction) {
            return Optional.of(converter.convertCreateFunction((SqlCreateFunction) validated));
        } else if (validated instanceof SqlDropFunction) {
            return Optional.of(converter.convertDropFunction((SqlDropFunction) validated));
        } else if (validated instanceof SqlAlterFunction) {
            return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated));
        } else if (validated instanceof SqlShowCreateTable) {
            return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated));
        } else if (validated instanceof SqlShowFunctions) {
            return Optional.of(converter.convertShowFunctions((SqlShowFunctions) validated));
        } else if (validated instanceof SqlShowPartitions) {
            return Optional.of(converter.convertShowPartitions((SqlShowPartitions) validated));
        } else if (validated instanceof SqlRichExplain) {
            return Optional.of(converter.convertRichExplain((SqlRichExplain) validated));
        } else if (validated instanceof SqlRichDescribeTable) {
            return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
        } else if (validated instanceof SqlAddJar) {
            return Optional.of(converter.convertAddJar((SqlAddJar) validated));
        } else if (validated instanceof SqlRemoveJar) {
            return Optional.of(converter.convertRemoveJar((SqlRemoveJar) validated));
        } else if (validated instanceof SqlShowJars) {
            return Optional.of(converter.convertShowJars((SqlShowJars) validated));
        } else if (validated instanceof RichSqlInsert) {
            return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
        } else if (validated instanceof SqlBeginStatementSet) {
            return Optional.of(
                    converter.convertBeginStatementSet((SqlBeginStatementSet) validated));
        } else if (validated instanceof SqlEndStatementSet) {
            return Optional.of(converter.convertEndStatementSet((SqlEndStatementSet) validated));
        } else if (validated instanceof SqlSet) {
            return Optional.of(converter.convertSet((SqlSet) validated));
        } else if (validated instanceof SqlReset) {
            return Optional.of(converter.convertReset((SqlReset) validated));
        } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
            // query statements
            return Optional.of(converter.convertSqlQuery(validated));
        } else {
            return Optional.empty();
        }
    }
    

    Here we focus on the path a select statement takes, which is the call:

    converter.convertSqlQuery(validated)
    

    The method's code:

    private Operation convertSqlQuery(SqlNode node) {
        return toQueryOperation(flinkPlanner, node);
    }
    

    Following toQueryOperation further:

    private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
        // transform to a relational tree
        RelRoot relational = planner.rel(validated);
        return new PlannerQueryOperation(relational.rel);
    }
    

    This method ultimately produces a PlannerQueryOperation, wrapping the relational tree produced by Calcite.

    Next we follow up on the two key points here: validation and conversion into a relational tree.

    validate

    The validate step checks and rewrites the SQL.

    Let's follow the flinkPlanner.validate method from the previous section:

    def validate(sqlNode: SqlNode): SqlNode = {
        // create the validator
        val validator = getOrCreateSqlValidator()
        // validate the SQL
        validate(sqlNode, validator)
    }
    
    private def validate(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {
        try {
            // pre-validate and rewrite rich SQL insert statements
            // a rich SQL insert is an INSERT with partitions or OVERWRITE
            sqlNode.accept(new PreValidateReWriter(
                validator, typeFactory))
            // do extended validation: CREATE TABLE and CREATE TABLE ... LIKE statements
            sqlNode match {
                case node: ExtendedSqlNode =>
                node.validate()
                case _ =>
            }
            // no need to validate the row type of DDL statements, inserts, and the statement kinds below
            if (sqlNode.getKind.belongsTo(SqlKind.DDL)
                || sqlNode.getKind == SqlKind.INSERT
                || sqlNode.getKind == SqlKind.CREATE_FUNCTION
                || sqlNode.getKind == SqlKind.DROP_FUNCTION
                || sqlNode.getKind == SqlKind.OTHER_DDL
                || sqlNode.isInstanceOf[SqlLoadModule]
                || sqlNode.isInstanceOf[SqlShowCatalogs]
                || sqlNode.isInstanceOf[SqlShowCurrentCatalog]
                || sqlNode.isInstanceOf[SqlShowDatabases]
                || sqlNode.isInstanceOf[SqlShowCurrentDatabase]
                || sqlNode.isInstanceOf[SqlShowTables]
                || sqlNode.isInstanceOf[SqlShowFunctions]
                || sqlNode.isInstanceOf[SqlShowJars]
                || sqlNode.isInstanceOf[SqlShowModules]
                || sqlNode.isInstanceOf[SqlShowViews]
                || sqlNode.isInstanceOf[SqlShowPartitions]
                || sqlNode.isInstanceOf[SqlRichDescribeTable]
                || sqlNode.isInstanceOf[SqlUnloadModule]
                || sqlNode.isInstanceOf[SqlUseModules]
                || sqlNode.isInstanceOf[SqlBeginStatementSet]
                || sqlNode.isInstanceOf[SqlEndStatementSet]
                || sqlNode.isInstanceOf[SqlSet]
                || sqlNode.isInstanceOf[SqlReset]) {
                return sqlNode
            }
            sqlNode match {
                // validate EXPLAIN statements (including EXPLAIN ... INSERT)
                case richExplain: SqlRichExplain =>
                val validatedStatement = richExplain.getStatement match {
                    case insert: RichSqlInsert =>
                    val validatedSource = validator.validate(insert.getSource)
                    insert.setOperand(2, validatedSource)
                    insert
                    case others =>
                    validator.validate(others)
                }
                richExplain.setOperand(0, validatedStatement)
                richExplain
                // everything else goes through the generic validate logic
                case _ =>
                validator.validate(sqlNode)
            }
        }
        catch {
            case e: RuntimeException =>
            throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
        }
    }
    

    The generic validate flow is fairly complex; I will analyze it in a later post.

    Generating the relational tree

    We now follow FlinkPlannerImpl's rel method. The conversion from SqlNode to a relational tree is done by Calcite's SqlToRelConverter.

    def rel(validatedSqlNode: SqlNode): RelRoot = {
        rel(validatedSqlNode, getOrCreateSqlValidator())
    }
    
    private def rel(validatedSqlNode: SqlNode, sqlValidator: FlinkCalciteSqlValidator) = {
        try {
            assert(validatedSqlNode != null)
            // create the SqlToRelConverter
            val sqlToRelConverter: SqlToRelConverter = createSqlToRelConverter(sqlValidator)
    
            // Calcite converts the node into a relational tree
            sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
            // we disable automatic flattening in order to let composite types pass without modification
            // we might enable it again once Calcite has better support for structured types
            // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
    
            // TableEnvironment.optimize will execute the following
            // root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
            // convert time indicators
            // root = root.withRel(RelTimeIndicatorConverter.convert(root.rel, rexBuilder))
        } catch {
            case e: RelConversionException => throw new TableException(e.getMessage)
        }
    }
    

    Execution

    The execute stage runs the SQL statement and returns the result.

    TableImpl's execute method runs the SQL query and returns a TableResult object.

    @Override
    public TableResult execute() {
        return tableEnvironment.executeInternal(getQueryOperation());
    }
    

    Next we follow TableEnvironmentImpl's executeInternal method. This method is very long; its overall logic is to dispatch on the type of the operation, performing a different action for each. For example, DDL statements manipulate database or table metadata through the CatalogManager. One of its branches, shown below, calls executeQueryOperation to execute a select statement:

    @Override
    public TableResult executeInternal(Operation operation) {
        // ...
         else if (operation instanceof QueryOperation) {
            return executeQueryOperation((QueryOperation) operation);
        }
        // ...
    }
    

    Let's look at this method in detail.

    private TableResult executeQueryOperation(QueryOperation operation) {
        // create an identifier for the unregistered collect sink
        final UnresolvedIdentifier unresolvedIdentifier =
                UnresolvedIdentifier.of(
                        "Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId());
        final ObjectIdentifier objectIdentifier =
                catalogManager.qualifyIdentifier(unresolvedIdentifier);
    
        // create a ModifyOperation that collects the results locally
        CollectModifyOperation sinkOperation =
                new CollectModifyOperation(objectIdentifier, operation);
        
        // translate the sinkOperation into Flink Transformations (analyzed below)
        List<Transformation<?>> transformations =
                translate(Collections.singletonList(sinkOperation));
        // set the job name
        String jobName = getJobName("collect");
        // build the pipeline (a StreamGraph) from the transformations
        Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
        try {
            // submit the job; the JobClient represents the running job
            JobClient jobClient = execEnv.executeAsync(pipeline);
            // helps the jobClient fetch the execution results
            CollectResultProvider resultProvider = sinkOperation.getSelectResultProvider();
            resultProvider.setJobClient(jobClient);
            // build the TableResultImpl object
            return TableResultImpl.builder()
                    .jobClient(jobClient)
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .schema(operation.getResolvedSchema())
                    .data(resultProvider.getResultIterator())
                    .setPrintStyle(
                            TableResultImpl.PrintStyle.tableau(
                                    PrintUtils.MAX_COLUMN_WIDTH,
                                    PrintUtils.NULL_COLUMN,
                                    true,
                                    isStreamingMode))
                    .setSessionTimeZone(getConfig().getLocalTimeZone())
                    .build();
        } catch (Exception e) {
            throw new TableException("Failed to execute sql", e);
        }
    }
    

    Finally, taking the printing of select results (the print method) as an example, let's look at how SQL results are displayed.

    TableResultImpl's print method:

    @Override
    public void print() {
        // get the row iterator
        Iterator<Row> it = collect();
        // decide the print format of the table: first the tableau style
        if (printStyle instanceof TableauStyle) {
            // tableau-style settings: max column width, null column representation, etc.
            int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
            String nullColumn = ((TableauStyle) printStyle).getNullColumn();
            boolean deriveColumnWidthByType =
                    ((TableauStyle) printStyle).isDeriveColumnWidthByType();
            boolean printRowKind = ((TableauStyle) printStyle).isPrintRowKind();
            PrintUtils.printAsTableauForm(
                    getResolvedSchema(),
                    it,
                    new PrintWriter(System.out),
                    maxColumnWidth,
                    nullColumn,
                    deriveColumnWidthByType,
                    printRowKind,
                    sessionTimeZone);
        } else if (printStyle instanceof RawContentStyle) {
            // raw content style: one comma-joined line per row
            while (it.hasNext()) {
                System.out.println(
                        String.join(
                                ",",
                                PrintUtils.rowToString(
                                        it.next(), getResolvedSchema(), sessionTimeZone)));
            }
        } else {
            throw new TableException("Unsupported print style: " + printStyle);
        }
    }
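    PrintUtils.printAsTableauForm is, at its core, a matter of computing one width per column and rendering border and cell rows. Below is a rough, dependency-free approximation; this sketch derives widths only from the data and ignores the row kind, type-derived widths, and null-column handling of the real implementation:

```java
import java.util.List;

// Minimal approximation of tableau-style printing: derive each column's
// width from the widest cell, then render +---+ borders and | cell | rows.
public class TableauSketch {

    // rows.get(0) is treated as the header row
    static String render(List<String[]> rows) {
        int cols = rows.get(0).length;
        int[] widths = new int[cols];
        for (String[] row : rows) {
            for (int c = 0; c < cols; c++) {
                widths[c] = Math.max(widths[c], row[c].length());
            }
        }
        StringBuilder sb = new StringBuilder();
        String border = border(widths);
        sb.append(border);
        for (int r = 0; r < rows.size(); r++) {
            sb.append('|');
            for (int c = 0; c < cols; c++) {
                sb.append(' ').append(pad(rows.get(r)[c], widths[c])).append(" |");
            }
            sb.append('\n');
            if (r == 0) {
                sb.append(border); // separate the header from the data
            }
        }
        sb.append(border);
        return sb.toString();
    }

    static String border(int[] widths) {
        StringBuilder sb = new StringBuilder("+");
        for (int w : widths) {
            for (int i = 0; i < w + 2; i++) sb.append('-');
            sb.append('+');
        }
        return sb.append('\n').toString();
    }

    static String pad(String s, int w) {
        StringBuilder sb = new StringBuilder(s);
        while (sb.length() < w) sb.append(' ');
        return sb.toString();
    }
}
```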
    

    translate

    The translate method contains the logic that turns Operations into Transformations.

    private List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
        return planner.translate(modifyOperations);
    }
    

    This one-liner calls PlannerBase's translate method:

    override def translate(
        modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
        // check that the planner and the execution mode match the configuration
        validateAndOverrideConfiguration()
        // if modifyOperations is empty, return an empty list of Transformations
        if (modifyOperations.isEmpty) {
            return List.empty[Transformation[_]]
        }
    
        // convert the Operations into Calcite relational expressions
        val relNodes = modifyOperations.map(translateToRel)
        // optimize the relational expressions
        val optimizedRelNodes = optimize(relNodes)
        // generate the exec node graph
        val execGraph = translateToExecNodeGraph(optimizedRelNodes)
        // convert the exec node graph into Transformations
        val transformations = translateToPlan(execGraph)
        // after translation, clean up internal configuration
        cleanupInternalConfigurations()
        transformations
    }
    

    The translate step thus covers deriving relational expressions from Operations, optimizing them, generating the exec node graph, and converting it into Flink Transformations. Each of these steps will be analyzed in detail in separate posts.
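    As a shape, the four stages form a straight pipeline. The hypothetical sketch below reduces the intermediate types (RelNode, ExecNodeGraph, Transformation) to tagged strings just to make the data flow visible; note that the real optimize and translateToExecNodeGraph operate on the whole list at once, not element by element:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of PlannerBase.translate as a linear pipeline.
// Tagged strings stand in for RelNode, ExecNodeGraph, and Transformation.
public class TranslatePipelineSketch {

    static final Function<String, String> TO_REL        = op -> "rel(" + op + ")";
    static final Function<String, String> OPTIMIZE      = rel -> "opt(" + rel + ")";
    static final Function<String, String> TO_EXEC_GRAPH = rel -> "exec(" + rel + ")";
    static final Function<String, String> TO_PLAN       = g -> "transformation(" + g + ")";

    static List<String> translate(List<String> modifyOperations) {
        return modifyOperations.stream()
            .map(TO_REL)                 // Operation -> relational expression
            .map(OPTIMIZE)               // optimize the relational expression
            .map(TO_EXEC_GRAPH)          // optimized RelNode -> exec node graph
            .map(TO_PLAN)                // exec node graph -> Transformation
            .collect(Collectors.toList());
    }
}
```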

    This post is original work by the author. Discussion, comments, and corrections are welcome. Please credit the source when reposting.

    Original link: https://www.haomeiwen.com/subject/nrwsvltx.html