Flink SQL Parsing Flow

Author: edd72e125c98 | Published 2018-01-12 09:22

    Flink SQL overall architecture

    execution flow

    How the Table API and SQL create a Calcite logical plan (parsing)

    Comparing the two

    The two differ slightly:

    • SQL: a SQL query is run through the Calcite parser and turned into a tree of SQL nodes; validate() then checks the SQL semantics against the catalog. The result is Calcite's abstract syntax tree, i.e. the Calcite logical plan.

    • Table API: calling the Table API actually builds up Table API LogicalNodes (the Table API's own abstract syntax tree), and the whole query is validated while these nodes are created. For example, a table becomes a CatalogNode, a select after window groupBy creates WindowAggregate and Project nodes, and where corresponds to Filter. These LogicalNodes are then translated into a Calcite logical plan with Calcite's RelBuilder. A SQL query, in contrast, is parsed directly by Calcite's parser and then validated to produce the Calcite logical plan.

    In short, SQL relies entirely on Calcite (its SQL parser) for syntax analysis and, after validation, yields a Calcite logical plan, while the Table API first builds its own Table API logical plan and then translates it into a Calcite logical plan via Calcite's RelBuilder. The sketch below illustrates the two entry points.
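
    A minimal sketch of the two entry points, written against the Flink 1.3/1.4-era Scala Table API that this article describes (the registered table "Orders" and its user/amount columns are assumed here for illustration):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)   // a StreamTableEnvironment

    // Table API path: each call creates a Table API LogicalNode
    // (CatalogNode for the scan, Filter for the predicate, Project for the select)
    val orders    = tEnv.scan("Orders")
    val apiResult = orders.filter('amount > 10).select('user, 'amount)

    // SQL path: Calcite parses and validates the string and produces the Calcite logical plan directly
    val sqlResult = tEnv.sql("SELECT user, amount FROM Orders WHERE amount > 10")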

    Optimization and conversion to a runtime Flink program


    Optimization uses Calcite's cost-based optimizer. In other words, unlike Spark, Flink delegates SQL parsing, analysis, and optimization entirely to Calcite (Flink only contributes some additional optimization rules). Calcite optimizes the logical plan according to rule sets, and different rule sets are applied depending on the execution environment (Flink provides one set of rules for batch and another for streaming). The rules fall into two groups: Calcite's built-in rules (e.g. predicate push-down and pruning), plus Flink-specific rules (selected according to whether the job is streaming or batch) that further optimize the logical plan.

    Generating the physical plan: Flink's own rules turn the optimized logical plan into a DataStream plan (the physical plan).

    The physical plan is then converted into a Flink execution plan by calling translateToPlan() on each node, which uses code generation to produce Flink's runtime operators.

    The code-generated functions exist as strings. After the job is submitted they are shipped to the TaskManagers, where the code is compiled with the [Janino](http://janino-compiler.github.io/janino/) compiler at runtime and then executed.
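
    To illustrate the mechanism only (this is a sketch of the idea, not Flink's actual generated code or code generator), a function shipped as a source string can be compiled at runtime with Janino's SimpleCompiler and invoked via reflection:

    import org.codehaus.janino.SimpleCompiler

    // the "generated" source; Flink's code generation emits its own Function subclasses instead
    val generatedSource =
      """
        |public class GeneratedDoubler {
        |  public int eval(int in) { return in * 2; }
        |}
      """.stripMargin

    val compiler = new SimpleCompiler()
    compiler.cook(generatedSource)                         // compile the source string at runtime
    val clazz    = compiler.getClassLoader.loadClass("GeneratedDoubler")
    val instance = clazz.getDeclaredConstructor().newInstance()
    val result   = clazz.getMethod("eval", classOf[Int]).invoke(instance, Int.box(21))
    println(result)                                        // prints 42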

    The overall logic is similar to Spark's, except that Calcite does what Catalyst does there (SQL parsing, analysis, and optimization).

    TableEnvironment

    TableEnvironment.sql() - responsible for parsing and validating the SQL query

    def sql(query: String): Table = {
      val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
      // parse the sql query into an AST made of SqlNodes
      val parsed = planner.parse(query)
      // validate the sql query
      val validated = planner.validate(parsed)
      // transform to a relational tree: turn the pure syntax AST into a relational
      // expression tree (RelNode) in preparation for optimization
      val relational = planner.rel(validated)

      new Table(this, LogicalRelNode(relational.rel))
    }
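
    To make the three steps concrete, here is the rough shape of the intermediate trees for a simple query (an illustration with simplified output, not actual planner dumps):

    // "SELECT user, amount FROM Orders WHERE amount > 10"
    //
    // planner.parse(...)    -> SqlSelect(selectList = [user, amount], from = Orders, where = amount > 10)
    // planner.validate(...) -> the same SqlNode tree, now type-checked against the catalog
    // planner.rel(...)      -> LogicalProject(user, amount)
    //                            LogicalFilter(condition = >($1, 10))
    //                              LogicalTableScan(table = [Orders])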
    

    Table.writeToSink()

    This method mainly calls StreamTableEnvironment's optimize() and translate(). It is responsible for optimizing the relational tree obtained in sql() and for producing the physical plan.

    override private[flink] def writeToSink[T](
        table: Table,
        sink: TableSink[T],
        queryConfig: QueryConfig): Unit = {

      // streaming queries need a StreamQueryConfig
      val streamQueryConfig = queryConfig match {
        case streamConfig: StreamQueryConfig => streamConfig
        case _ =>
          throw new TableException("StreamQueryConfig required to configure stream query.")
      }

      sink match {

        case retractSink: RetractStreamTableSink[_] =>
          // retraction sink can always be used
          val outputType = sink.getOutputType
          // translate the Table into a DataStream and provide the type that the TableSink expects.
          val result: DataStream[T] =
            translate(
              table,
              streamQueryConfig,
              updatesAsRetraction = true,
              withChangeFlag = true)(outputType)
          // Give the DataStream to the TableSink to emit it.
          retractSink.asInstanceOf[RetractStreamTableSink[Any]]
            .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])

        // the UpsertStreamTableSink and AppendStreamTableSink cases are omitted here
      }
    }
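
    At the user level (a sketch reusing the tEnv and "Orders" table assumed earlier), writing a query result to a TableSink is what triggers this method and, with it, the optimize/translate pipeline described next. CsvTableSink is an append-only sink, so this particular call would go through the AppendStreamTableSink branch omitted above:

    import org.apache.flink.table.sinks.CsvTableSink

    val filtered = tEnv.sql("SELECT user, amount FROM Orders WHERE amount > 10")
    filtered.writeToSink(new CsvTableSink("/tmp/result.csv", ","))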
    

    StreamTableEnvironment

    StreamTableEnvironment.optimize()

    FlinkRelNode has three sub-traits: FlinkLogicalRel, DataStreamRel, and DataSetRel.
    FlinkLogicalRel corresponds to the optimized logical plan nodes.
    DataStreamRel corresponds to the DataStream (streaming) physical plan.
    DataSetRel corresponds to the DataSet (batch) physical plan.
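
    The shape of that hierarchy as a sketch (the real traits live under org.apache.flink.table.plan.nodes; the translateToPlan signature shown is an assumption based on the 1.4-era source):

    import org.apache.calcite.rel.RelNode
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
    import org.apache.flink.table.runtime.types.CRow

    trait FlinkRelNode extends RelNode

    trait FlinkLogicalRel extends FlinkRelNode   // optimized logical nodes (FlinkConventions.LOGICAL)

    trait DataStreamRel extends FlinkRelNode {   // streaming physical nodes (FlinkConventions.DATASTREAM)
      // each physical node knows how to translate itself into a DataStream of CRow
      def translateToPlan(tableEnv: StreamTableEnvironment, queryConfig: StreamQueryConfig): DataStream[CRow]
    }

    trait DataSetRel extends FlinkRelNode        // batch physical nodes (FlinkConventions.DATASET)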

    In DataStreamGroupAggregateRule

    Stream

    // 1. decorrelate
    // rewrite correlated expressions, i.e. correlated subqueries: subqueries whose WHERE clause
    // refers to a column of a table from the outer query
    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

    // 2. convert time indicators
    // convert the time indicator attributes; e.g. if a rowtime attribute is present,
    // a TimeMaterializationSqlFunction operator is introduced, which is later used during code generation
    val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)

    // 3. normalize the logical plan
    val normRuleSet = getNormRuleSet
    val normalizedPlan = if (normRuleSet.iterator().hasNext) {
      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
    } else {
      convPlan
    }

    // 4. optimize the logical Flink plan
    // apply logical optimizations (e.g. filter and projection push-down) and, at the same time,
    // convert the nodes into subclasses of FlinkLogicalRel
    val logicalOptRuleSet = getLogicalOptRuleSet
    // replace the traitSet with FlinkConventions.LOGICAL as the target output traits
    val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
    val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
      runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
    } else {
      normalizedPlan
    }

    // 5. convert the optimized logical plan into the physical plan, turning the nodes
    // into subclasses of DataStreamRel
    val physicalOptRuleSet = getPhysicalOptRuleSet
    val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
    val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
      runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
    } else {
      logicalPlan
    }
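
    For reference, runHepPlanner wraps the RuleSet in a Calcite HepProgram and applies its rules one by one, while runVolcanoPlanner does the analogous thing with the cost-based VolcanoPlanner. A rough sketch of the Hep variant (modeled on the 1.4-era TableEnvironment, not a verbatim copy):

    import org.apache.calcite.plan.RelTraitSet
    import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.tools.RuleSet

    def runHepPlanner(
        matchOrder: HepMatchOrder,
        ruleSet: RuleSet,
        input: RelNode,
        targetTraits: RelTraitSet): RelNode = {

      // register every rule of the set in a HepProgram with the requested match order
      val builder = new HepProgramBuilder().addMatchOrder(matchOrder)
      val it = ruleSet.iterator()
      while (it.hasNext) {
        builder.addRuleInstance(it.next())
      }

      val planner = new HepPlanner(builder.build())
      planner.setRoot(input)
      if (input.getTraitSet != targetTraits) {
        planner.changeTraits(input, targetTraits.simplify())
      }
      // apply the rules and return the rewritten plan
      planner.findBestExp()
    }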
    

    FlinkRuleSets defines the rules used during the optimize step:

    DATASTREAM_NORM_RULES: normalize/rewrite certain nodes.
    LOGICAL_OPT_RULES: logical-plan optimization rules; they contain the SQL-level optimizations (push-down, pruning) and at the same time convert the nodes: Calcite logical RelNode -> FlinkLogicalRel.
    DATASTREAM_OPT_RULES: rules that convert logical plan nodes into the DataStream physical plan, FlinkLogicalRel -> DataStreamRel. All of these rules extend Calcite's ConverterRule, which in turn extends RelOptRule.

    ConverterRule

    • Abstract base class for a rule which converts from one calling convention to another without changing semantics.

    ConverterRule has many subclasses (for example, all rules in DATASTREAM_OPT_RULES, as well as the rules in LOGICAL_OPT_RULES that convert to the Flink logical plan). They all implement its convert() method, which is invoked from onMatch().

    ConverterRule.onMatch(RelOptRuleCall call)
    RelOptRule declares onMatch(RelOptRuleCall call), which ConverterRule overrides; it is the callback that is invoked once matches() has returned true.

    public void onMatch(RelOptRuleCall call) {
      // rel is rels[0] of the RelOptRuleCall, i.e. the original rel being converted
      RelNode rel = call.rel(0);
      // only convert if the rel carries the expected input trait (calling convention)
      if (rel.getTraitSet().contains(inTrait)) {
        final RelNode converted = convert(rel);
        if (converted != null) {
          call.transformTo(converted);
        }
      }
    }
    

    TODO: the rules in LOGICAL_OPT_RULES that produce the Flink logical plan implement not only convert() but also matches().
    It would be worth stepping through convert() in a debugger to see what inTrait actually is.

    Normalize Rules

    /**
     * RuleSet to normalize plans for stream / DataStream execution
     */
    val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
      // Transform window to LogicalWindowAggregate
      DataStreamLogicalWindowAggregateRule.INSTANCE,
      WindowStartEndPropertiesRule.INSTANCE,

      // simplify expressions rules
      // these three rules fold constant sub-expressions in the three kinds of expressions
      // (replacing them with RexLiterals, e.g. a filter `amount > 10 + 5` becomes `amount > 15`)
      // and remove redundant casts (a cast(cast(...)) whose inner input type equals the outer
      // output type can be dropped)
      ReduceExpressionsRule.FILTER_INSTANCE,
      ReduceExpressionsRule.PROJECT_INSTANCE,
      ReduceExpressionsRule.CALC_INSTANCE,

      // if an `org.apache.calcite.rel.core.Project` contains OVER expressions (a Project holds several
      // expressions, like selecting several columns), split it into a LogicalProject plus a LogicalWindow,
      // i.e. separate the windowed aggregation from the unrelated parts
      ProjectToWindowRule.PROJECT
    )
    

    LogicalCalc - a relational expression which computes project expressions and also filters; it combines the functionality of {@link LogicalProject} and {@link LogicalFilter}.

    val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(

      // some of Calcite's built-in optimizations
      // convert a logical table scan to a relational expression
      TableScanRule.INSTANCE,
      EnumerableToLogicalTableScan.INSTANCE,

      // push a filter into a join
      FilterJoinRule.FILTER_ON_JOIN,
      // push filter into the children of a join
      FilterJoinRule.JOIN,
      // push filter through an aggregation
      FilterAggregateTransposeRule.INSTANCE,

      // ... many more Calcite rules omitted here ...

      // Flink-provided optimizations: push project and filter down into the table source scan
      // scan optimization
      PushProjectIntoTableSourceScanRule.INSTANCE,
      PushFilterIntoTableSourceScanRule.INSTANCE,

      // Unnest rule
      LogicalUnnestRule.INSTANCE,

      // translate to flink logical rel nodes
      FlinkLogicalAggregate.CONVERTER,
      FlinkLogicalWindowAggregate.CONVERTER,
      FlinkLogicalOverWindow.CONVERTER,
      FlinkLogicalCalc.CONVERTER,
      FlinkLogicalCorrelate.CONVERTER,
      FlinkLogicalIntersect.CONVERTER,
      FlinkLogicalJoin.CONVERTER,
      FlinkLogicalMinus.CONVERTER,
      FlinkLogicalSort.CONVERTER,
      FlinkLogicalUnion.CONVERTER,
      FlinkLogicalValues.CONVERTER,
      FlinkLogicalTableSourceScan.CONVERTER,
      FlinkLogicalTableFunctionScan.CONVERTER,
      FlinkLogicalNativeTableScan.CONVERTER
    )
    

    DATASTREAM_OPT_RULES (listing omitted): ConverterRules that translate the logical plan (FlinkLogicalRel nodes) into DataStreamRel nodes.

    DATASTREAM_DECO_RULES

    /**
     * RuleSet to decorate plans for stream / DataStream execution.
     * These decorate the plan for the different kinds of output tables; all of them are
     * plain RelOptRule implementations.
     */
    val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
      // retraction rules
      DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE,
      DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
      DataStreamRetractionRules.ACCMODE_INSTANCE
    )

    StreamTableEnvironment.translate()

    translate() is called from TableEnvironment.writeToSink(); its main job is to turn the DataStreamRel plan into code that Flink can execute (a DataStream).

     protected def translate[A](
          logicalPlan: RelNode,
          logicalType: RelDataType,
          queryConfig: StreamQueryConfig,
          withChangeFlag: Boolean)
          (implicit tpe: TypeInformation[A]): DataStream[A] = {
    
        // if no change flags are requested, verify table is an insert-only (append-only) table.
        if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
          throw new TableException(
            "Table is not an append-only table. " +
            "Use the toRetractStream() in order to handle add and retract messages.")
        }
    
        // this call is the core of the conversion: it invokes translateToPlan() on each DataStreamRel implementation
        // get CRow plan
        val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
    
        // convert CRow to output type
        val conversion = if (withChangeFlag) {
          getConversionMapperWithChanges(
            plan.getType,
            new RowSchema(logicalType),
            tpe,
            "DataStreamSinkConversion")
        } else {
          getConversionMapper(
            plan.getType,
            new RowSchema(logicalType),
            tpe,
            "DataStreamSinkConversion")
        }
    
        val rootParallelism = plan.getParallelism
    
        conversion match {
          case mapFunction: MapFunction[CRow, A] =>
            plan.map(mapFunction)
              .returns(tpe)
              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
              .setParallelism(rootParallelism)
        }
      }
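
    The withChangeFlag parameter corresponds to the user-facing conversion methods: without a change flag the table must be append-only; with a change flag every record carries an add/retract boolean. A sketch against the 1.3/1.4-era Scala Table API (resultTable stands for any Table, e.g. one of the queries built earlier):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    // only valid for append-only tables (corresponds to withChangeFlag = false)
    val appended: DataStream[Row] = resultTable.toAppendStream[Row]
    // each element is a (flag, row) pair where true = add and false = retract (withChangeFlag = true)
    val retracted: DataStream[(Boolean, Row)] = resultTable.toRetractStream[Row]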
    

    Some personal notes (feel free to skip):
    HINT:
    flink-table contains two packages, org.apache.flink.table.plan.nodes.datastream and org.apache.flink.table.plan.nodes.dataset, which hold many classes extending the corresponding Calcite nodes.

    LogicalNode appears to be the node type of Flink's own Table API AST; it has a toRelNode method for converting into the Calcite AST.

    DataStreamGroupWindowAggregate
    AggregateUtil.createDataStreamAggregateFunction
