The Flink SQL Parsing Flow

Author: edd72e125c98 | Published 2018-01-12 09:22

Overall architecture of Flink SQL

(figure: execution flow)

How the Table API and SQL build a Calcite logical plan (parsing)

Comparing the two

The two paths differ slightly:

  • SQL: the query string is parsed by Calcite's parser into a tree of SqlNodes, then validated against the catalog, and finally built into Calcite's abstract syntax tree, i.e. the Logical Plan in the figure.

  • Table API: calling the Table API actually creates Table API LogicalNodes (building the Table API's own abstract syntax tree), and the whole query is validated during construction. For example, a table becomes a CatalogNode; a select after a windowed groupBy creates WindowAggregate and Project nodes; a where becomes a Filter. Calcite's RelBuilder then translates the tree into a Calcite logical plan. The SQL API, by contrast, hands the string straight to Calcite's parser and validator to produce the Calcite logical plan.

In short, SQL relies entirely on Calcite (its SQL parser) for syntax analysis and yields a Calcite logical plan after validation, whereas the Table API first builds its own Table API logical plan and then translates it into a Calcite logical plan through Calcite's RelBuilder.
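To make the contrast concrete, here is a side-by-side sketch of the two entry points (written against the 1.4-era Scala APIs; the Orders table and its fields are made up for illustration):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object TwoFrontEnds {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    tEnv.registerDataStream("Orders", env.fromElements((1L, "beer", 3)), 'userId, 'product, 'amount)

    // SQL front end: the whole string goes through Calcite's parser and validator
    val viaSql = tEnv.sql("SELECT product, amount FROM Orders WHERE amount > 2")

    // Table API front end: each call builds a Flink LogicalNode (CatalogNode,
    // Filter, Project) and validates eagerly; RelBuilder later turns the tree
    // into the same kind of Calcite logical plan
    val viaTableApi = tEnv.scan("Orders").filter('amount > 2).select('product, 'amount)
  }
}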

Optimization and conversion to a runtime Flink program


Calcite's cost-based optimizer performs the optimization. In other words, unlike Spark, Flink delegates SQL parsing, analysis, and optimization entirely to Calcite (with Flink contributing additional optimization rules). Calcite optimizes the logical plan according to rule sets, and different rule sets apply depending on the runtime environment (Flink ships one set of optimization rules for batch and another for streaming). The rules fall into two groups: Calcite's built-in rules (such as predicate push-down and pruning), plus Flink-specific rules (selected according to whether the program is streaming or batch) that further optimize the logical plan.
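Concretely, a rule set on the Calcite side is just a list of RelOptRule instances; a minimal sketch (real Calcite APIs, using one of the built-in rules discussed below) of how such a bundle is assembled:

import org.apache.calcite.rel.rules.FilterJoinRule
import org.apache.calcite.tools.{RuleSet, RuleSets}

object RuleSetDemo {
  // FlinkRuleSets (discussed later) builds its NORM/LOGICAL/PHYSICAL rule
  // sets in exactly this way, mixing Calcite's rules with Flink's own
  val myRules: RuleSet = RuleSets.ofList(FilterJoinRule.FILTER_ON_JOIN)
}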

A physical plan is then generated: Flink's own rules rewrite the plan into a DataStream plan (the physical plan).

The physical plan is then turned into a Flink execution plan by calling translateToPlan() on each node, which uses code generation to produce Flink's operators.

The code-generated functions exist as strings. After the job is submitted they are shipped to the TaskManagers, where they are compiled with the [Janino](http://janino-compiler.github.io/janino/) compiler at runtime before being executed.
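A minimal sketch of that runtime compilation step, assuming Janino's SimpleCompiler (the generated class name and body here are invented for illustration):

import org.apache.flink.api.common.functions.MapFunction
import org.codehaus.janino.SimpleCompiler

object JaninoDemo {
  def main(args: Array[String]): Unit = {
    // a code-generated function, existing only as a string
    val code =
      """
        |public class GeneratedMapper
        |    implements org.apache.flink.api.common.functions.MapFunction<Integer, Integer> {
        |  public Integer map(Integer value) { return value + 1; }
        |}
      """.stripMargin
    val compiler = new SimpleCompiler()
    compiler.cook(code) // compile the source string at runtime
    val clazz = compiler.getClassLoader.loadClass("GeneratedMapper")
    val mapper = clazz.newInstance().asInstanceOf[MapFunction[Integer, Integer]]
    println(mapper.map(Integer.valueOf(41))) // prints 42
  }
}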

The overall logic is similar to Spark's, except that here Calcite does what Catalyst does there (SQL parsing, analysis, and optimization).

TableEnvironment

TableEnvironment.sql() - parses and validates the SQL:

def sql(query: String): Table = {
   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
   // parse the SQL query into an AST of SqlNodes
   val parsed = planner.parse(query)
   // validate the SQL query
   val validated = planner.validate(parsed)
   // transform the pure syntax tree into a tree of relational expressions,
   // in preparation for optimization
   val relational = planner.rel(validated)

   new Table(this, LogicalRelNode(relational.rel))
 }
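FlinkPlannerImpl is a thin wrapper around Calcite's own planner, so the same three steps can be reproduced against raw Calcite (real Calcite APIs; the empty schema and trivial query are just to keep the sketch self-contained):

import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.tools.{Frameworks, Planner}

object CalcitePlannerDemo {
  def main(args: Array[String]): Unit = {
    val rootSchema: SchemaPlus = Frameworks.createRootSchema(true)
    val config = Frameworks.newConfigBuilder().defaultSchema(rootSchema).build()
    val planner: Planner = Frameworks.getPlanner(config)

    val parsed = planner.parse("SELECT 1 + 2 AS three") // SqlNode AST
    val validated = planner.validate(parsed)            // type-checked SqlNode
    val relational = planner.rel(validated)             // RelRoot: the relational tree
    println(RelOptUtil.toString(relational.rel))
  }
}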

Table.writeToSink()

This method mainly calls the optimize() and translate() methods of StreamTableEnvironment; it is responsible for optimizing the AST obtained from sql() and producing the physical plan.

override private[flink] def writeToSink[T](
      table: Table,
      sink: TableSink[T],
      queryConfig: QueryConfig): Unit = {

   // ... (other sink types elided)
   sink match {
     case retractSink: RetractStreamTableSink[_] =>
        // a retraction sink can always be used
        val outputType = sink.getOutputType
        // translate the Table into a DataStream and provide the type that the TableSink expects
        val result: DataStream[T] =
          translate(
            table,
            streamQueryConfig,
            updatesAsRetraction = true,
            withChangeFlag = true)(outputType)
        // give the DataStream to the TableSink to emit it
        retractSink.asInstanceOf[RetractStreamTableSink[Any]]
          .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
   }
}
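The change flag that writeToSink() hands to a RetractStreamTableSink is the same one exposed by toRetractStream (a real API in this Flink version): true marks an insert, false a retraction. A small sketch:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object RetractShapeDemo {
  // `result` is any Table, e.g. a grouped aggregation
  def printWithChangeFlags(result: Table): Unit = {
    val withFlags: DataStream[(Boolean, Row)] = result.toRetractStream[Row]
    withFlags.print()
  }
}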

StreamTableEnvironment

StreamTableEnvironment.optimize()

FlinkRelNode has three subtypes: FlinkLogicalRel, DataStreamRel, and DataSetRel.
FlinkLogicalRel corresponds to the optimized logical plan nodes.
DataStreamRel corresponds to the DataStream (streaming) physical plan.
DataSetRel corresponds to the DataSet (batch) physical plan.
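A simplified sketch of that hierarchy (the real traits live in org.apache.flink.table.plan.nodes and extend Calcite's RelNode):

trait FlinkRelNode
trait FlinkLogicalRel extends FlinkRelNode // target of LOGICAL_OPT_RULES
trait DataStreamRel extends FlinkRelNode   // streaming physical plan; provides translateToPlan()
trait DataSetRel extends FlinkRelNode      // batch physical plan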

The streaming path (this is where rules such as DataStreamGroupAggregateRule come into play) proceeds through the following steps:

 // 1. decorrelate
    // rewrite away correlated expressions, i.e. correlated subqueries: subqueries
    // whose WHERE clause refers to a column of a table in the outer query
    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

 // 2. convert time indicators
    // convert the time indicators; e.g. if a rowtime indicator is present, a
    // TimeMaterializationSqlFunction operator is introduced, which is used
    // later during code generation
    val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)

 // 3. normalize the logical plan
    val normRuleSet = getNormRuleSet
    val normalizedPlan = if (normRuleSet.iterator().hasNext) {
      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
    } else {
      convPlan
    }

 // 4. optimize the logical Flink plan
    // logical optimizations (e.g. filter and projection push-down); this phase
    // also converts the nodes into subclasses of FlinkLogicalRel
    val logicalOptRuleSet = getLogicalOptRuleSet
    // replace the trait set with FlinkConventions.LOGICAL as the target output traits
    val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
    val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
      runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
    } else {
      normalizedPlan
    }

 // 5. convert the optimized logical plan into a physical plan; the nodes
    // become subclasses of DataStreamRel
    val physicalOptRuleSet = getPhysicalOptRuleSet
    val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
    val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
      runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
    } else {
      logicalPlan
    }
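runHepPlanner and runVolcanoPlanner are small Flink helpers around Calcite's two planner engines. Roughly what the HepPlanner call (rule-driven rewriting, no cost model) amounts to, sketched against raw Calcite APIs with an arbitrary rule standing in for the normalization rule set:

import java.util.Collections
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.rules.ReduceExpressionsRule

object HepDemo {
  def normalize(input: RelNode): RelNode = {
    val program = new HepProgramBuilder()
      .addMatchOrder(HepMatchOrder.BOTTOM_UP)
      .addRuleCollection(Collections.singletonList[RelOptRule](ReduceExpressionsRule.FILTER_INSTANCE))
      .build()
    val planner = new HepPlanner(program)
    planner.setRoot(input) // register the plan to rewrite
    planner.findBestExp()  // apply the rules until fixpoint and return the rewritten plan
  }
}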

FlinkRuleSets defines the rule sets used during the optimize() process:

DATASTREAM_NORM_RULES: normalizes certain nodes.
LOGICAL_OPT_RULES: logical plan optimization rules, covering the SQL-level optimizations (push-down, pruning) and also converting Calcite logical nodes into FlinkLogicalRel nodes.
DATASTREAM_OPT_RULES: rules that convert logical plan nodes into the DataStream physical plan, i.e. FlinkLogicalRel -> DataStreamRel. These rules all extend Calcite's ConverterRule, which in turn extends RelOptRule.

ConverterRule

  • "Abstract base class for a rule which converts from one calling convention to another without changing semantics." (Calcite javadoc)

ConverterRule has many subclasses (for example, everything in DATASTREAM_OPT_RULES, plus the rules in LOGICAL_OPT_RULES that convert to the Flink logical plan); they all implement its convert() method, which is invoked from onMatch().

ConverterRule.onMatch(RelOptRuleCall call)
RelOptRule declares onMatch(RelOptRuleCall call), which ConverterRule overrides; it is called as a notification once matches() returns true:

public void onMatch(RelOptRuleCall call) {
    // rel is rels[0] of the RelOptRuleCall, i.e. the original RelNode being converted
    RelNode rel = call.rel(0);
    // only convert if the node carries the expected input trait
    if (rel.getTraitSet().contains(inTrait)) {
      final RelNode converted = convert(rel);
      if (converted != null) {
        call.transformTo(converted);
      }
    }
  }
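For orientation, a minimal custom ConverterRule in the same style as the DATASTREAM_OPT_RULES converters (the rule itself is hypothetical; ConverterRule, FlinkConventions, and FlinkLogicalCalc are real classes):

import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc

class MyDataStreamCalcRule extends ConverterRule(
    classOf[FlinkLogicalCalc],   // which node the rule matches
    FlinkConventions.LOGICAL,    // inTrait: required input convention
    FlinkConventions.DATASTREAM, // outTrait: convention of the produced node
    "MyDataStreamCalcRule") {

  override def convert(rel: RelNode): RelNode = {
    // build and return the DataStreamRel equivalent of `rel` here;
    // returning null tells onMatch() that no conversion is possible
    null
  }
}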

TODO: the rules in LOGICAL_OPT_RULES that build the Flink logical plan implement matches() in addition to convert().
Step through convert() in a debugger to see what inTrait actually is.

Normalize Rules

    /**
    * RuleSet to normalize plans for stream / DataStream execution
    */
  val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
    // Transform window to LogicalWindowAggregate
    DataStreamLogicalWindowAggregateRule.INSTANCE,
    WindowStartEndPropertiesRule.INSTANCE,

    // simplify expressions rules
    // these three rules fold constant subexpressions (replacing them with
    // RexLiterals) in the three kinds of expressions and remove redundant
    // casts, e.g. cast(cast(x)) can be dropped when the inner input type
    // equals the outer output type
    ReduceExpressionsRule.FILTER_INSTANCE,
    ReduceExpressionsRule.PROJECT_INSTANCE,
    ReduceExpressionsRule.CALC_INSTANCE,

    // if an org.apache.calcite.rel.core.Project contains an OVER expression
    // (a Project holds several expressions, like the columns of a SELECT),
    // split it into a LogicalProject plus a LogicalWindow, separating the
    // windowed aggregation from the unrelated parts
    ProjectToWindowRule.PROJECT
  )

LogicalCalc - a relational expression which computes project expressions and also filters; it combines the functionality of LogicalProject and LogicalFilter. For example, SELECT a + 1 FROM t WHERE b > 2 can be represented by a single LogicalCalc.

val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(

// Calcite's built-in default optimizations
    // convert a logical table scan to a relational expression
    TableScanRule.INSTANCE,
    EnumerableToLogicalTableScan.INSTANCE,

    // push a filter into a join
    FilterJoinRule.FILTER_ON_JOIN,
    // push filter into the children of a join
    FilterJoinRule.JOIN,
    // push filter through an aggregation
    FilterAggregateTransposeRule.INSTANCE,

    // ... many more ...

// Flink-specific optimizations: push projections and filters down into the table scan
    // scan optimization
    PushProjectIntoTableSourceScanRule.INSTANCE,
    PushFilterIntoTableSourceScanRule.INSTANCE,

    // Unnest rule
    LogicalUnnestRule.INSTANCE,

    // translate to Flink logical rel nodes
    FlinkLogicalAggregate.CONVERTER,
    FlinkLogicalWindowAggregate.CONVERTER,
    FlinkLogicalOverWindow.CONVERTER,
    FlinkLogicalCalc.CONVERTER,
    FlinkLogicalCorrelate.CONVERTER,
    FlinkLogicalIntersect.CONVERTER,
    FlinkLogicalJoin.CONVERTER,
    FlinkLogicalMinus.CONVERTER,
    FlinkLogicalSort.CONVERTER,
    FlinkLogicalUnion.CONVERTER,
    FlinkLogicalValues.CONVERTER,
    FlinkLogicalTableSourceScan.CONVERTER,
    FlinkLogicalTableFunctionScan.CONVERTER,
    FlinkLogicalNativeTableScan.CONVERTER
  )

DATASTREAM_OPT_RULES (omitted here): ConverterRules that translate the logical plan nodes into DataStreamRel nodes.

DATASTREAM_DECO_RULES

/**
* RuleSet to decorate plans for stream / DataStream execution.
* These rules assign one of the three output-table modes; all of them are
* plain RelOptRule implementations.
*/
val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
  // retraction rules
  DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE,
  DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
  DataStreamRetractionRules.ACCMODE_INSTANCE
)

StreamTableEnvironment.translate()

translate() is called from TableEnvironment.writeToSink(); its main job is to turn the DataStreamRel plan into executable Flink code (a DataStream):

 protected def translate[A](
      logicalPlan: RelNode,
      logicalType: RelDataType,
      queryConfig: StreamQueryConfig,
      withChangeFlag: Boolean)
      (implicit tpe: TypeInformation[A]): DataStream[A] = {

    // if no change flags are requested, verify table is an insert-only (append-only) table.
    if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
      throw new TableException(
        "Table is not an append-only table. " +
        "Use the toRetractStream() in order to handle add and retract messages.")
    }

    // this call is the core of the conversion: it invokes translateToPlan()
    // on the concrete DataStreamRel implementations
    // get CRow plan
    val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)

    // convert CRow to output type
    val conversion = if (withChangeFlag) {
      getConversionMapperWithChanges(
        plan.getType,
        new RowSchema(logicalType),
        tpe,
        "DataStreamSinkConversion")
    } else {
      getConversionMapper(
        plan.getType,
        new RowSchema(logicalType),
        tpe,
        "DataStreamSinkConversion")
    }

    val rootParallelism = plan.getParallelism

    conversion match {
      case mapFunction: MapFunction[CRow, A] =>
        plan.map(mapFunction)
          .returns(tpe)
          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
          .setParallelism(rootParallelism)
    }
  }
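A usage sketch tying this together (real 1.4-era Scala APIs; the Orders table is made up): withChangeFlag = false corresponds to the toAppendStream path, so the append-only check above is exactly what rejects a grouped aggregate there.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TranslateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    tEnv.registerDataStream("Orders", env.fromElements((1L, "beer"), (2L, "beer")), 'id, 'product)

    // projection only: the plan is append-only, so the check above passes
    tEnv.sql("SELECT id, product FROM Orders").toAppendStream[Row].print()

    // a grouped aggregate updates earlier results: toAppendStream would throw
    // the TableException above, while toRetractStream sets withChangeFlag
    tEnv.sql("SELECT product, COUNT(id) FROM Orders GROUP BY product")
      .toRetractStream[Row].print()

    env.execute("translate demo")
  }
}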

Some personal notes (feel free to skip):

HINT:
flink-table contains the packages org.apache.flink.table.plan.nodes.datastream and org.apache.flink.table.plan.nodes.dataset, which hold many classes extending the various Calcite node classes.

LogicalNode appears to be the node type of the Table API's own AST within Flink; its toRelNode method converts it into the Calcite AST.

DataStreamGroupWindowAggregate
AggregateUtil.createDataStreamAggregateFunction
