【Spark Featured】Spark SQL Execution Plans Explained


Author: 熊本极客 | Published 2023-07-05 21:24

    1. Spark SQL Execution Flow

    • Parser stage: Antlr4 performs lexical and syntactic analysis of the SQL statement, producing an Unresolved Logical Plan
    • Analyzer stage: catalog information is used to resolve the Unresolved Logical Plan into an Analyzed Logical Plan
    • Optimizer stage: a set of rules (Rule) rewrites the Analyzed Logical Plan into an Optimized Logical Plan
    • Planner stage: the Logical Plan is converted into one or more Physical Plans, and a cost model can be used to select the best Physical Plan
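    The output of each stage can be inspected programmatically through a DataFrame's QueryExecution. A minimal sketch, assuming an existing SparkSession named spark and a registered table t:

    // Inspect the output of every planning stage.
    val qe = spark.sql("SELECT a + 1 FROM t WHERE a > 0").queryExecution
    println(qe.logical)       // Parsed (unresolved) logical plan
    println(qe.analyzed)      // Analyzed logical plan
    println(qe.optimizedPlan) // Optimized logical plan
    println(qe.executedPlan)  // Physical plan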

    2. Case Study

    -- stu table
    CREATE TABLE stu (
      id INT,
      name STRING,
      age INT
    );
    -- data in stu
    +----+------+-----+
    | id | name | age |
    +----+------+-----+
    | 0  | John | 10  |
    | 1  | Mike | 11  |
    | 2  | Lisa | 12  |
    +----+------+-----+
    
    -- score table
    CREATE TABLE score (
      id INT,
      xueke STRING,
      score INT
    );
    -- data in score
    +----+---------+-------+
    | id | xueke   | score |
    +----+---------+-------+
    | 0  | Chinese | 80    |
    | 0  | Math    | 100   |
    | 0  | English | 99    |
    | 1  | Chinese | 40    |
    | 1  | Math    | 50    |
    | 1  | English | 60    |
    | 2  | Chinese | 70    |
    | 2  | Math    | 80    |
    | 2  | English | 90    |
    +----+---------+-------+
    
    -- Show the Parsed Logical Plan, Analyzed Logical Plan, Optimized Logical Plan,
    -- and Physical Plan for a given SQL statement
    EXPLAIN EXTENDED SELECT sum(v), name FROM
      (SELECT stu.id, 100+10+score.score AS v, name FROM stu JOIN score
       WHERE stu.id = score.id AND stu.age >= 11) AS tmp
    GROUP BY name;
    
    == Parsed Logical Plan ==
    'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
    +- 'SubqueryAlias tmp
       +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#97, 'name]
          +- 'Filter (('stu.id = 'score.id) AND ('stu.age >= 11))
             +- 'Join Inner
                :- 'UnresolvedRelation [stu], [], false
                +- 'UnresolvedRelation [score], [], false
    
    == Analyzed Logical Plan ==
    sum(v): bigint, name: string
    Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
    +- SubqueryAlias tmp
       +- Project [id#103, ((100 + 10) + score#108) AS v#97, name#104]
          +- Filter ((id#103 = id#106) AND (age#105 >= 11))
             +- Join Inner
                :- SubqueryAlias spark_catalog.default.stu
                :  +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
                +- SubqueryAlias spark_catalog.default.score
                   +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
    +- Project [(110 + score#108) AS v#97, name#104]
       +- Join Inner, (id#103 = id#106)
          :- Project [id#103, name#104]
          :  +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
          :     +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
          +- Project [id#106, score#108]
             +- Filter isnotnull(id#106)
                +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
    
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- HashAggregate(keys=[name#104], functions=[sum(v#97)], output=[sum(v)#110L, name#104])
       +- Exchange hashpartitioning(name#104, 200), ENSURE_REQUIREMENTS, [plan_id=248]
          +- HashAggregate(keys=[name#104], functions=[partial_sum(v#97)], output=[name#104, sum#112L])
             +- Project [(110 + score#108) AS v#97, name#104]
                +- SortMergeJoin [id#103], [id#106], Inner
                   :- Sort [id#103 ASC NULLS FIRST], false, 0
                   :  +- Exchange hashpartitioning(id#103, 200), ENSURE_REQUIREMENTS, [plan_id=240]
                   :     +- Project [id#103, name#104]
                   :        +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
                   :           +- Scan hive default.stu [age#105, id#103, name#104], HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
                   +- Sort [id#106 ASC NULLS FIRST], false, 0
                      +- Exchange hashpartitioning(id#106, 200), ENSURE_REQUIREMENTS, [plan_id=241]
                         +- Filter isnotnull(id#106)
                            +- Scan hive default.score [id#106, score#108], HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
    
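    The same four plans can also be printed from the DataFrame API. A sketch, wrapping the query above in a DataFrame:

    // Equivalent of EXPLAIN EXTENDED from the DataFrame API.
    val df = spark.sql(
      """SELECT sum(v), name FROM
        |  (SELECT stu.id, 100+10+score.score AS v, name FROM stu JOIN score
        |   WHERE stu.id = score.id AND stu.age >= 11) AS tmp
        |GROUP BY name""".stripMargin)
    df.explain(extended = true) // prints parsed, analyzed, optimized and physical plans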

    2.1 Parser Stage

    The SQL statement above is parsed into an abstract syntax tree, the Parsed Logical Plan. The leaf nodes of type UnresolvedRelation indicate that those nodes have not yet been resolved; they are handled in the next step, where the Analyzer resolves them.

    == Parsed Logical Plan ==
    'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
    +- 'SubqueryAlias tmp
       +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#97, 'name]
          +- 'Filter (('stu.id = 'score.id) AND ('stu.age >= 11))
             +- 'Join Inner
                :- 'UnresolvedRelation [stu], [], false
                +- 'UnresolvedRelation [score], [], false
    
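    The parser can also be invoked on its own through Catalyst's SQL parser. A sketch (CatalystSqlParser is an internal Catalyst class, so its location and behavior may differ across Spark versions):

    // Produce an unresolved logical plan directly from SQL text.
    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    val parsed = CatalystSqlParser.parsePlan("SELECT name FROM stu WHERE age >= 11")
    println(parsed) // a tree containing 'Project, 'Filter and 'UnresolvedRelation [stu]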

    Every node in this tree is a LogicalPlan, i.e. an Operator that performs some operation. Some common Operators are listed below.

    • Project(projectList: Seq[NamedExpression], child: LogicalPlan): the output of a SELECT; each element of projectList is an expression to output
    • Filter(condition: Expression, child: LogicalPlan): filters child's input rows by condition
    • Aggregate(groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: LogicalPlan): aggregates child's output rows, e.g. for GROUP BY
    • Join(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression]): joins the outputs of left and right
    • Union(children: Seq[LogicalPlan]): unions the results of children
    • Distinct(child: LogicalPlan): deduplicates child's output rows
    • Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan): sorts child's output
    • SubqueryAlias(alias: Alias, child: LogicalPlan): assigns an alias to child
    • GlobalLimit(limitExpr: Expression, child: LogicalPlan): limits the number of rows output by child
    • Window(windowExpressions: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], child: LogicalPlan): applies window functions to child; windowExpressions is the list of window functions, each a NamedExpression specifying the computation; partitionSpec is a list of expressions defining how the window is partitioned; orderSpec is a list of sort orders defining row ordering within each window
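    The DataFrame API builds trees out of these same operators. A sketch, assuming the stu table above:

    // where(...) becomes a Filter node and select(...) a Project node.
    import spark.implicits._

    val q = spark.table("stu").where($"age" >= 11).select($"name")
    println(q.queryExecution.logical) // Project -> Filter -> relation for `stu`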

    2.2 Analyzer Stage

    Using catalog and contextual metadata, the Analyzer resolves each UnresolvedRelation into a resolved relation and automatically wraps it in a SubqueryAlias. When the Analyzer resolves a Hive table, the resolved relation is a HiveTableRelation, which carries the table's metadata such as its name and column types. Each UnresolvedRelation thus becomes a SubqueryAlias wrapping a HiveTableRelation that holds the Hive table's metadata.

    == Analyzed Logical Plan ==
    sum(v): bigint, name: string
    Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
    +- SubqueryAlias tmp
       +- Project [id#103, ((100 + 10) + score#108) AS v#97, name#104]
          +- Filter ((id#103 = id#106) AND (age#105 >= 11))
             +- Join Inner
                :- SubqueryAlias spark_catalog.default.stu
                :  +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
                +- SubqueryAlias spark_catalog.default.score
                   +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
    
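    The division of labor between Parser and Analyzer is easy to observe: parsing succeeds even for a table that does not exist, while analysis fails because the Analyzer consults the catalog. A sketch:

    // Parsing only checks syntax, so this succeeds:
    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    val plan = CatalystSqlParser.parsePlan("SELECT * FROM no_such_table")

    // Analysis consults the catalog, so this throws an AnalysisException
    // along the lines of "Table or view not found: no_such_table":
    spark.sql("SELECT * FROM no_such_table")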

    2.3 Optimizer Stage

    Some of the logical optimization rules are listed below.

    • ConstantFolding: folds constant expressions. Example: select 100+10+a from t becomes select 110+a from t
    • PushDownPredicate: pushes predicates down toward the data source so unnecessary rows are filtered out as early as possible. Example: select rand(), a from (select * from t) where a>1 becomes select rand(), a from (select * from t where a>1)
    • ColumnPruning: removes unneeded columns, i.e. prunes output fields of child that are never used. Example: select a from (select a, b from t) becomes select a from (select a from t)
    • BooleanSimplification: simplifies boolean expressions, mainly AND/OR combinations in WHERE clauses. Example: true or a=b becomes true
    • SimplifyCasts: simplifies cast operations; if a cast does not change the data type, it can be removed. Example: select cast(a as int) from t becomes select a from t (when a is already an int)
    • SimplifyCaseConversionExpressions: simplifies case conversions; in a chain of Upper/Lower calls on a string, only the outermost call matters. Example: select lower(upper(lower(a))) as c from t becomes select lower(a) as c from t
    • CollapseProject: merges adjacent projections, collapsing a Project into a child Project or Aggregate. Example: select c+1 from (select a+b as c from t) becomes select a+b+1 as `c+1` from t
    • CollapseRepartition: merges adjacent Repartition operations. Example: Repartition(numPartitions, shuffle, Repartition(_, _, child)) becomes Repartition(numPartitions, shuffle, child)

    Applying rules such as the following, the Analyzed Logical Plan above is rewritten into the Optimized Logical Plan below.

    • ConstantFolding: the constant subexpression 100 + 10 in the Project node is folded into the literal 110, so the projection becomes (110 + score#108) AS v#97.
    • ColumnPruning: id#103 is dropped from the Project's output, since it is needed only for the join condition, which now lives in the Join node; likewise, each side of the join projects only the columns it actually needs.
    • PushDownPredicate: the filter age#105 >= 11 (together with the inferred isnotnull(age#105) and isnotnull(id#103)) is pushed down next to the stu HiveTableRelation, and isnotnull(id#106) next to the score HiveTableRelation, to reduce the amount of data read; the equi-join predicate stu.id = score.id becomes the Join condition (id#103 = id#106).
    == Optimized Logical Plan ==
    Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
    +- Project [(110 + score#108) AS v#97, name#104]
       +- Join Inner, (id#103 = id#106)
          :- Project [id#103, name#104]
          :  +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
          :     +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
          +- Project [id#106, score#108]
             +- Filter isnotnull(id#106)
                +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
    
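    Beyond the built-in rules, extra optimizer rules can be plugged in through spark.experimental.extraOptimizations. A minimal sketch of a custom rule (the rule and its name are illustrative, and Catalyst's internal APIs can change between versions):

    // Removes Filter nodes whose condition is the literal `true`,
    // similar in spirit to the built-in PruneFilters rule.
    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.types.BooleanType

    object RemoveTrueFilter extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Filter(Literal(true, BooleanType), child) => child
      }
    }

    spark.experimental.extraOptimizations ++= Seq(RemoveTrueFilter)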

    2.4 Planner Stage

    The Optimized Logical Plan is finally converted into the Physical Plan shown below.

    • Aggregate node: the Aggregate over sum(v) in the logical plan becomes a HashAggregate computing a partial sum per partition, then an Exchange hashpartitioning, then a final HashAggregate merging the partial sums
    • Join Inner node: the Join Inner in the logical plan becomes a SortMergeJoin, with each side prepared by an Exchange hashpartitioning followed by a Sort
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- HashAggregate(keys=[name#104], functions=[sum(v#97)], output=[sum(v)#110L, name#104])
       +- Exchange hashpartitioning(name#104, 200), ENSURE_REQUIREMENTS, [plan_id=248]
          +- HashAggregate(keys=[name#104], functions=[partial_sum(v#97)], output=[name#104, sum#112L])
             +- Project [(110 + score#108) AS v#97, name#104]
                +- SortMergeJoin [id#103], [id#106], Inner
                   :- Sort [id#103 ASC NULLS FIRST], false, 0
                   :  +- Exchange hashpartitioning(id#103, 200), ENSURE_REQUIREMENTS, [plan_id=240]
                   :     +- Project [id#103, name#104]
                   :        +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
                   :           +- Scan hive default.stu [age#105, id#103, name#104], HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
                   +- Sort [id#106 ASC NULLS FIRST], false, 0
                      +- Exchange hashpartitioning(id#106, 200), ENSURE_REQUIREMENTS, [plan_id=241]
                         +- Filter isnotnull(id#106)
                            +- Scan hive default.score [id#106, score#108], HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
    

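    Note the AdaptiveSparkPlan isFinalPlan=false at the root: with Adaptive Query Execution (AQE) enabled, the physical plan can still be re-optimized at runtime using shuffle statistics, for example by turning a sort-merge join into a broadcast join. AQE is controlled by a configuration flag (on by default in recent Spark versions):

    // Toggle Adaptive Query Execution explicitly.
    spark.conf.set("spark.sql.adaptive.enabled", "true")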
    Note: in Spark SQL, the join operator is planned with one of several join strategies, chosen according to various conditions such as table size and join keys; these include BroadcastHashJoin, SortMergeJoin, and ShuffleHashJoin.
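    The strategy choice can be influenced from user code. A sketch (largeDf, smallDf, big, and small are illustrative names):

    import org.apache.spark.sql.functions.broadcast

    // Hint that smallDf should be broadcast, favoring BroadcastHashJoin:
    val joined = largeDf.join(broadcast(smallDf), Seq("id"))

    // The same hint in SQL:
    spark.sql("SELECT /*+ BROADCAST(s) */ * FROM big b JOIN small s ON b.id = s.id")

    // Disable automatic broadcasting so the planner falls back to SortMergeJoin:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")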
