美文网首页Flink
Flink源码阅读(八)--- Flink SQL 整体执行流程

Flink源码阅读(八)--- Flink SQL 整体执行流程

作者: sj_91d7 | 来源:发表于2021-04-07 11:55 被阅读0次

    为了批流统一,Flink提供了两种关系型API,Table API和SQL。Table API是一种语言集成的查询API,由多个比如selection,filter,join关系operator组合而成。Flink SQL是基于Calcite来实现的。无论是在streaming还是在batch上,Table API和SQL具有相同的语义并且能够得到相同的结果。Table API、SQL以及DataStream API之间都能无缝集成,可以方便的使用。本篇文章主要介绍下Flink SQL的整体流程。本文内容是基于Flink 1.12来讲解。

    1. 整体执行流程介绍

    Flink SQL的执行一般分为四个阶段,主要依赖 Calcite 来完成

    1. Parse
         语法解析,Calcite使用JavaCC把用户提交的Sql String 转换成一个抽象语法树(AST),对应SqlNode节点。

    2. Validate
         语法校验,根据元数据信息进行验证,例如查询的表、使用的函数是否存在,会分别对 from / where / group by / having / select / order by 等子句进行validate ,校验之后仍然是 SqlNode 构成的语法树;

    3. Optimize
         查询计划优化,这里其实包含了两部分,1)首先将 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,2)然后使用优化器基于规则进行等价变换,例如我们比较熟悉的谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;

    4. Execute
         将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。

    2. 源码分析

    Flink1.12 默认的Planner是Blink Planner,本篇文章也是基于Blink Planner来分析。

    为了方便讲解,咱们给个具体regular join例子来说明,将 Orders 订单表和 Shipments 运输单表依据订单id进行Regular Join,并且以name进行分组,然后对name进行orderby操作,最后取出name以及sum(price)

    SELECT name, SUM(price)
    FROM Orders o
        JOIN Shipments s ON o.id = s.orderId
    GROUP BY name
    HAVING SUM(o.price) > 10
    ORDER BY name
    

    2.1 Parse阶段

    源码入口TableEnvironment#sqlQuery方法,会调用实现类TableEnvironmentImpl#sqlQuery方法,看下该方法源码

        @Override
        public Table sqlQuery(String query) {
            List<Operation> operations = parser.parse(query);
    
            if (operations.size() != 1) {
                throw new ValidationException(
                        "Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
            }
    
            Operation operation = operations.get(0);
    
            if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
                return createTable((QueryOperation) operation);
            } else {
                throw new ValidationException(
                        "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type "
                                + "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
            }
        }
    

    然后看下parser.parse(query) --> ParserImpl#parse

        @Override
        public List<Operation> parse(String statement) {
            CalciteParser parser = calciteParserSupplier.get();
            FlinkPlannerImpl planner = validatorSupplier.get();
            // parse the sql query
            SqlNode parsed = parser.parse(statement);
    
            Operation operation =
                    SqlToOperationConverter.convert(planner, catalogManager, parsed)
                            .orElseThrow(() -> new TableException("Unsupported query: " + statement));
            return Collections.singletonList(operation);
        }
    

    首先是基于Calcite的Parse解析,Calcite涉及的东西比较多,这里就不展开来说了,最终返回一个SqlNode节点。
    然后调用convert方法,在这个方法中,会对解析之后的SqlNode进行validate

    2.2 Validate阶段

    2.2.1 Scope背景知识
    • 首先给出SqlValidatorScope的继承关系,该图来自https://www.jianshu.com/p/83e88fdc04ec

      Scope继承关系.png
    • SqlValidatorScope是所有Scope的父接口,是做名字转换用的,代表在Parse Tree的位置。当验证如"foo"."bar"这个表达式的时候,会首先使用对应Scope实现的resolve方法来定位"foo",如果成功,会返回描述结果类型的SqlValidatorNamespace对象。

    • 这个概念可能有点抽象,咱们看下其中一个具体实现SelectScope的官方解释
    /**
     * The name-resolution scope of a SELECT clause. The objects visible are those
     * in the FROM clause, and objects inherited from the parent scope.
     *
     *
     * <p>This object is both a {@link SqlValidatorScope} and a
     * {@link SqlValidatorNamespace}. In the query</p>
     *
     * <blockquote>
     * <pre>SELECT name FROM (
     *     SELECT *
     *     FROM emp
     *     WHERE gender = 'F')</pre></blockquote>
     *
     * <p>we need to use the {@link SelectScope} as a
     * {@link SqlValidatorNamespace} when resolving 'name', and
     * as a {@link SqlValidatorScope} when resolving 'gender'.</p>
     *
     * <h2>Scopes</h2>
     *
     * <p>In the query</p>
     *
     * <blockquote>
     * <pre>
     * SELECT expr1
     * FROM t1,
     *     t2,
     *     (SELECT expr2 FROM t3) AS q3
     * WHERE c1 IN (SELECT expr3 FROM t4)
     * ORDER BY expr4</pre>
     * </blockquote>
     *
     * <p>The scopes available at various points of the query are as follows:</p>
     *
     * <ul>
     * <li>expr1 can see t1, t2, q3</li>
     * <li>expr2 can see t3</li>
     * <li>expr3 can see t4, t1, t2</li>
     * <li>expr4 can see t1, t2, q3, plus (depending upon the dialect) any aliases
     * defined in the SELECT clause</li>
     * </ul>
     *
     * <h2>Namespaces</h2>
     *
     * <p>In the above query, there are 4 namespaces:</p>
     *
     * <ul>
     * <li>t1</li>
     * <li>t2</li>
     * <li>(SELECT expr2 FROM t3) AS q3</li>
     * <li>(SELECT expr3 FROM t4)</li>
     * </ul>
     *
     * @see SelectNamespace
     */
    
    • 简单理解一下,scope直接翻译就是范围,表示SQL不同部分可以看到的数据源,类似于Java的作用域。Scope中重要的方法包括 1. 获取Scope的root SqlNode :SqlNode getNode(); 2. 获取Scope中所有的columns: void findAllColumnNames(List<SqlMoniker> result); 3. 获取Scope中所有的table aliases:void findAliases(Collection<SqlMoniker> result); 4. 将field转成fully-qualified identifier的方法: SqlQualified fullyQualify(SqlIdentifier identifier); 等
    2.2.2 Namespace背景知识
    • 与Scope类似,SqlValidatorNamespace是所有Namespace的父接口,一个Namespace描述了一个SQL查询的关系。
      比如1,对于 SELECT emp.deptno, age FROM emp,dept 这样一个查询,from子句代表了一个Namespace,这个Namespace由emp和dept两个table组成,row type由这两个emp、dept table的column组成。
      另外一个例子2,如果一个查询的from子句中,包含一个table和一个sub-query,那该Namespace就包含了该table的columns和sub-query的select中的columns字段。

    • SqlValidatorNamespace也有多种实现,比如table name对应IdentifierNamespace,SELECT queries对应SelectNamespace,UNION / EXCEPT / INTERSECT对应SetopNamespace等

    • 简单理解一下,namespace表示scope中的一个数据源(数据源是一个逻辑概念,可以是table,field或子查询),一个scope中可以有多个namespace。

    2.2.3 Validate源码分析

    源码入口 ParserImpl#parse --> SqlToOperationConverter#convert --> FlinkPlannerImpl#validate --> SqlValidatorImpl#validate 方法

      public SqlNode validate(SqlNode topNode) {
        SqlValidatorScope scope = new EmptyScope(this);
        scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
        final SqlNode topNode2 = validateScopedExpression(topNode, scope);
        final RelDataType type = getValidatedNodeType(topNode2);
        Util.discard(type);
        return topNode2;
      }
    

    该方法主要的逻辑都在validateScopedExpression中,处理流程如下:

    1. performUnconditionalRewrites
         - 对表达式进行重写使其更标准化,简化接下来的validate逻辑。
    SELECT name, SUM(price)
    FROM Orders o
        JOIN Shipments s ON o.id = s.orderId
    GROUP BY name
    HAVING SUM(o.price) > 10
    ORDER BY name
    

    下面针对这个例子具体调试跟下源码

    • 在经过Calcite parse之后,得到的SqlNode是一个SqlOrderBy实例,并且join --> inner join


      parse之后的SqlNode节点.png
    • 然后看下performUnconditionalRewrites方法
      上面提到parse之后,SqlNode是一个SqlOrderBy实例,performUnconditionalRewrites方法首先会先拿到SqlOrderBy对应的OperandList,具体都有哪些Operands,可以看下SqlOrderBy#getOperandList,对应四个Operands: query, orderList, offset, fetch。(note:不同SqlNode实例,有不同的getOperandList实现)

    1. query: 是个SqlSelect实例
    SELECT `name`, SUM(`price`)
    FROM `Orders` AS `o`
    INNER JOIN `Shipments` AS `s` ON `o`.`id` = `s`.`orderId`
    GROUP BY `name`
    HAVING SUM(`o`.`price`) > 10
    
    2. orderList:是个SqlNodeList实例,对应 name
    3. offset: null
    4. fetch: null
    

    然后performUnconditionalRewrites方法会对每个Operand递归的执行performUnconditionalRewrites。

    如果operand对应的SqlKind是 VALUES / ORDER_BY / EXPLICIT_TABLE / DELETE / UPDATE / MERGE , performUnconditionalRewrites会对其进行rewrite。

    performUnconditionalRewrites方法执行之后,SqlNode从最开始的SqlOrderBy实例,变成了SqlSelect实例


    performUnconditionalRewrites方法执行前后节点.png

       - performUnconditionalRewrites方法,把SqlOrderBy节点的orderList, offset, fetch这三个operand赋值给SqlSelect对象,然后直接把SqlSelect返回。

    1. registerQuery
         - 创建scope以及namespace,namespace和scope对象中都会包含SqlNode。

    上面例子执行完registerQuery方法,最终FlinkCalciteSqlValidator对象为

    创建Scope和namespace之后的validate.png

      从图中可以看出,测试sql共有两个scopes和4个namespaces

    1. validateQuery
         - 首先根据node和scope来获取namespace
         - 对namespace进行validate,validateNamespace(ns, targetRowType);
         - 最后通过调用validateSelect来进行SQL验证

    调用关系链如下图所示:

    validateQuery方法调用链.png

    validateSelect处理分为8个部分:

    • validateFrom

       - 在调用validateFrom之前,首先会去校验from后面的names有没有重复table name,如果有重复,直接报错。如果没有from子句,也会直接报错。

      /**
       * Validates the FROM clause of a query, or (recursively) a child node of
       * the FROM clause: AS, OVER, JOIN, VALUES, or sub-query.
       *
       * @param node          Node in FROM clause, typically a table or derived
       *                      table
       * @param targetRowType Desired row type of this expression, or
       *                      {@link #unknownType} if not fussy. Must not be null.
       * @param scope         Scope
       */
      protected void validateFrom(
          SqlNode node,
          RelDataType targetRowType,
          SqlValidatorScope scope) {
        Objects.requireNonNull(targetRowType);
        switch (node.getKind()) {
        case AS:
        case TABLE_REF:
          validateFrom(
              ((SqlCall) node).operand(0),
              targetRowType,
              scope);
          break;
        case VALUES:
          validateValues((SqlCall) node, targetRowType, scope);
          break;
        case JOIN:
          validateJoin((SqlJoin) node, scope);
          break;
        case OVER:
          validateOver((SqlCall) node, scope);
          break;
        case UNNEST:
          validateUnnest((SqlCall) node, scope, targetRowType);
          break;
        default:
          validateQuery(node, scope, targetRowType);
          break;
        }
    
        // Validate the namespace representation of the node, just in case the
        // validation did not occur implicitly.
        getNamespace(node, scope).validate(targetRowType);
      }
    

       对于我们的例子,会先调用validateJoin
          - validateJoin方法中,又包括了validateFrom(left, unknownType, joinScope); 和 validateFrom(right, unknownType, joinScope); 这两个方法首先完成的功能是验证,其次会把left节点和right节点的名字补齐,即full qualified name,比如

    `Orders` AS `o` --> `default_catalog`.`default_database`.`Orders` AS `o`
    

          - validateJoin会去校验是等值连接还是使用on关键字。如果使用on关键字,那on的条件是否为空,如果为空直接抛异常。
       - 验证from中的table是否存在,将sqlNode name扩展为full qualified name并得到数据源的数据类型可以理解该数据源所有field type组成的row type。

    • validateWhereClause:
         - 验证where条件中的字段是否存在,如果不存在就返回;
         - 扩展field name为full qualified name;
         - 验证where clause中是否存在aggregate function,如果where clause中存在 Windowed aggregate 表达式 或者 Aggregate表达式,就直接报错。
         - 验证where clause的类型是否是boolean类型,如果不是,直接报错。

    • validateGroupClause
         - 验证group by field是否存在,如果不存在就返回;
         - 如果存在就扩展该field name
         - 验证group by中是否存在aggregate function,如果group by中存在 Windowed aggregate 表达式 或者 Aggregate表达式,就直接报错。

    • validateHavingClause
         - 验证having field是否存在,如果不存在就返回;
         - 扩展having中的field name
         - 验证having中不在聚合函数中的field是否和group by中的field相同;
         - 验证having中的field是否存在;
         - 验证having clause的类型是否是boolean类型,如果不是,直接报错。
         - 验证having clause中是否有嵌套Aggregate expressions。对于非窗口agg(expr),expr中不能包含嵌套aggregate function,比如 SUM(2 * MAX(x)) 就是非法的;如果是windowed aggregate "agg(expr)",expr可以包含aggregate function,比如下面的例子就是合法的。

        SELECT AVG(2 * MAX(x)) OVER (PARTITION BY y)
        FROM t
        GROUP BY y
    
    • validateWindowClause:暂略

    • handleOffsetFetch: 暂略

    • validateSelectList
         - 验证projection list中的aliases是否是唯一的,对于 * 和 TABLE.* 不做该检查
         - 如果select clause中包含 * 或 TABLE.* ,对字段进行补齐
         - 扩展field name;并且抽取aliases,使用selectItems(List<SqlNode>)和 aliases(Set<String>)分别存储field的node信息和别名;然后获得field数据类型,组成键值对<alias, type> 放入 List<Map.Entry<String, RelDataType>> fields 对象中。
         - 验证projection list中的field是否存在
         - 判断projection list中不在聚合函数中的field是否和group by的field相同

    • validateOrderList
         - 验证order by field是否存在,如果不存在就返回;
         - 如果order by field不在select projection list中,扩展field name,否则不进行扩展;
         - 验证order by中field是否是group by的field子集,如果出现不在group by中的field,直接报错。

    到这里,validate验证阶段基本就完成了。

    2.3 Optimize阶段

    2.3.1 Convert转换

    convert阶段主要的工作是把 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,也就是生成相应的逻辑计划(Logical Plan),然后返回根结点 RelRoot。
       即:语义分析,根据 SqlNode及元信息构建 RelNode 树,也就是最初版本的逻辑计划(Logical Plan),根据这个已经生成的Flink的logical Plan,将它转换成calcite的logicalPlan,这样我们才能用到calcite强大的优化规则。

    SqlNode有很多种,既包括MIN、MAX这种表达式型的,也包括SELECT、JOIN这种关系型的,转化过程中,将这两种分离成RexNode表达式型和RelNode关系型。

    介绍下 RelRoot 出现的必要性,这里说两个场景,都需要借助 RelRoot 来解决。
    场景1:

    SELECT name
    FROM emp
    ORDER BY empno DESC
    

    Calcite知道结果必须sort排序,但不能将其排序顺序表示为排序法,因为empno并不在结果字段中,这种场景我们需要使用RelRoot这么表达

    RelRoot: {
        rel: Sort($1 DESC)
               Project(name, empno)
                 TableScan(EMP)
        fields: [0]
        collation: [1 DESC]
      }
    

    可以看到,empno字段会出现在结果字段中,只是 fields 标识给consumer暴露什么字段。

    场景2:

    SELECT name AS n, name AS n2, empno AS n FROM emp
    

    这里重复使用了name字段,并且有多列的别名都叫 n ,这种场景应该使用RelRoot这么表达

    RelRoot: {
        rel: Project(name, empno)
               TableScan(EMP)
        fields: [(0, "n"), (0, "n2"), (1, "n")]
        collation: []
      }
    
    2.3.1.1 源码入口:SqlToRelConverter#convertQuery
     public RelRoot convertQuery(
          SqlNode query,
          final boolean needsValidation,
          final boolean top) {
        if (needsValidation) {
          query = validator.validate(query);
        }
    
        RelNode result = convertQueryRecursive(query, top, null).rel;
    

    可以看出在convertQueryRecursive采取了递归遍历的方式来解析query。对于给定的例子,会去调用 convertSelect --> convertSelectImpl

     /**
       * Implementation of {@link #convertSelect(SqlSelect, boolean)};
       * derived class may override.
       */
      protected void convertSelectImpl(
          final Blackboard bb,
          SqlSelect select) {
        convertFrom(
            bb,
            select.getFrom());
        convertWhere(
            bb,
            select.getWhere());
    
        final List<SqlNode> orderExprList = new ArrayList<>();
        final List<RelFieldCollation> collationList = new ArrayList<>();
        gatherOrderExprs(
            bb,
            select,
            select.getOrderList(),
            orderExprList,
            collationList);
        final RelCollation collation =
            cluster.traitSet().canonize(RelCollations.of(collationList));
    
        if (validator.isAggregate(select)) {
          convertAgg(
              bb,
              select,
              orderExprList);
        } else {
          convertSelectList(
              bb,
              select,
              orderExprList);
        }
    
        if (select.isDistinct()) {
          distinctify(bb, true);
        }
    
        convertOrder(
            select, bb, collation, orderExprList, select.getOffset(),
            select.getFetch());
    
        if (select.hasHints()) {
          final List<RelHint> hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
          // Attach the hints to the first Hintable node we found from the root node.
          bb.setRoot(bb.root
              .accept(
                  new RelShuttleImpl() {
                    boolean attached = false;
                    @Override public RelNode visitChild(RelNode parent, int i, RelNode child) {
                      if (parent instanceof Hintable && !attached) {
                        attached = true;
                        return ((Hintable) parent).attachHints(hints);
                      } else {
                        return super.visitChild(parent, i, child);
                      }
                    }
                  }), true);
        } else {
          bb.setRoot(bb.root, true);
        }
      }
    

    也就是在对select这种SqlNode进行convert的时候,会分别对from,where,aggregate,select,order等进行转换,最终生成的逻辑计划如下:

    == Abstract Syntax Tree ==
    LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
    +- LogicalFilter(condition=[>($1, 10)])
       +- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
          +- LogicalProject(name=[$1], price=[$2])
             +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
                :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
                +- LogicalTableScan(table=[[default_catalog, default_database, Shipments]])
    

    转成一个关系树之后:

    1. 首先 会生成一个operation,对应代码 SqlToOperationConverter#toQueryOperation。operation包含两个成员变量,分别是RelNode calciteTree(就是刚才convert之后的RelNode)和 TableSchema tableSchema(包含字段名,字段类型 列表,对于本文例子这里是select之后到的VARCHAR(2147483647) name, FLOAT EXPR$1),在tableSchema的build方法中,还会对field name是否重复进行validate,以及验证watermark对应的rowtime属性类型的root type是否是TIMESTAMP_WITHOUT_TIME_ZONE。
    2. 根据上面生成的operation,生成一个Table。table包含的成员变量主要包括QueryOperation,TableEnvironmentInternal以及OperationTreeBuilder和LookupCallResolver。
    2.3.2 Optimize

    // TODO,sql优化的东西有点多,这块内容后面再补充

    2.4 Execute阶段

    2.4.1 转换成ExecNode

    入口 PlannerBase#translateToExecNodePlan,完成FlinkPhysicalRel DAG 到 ExecNode DAG的转换,并且尝试reuse重复的sub-plans。
    // TODO,源码过程尽量补充下

    2.4.2 转换成Transformation DAG

    相关文章

      网友评论

        本文标题:Flink源码阅读(八)--- Flink SQL 整体执行流程

        本文链接:https://www.haomeiwen.com/subject/nhgecltx.html