1.Spark SQL 执行流程
- Parser 阶段:使用 Antlr4 对 sql 语句进行词法和语法的解析
- Analyzer 阶段:利用 Catalog 信息将 Unresolved Logical Plan 解析成 Analyzed Logical plan
- Optimizer 阶段:使用一些规格 Rule 将 Analyzed Logical Plan 解析成 Optimized Logical Plan
- Planner 阶段:Logical Plan 转换成多个 Physical Plans,可以利用代价模型 cost model 选择最佳的 Physical Plan
2.案例分析
# stu 表
CREATE TABLE stu (
id INT,
name STRING,
age INT
);
# stu 表中的数据
+------+--------+-------+
|id |name | age |
+------+--------+-------+
|0 |John |10 |
|1 |Mike |11 |
|2 |Lisa |12 |
+------+--------+-------+
# score 表
CREATE TABLE score (
id INT,
xueke STRING,
score INT
);
# score 表中的数据
+------+--------+-------+
|id |xueke |score |
+------+--------+-------+
|0 |Chinese |80 |
|0 |Math |100 |
|0 |English |99 |
|1 |Chinese |40 |
|1 |Math |50 |
|1 |English |60 |
|2 |Chinese |70 |
|2 |Math |80 |
|2 |English |90 |
+------+--------+-------+
# 查看指定 sql 语句的 Parsed Logical Plan、Analyzed Logical Plan、Optimized Logical Plan 和 Physical Plan
EXPLAIN EXTENDED SELECT sum(v), name FROM
(SELECT stu.id, 100+10+score.score AS v, name FROM stu JOIN score
WHERE stu.id = score.id AND stu.age >= 11) AS tmp
GROUP BY name;
== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
+- 'SubqueryAlias tmp
+- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#97, 'name]
+- 'Filter (('stu.id = 'score.id) AND ('stu.age >= 11))
+- 'Join Inner
:- 'UnresolvedRelation [stu], [], false
+- 'UnresolvedRelation [score], [], false
== Analyzed Logical Plan ==
sum(v): bigint, name: string
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- SubqueryAlias tmp
+- Project [id#103, ((100 + 10) + score#108) AS v#97, name#104]
+- Filter ((id#103 = id#106) AND (age#105 >= 11))
+- Join Inner
:- SubqueryAlias spark_catalog.default.stu
: +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
+- SubqueryAlias spark_catalog.default.score
+- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
== Optimized Logical Plan ==
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- Project [(110 + score#108) AS v#97, name#104]
+- Join Inner, (id#103 = id#106)
:- Project [id#103, name#104]
: +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
: +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
+- Project [id#106, score#108]
+- Filter isnotnull(id#106)
+- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[name#104], functions=[sum(v#97)], output=[sum(v)#110L, name#104])
+- Exchange hashpartitioning(name#104, 200), ENSURE_REQUIREMENTS, [plan_id=248]
+- HashAggregate(keys=[name#104], functions=[partial_sum(v#97)], output=[name#104, sum#112L])
+- Project [(110 + score#108) AS v#97, name#104]
+- SortMergeJoin [id#103], [id#106], Inner
:- Sort [id#103 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#103, 200), ENSURE_REQUIREMENTS, [plan_id=240]
: +- Project [id#103, name#104]
: +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
: +- Scan hive default.stu [age#105, id#103, name#104], HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
+- Sort [id#106 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#106, 200), ENSURE_REQUIREMENTS, [plan_id=241]
+- Filter isnotnull(id#106)
+- Scan hive default.score [id#106, score#108], HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
2.1 Parser 阶段
sql 语句如下所示,经过 Parser 解析后变成抽象语法树即 Parsed Logical Plan,其中叶子节点 UnresolvedRelation
表示该节点还没有被解析,而下一步在 Analyzer 阶段进行处理,并解析这些 UnresolvedRelation
节点。
== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
+- 'SubqueryAlias tmp
+- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#97, 'name]
+- 'Filter (('stu.id = 'score.id) AND ('stu.age >= 11))
+- 'Join Inner
:- 'UnresolvedRelation [stu], [], false
+- 'UnresolvedRelation [score], [], false
image.JPG
上面树中的节点都是 LogicalPlan 类型的,即进行各种操作的 Operator,如下所示是部分 Operator
名称 | 功能描述 | |
---|---|---|
Project(projectList: Seq[NamedExpression], child: LogicalPlan) |
select 语句输出操作,其中 projectList 是输出对象,每个元素都是一个 expression | |
Filter(condition: Expression, child: LogicalPlan) |
根据 condition 对 child 的输入 rows 进行过滤 | |
Aggregate(groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: LogicalPlan) |
对 child 输出 rows 进行 aggregate 操作,例如 groupby | |
Join(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression]) |
left 和 right 的输出结果进行 join 操作 | |
Union(children: Seq[LogicalPlan]) |
将 children 计算结果进行 Union 联合 | |
Distinct(child: LogicalPlan) |
对 child 输出 rows 取重操作 | |
Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan) |
对 child 的输出进行 sort 排序 | |
SubqueryAlias(alias: Alias, child: LogicalPlan) |
对 child 取别名 | |
GlobalLimit(limitExpr: Expression, child: LogicalPlan) |
对 child 输出的数据进行 Limit 限制 | |
Window(windowExpressions: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], child: LogicalPlan) |
输入 child 进行窗口操作,windowExpressions 表示窗口函数的列表,每个窗口函数都是一个 NamedExpression,指定了要计算的聚合操作;partitionSpec 是表达式的列表,用于指定窗口的分区方式;orderSpec 是排序规则的列表,用于指定窗口内行的排序方式 |
2.2 Analyzer 阶段
分析器会根据上下文和元数据信息,将 UnresolvedRelation
解析成 ResolvedRelation
,并自动创建一个 SubqueryAlias
别名的子查询。当分析器解析 Hive 表时,会将 HiveTableRelation
作为 ResolvedRelation
的一部分,其中包含了 Hive 表的元数据信息,如表的名称、列的类型等。因此 UnresolvedRelation
就会自动变成 SubqueryAlias
别名信息,并包含 HiveTableRelation
即 Hive 表的元数据信息。
== Analyzed Logical Plan ==
sum(v): bigint, name: string
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- SubqueryAlias tmp
+- Project [id#103, ((100 + 10) + score#108) AS v#97, name#104]
+- Filter ((id#103 = id#106) AND (age#105 >= 11))
+- Join Inner
:- SubqueryAlias spark_catalog.default.stu
: +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
+- SubqueryAlias spark_catalog.default.score
+- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
image.png
2.3 Optimizer 阶段
部分逻辑优化的规则,如下所示。
名称 | 功能描述 | 案例 |
---|---|---|
ConstantFolding | 对常量表达式进行折叠 | select 100+10+a from t 转换为 select 110+a from t |
PushDownPredicate | 将谓词下推到适当的位置,以尽早过滤掉不必要的行 | 例如 Filter 操作,select rand(), a from (select * from t) where a>1 转换为 select rand(), a from (select * from t where a>1) |
ColumnPruning | 去除不需要的列,即删除 child 无用的 output 字段 | select a from (select a, b from t) 转换为 select a from (select a from t) |
BooleanSimplification | 对布尔表达式进行简化,主要是针对 where 语句中的 and/or 组合逻辑 | true or a=b 转换为 true |
SimplifyCasts | 简化类型转换 cast 操作。如果 cast 前后数据类型没有变化,即可以删除 cast 操作 | select cast(a as int) from t 转换为 select a from t |
SimplifyCaseConversionExpressions | 简化大小写转换操作,如果对字符串进行连续多次 Upper/Lower 操作,只需要进行最后一次操作即可 | select lower(upper(lower(a))) as c from 转换成 select lower(a) as c from t; |
CollapseProject | 合并相邻的投影操作,将 Project 与子 Project 或者子 Aggregate 进行合并 | select c+1 from (select a+b as c from t) 转换为 select a+b+1 as c+1 from t |
CollapseRepartition | 合并相邻的 Repartition 重新分区操作 | Repartion(numPartitions, shuffle, Repartition(_, _, child)) 转换为 Repartion(numPartitions, shuffle, child) |
左图 Analyzed Logical Plan 使用了如下规则,优化成右图 Optimized Logical Plan。
-
投影消除 RemoveRedundantProjects:左图的
Project
节点中,(110 + score#108) AS v#97
是一个冗余的投影操作。因为没有其他使用v#97
的表达式,优化器删除这个投影操作。 -
谓词下推 PushDownPredicate:左图
Filter
节点中,age#105 >= 11
会被下推到右图靠近HiveTableRelation
的 Filter 节点,age#105 >= 11
和isnotnull(id#103)
会被下推到右图靠近HiveTableRelation
的 Filter 节点,以减少数据的读取量。
== Optimized Logical Plan ==
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- Project [(110 + score#108) AS v#97, name#104]
+- Join Inner, (id#103 = id#106)
:- Project [id#103, name#104]
: +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
: +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
+- Project [id#106, score#108]
+- Filter isnotnull(id#106)
+- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
2.4 Planner 阶段
左图 Optimized Logical Plan 最终化成右图 Physical Plan,如下所示。
-
Aggregate 执行节点:左边执行计划中 Aggregate 节点
sum(v)
,转换成右边物理计划中 HashAggregate 分组 sum + Exchange hashpartitioning + HashAggregate 的汇总 sum - Join Inner 执行节点:左边执行计划中 Join Inner 节点转换成右边物理计划中 SortMergeJoin + Sort + Exchange hashpartitioning
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[name#104], functions=[sum(v#97)], output=[sum(v)#110L, name#104])
+- Exchange hashpartitioning(name#104, 200), ENSURE_REQUIREMENTS, [plan_id=248]
+- HashAggregate(keys=[name#104], functions=[partial_sum(v#97)], output=[name#104, sum#112L])
+- Project [(110 + score#108) AS v#97, name#104]
+- SortMergeJoin [id#103], [id#106], Inner
:- Sort [id#103 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#103, 200), ENSURE_REQUIREMENTS, [plan_id=240]
: +- Project [id#103, name#104]
: +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
: +- Scan hive default.stu [age#105, id#103, name#104], HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
+- Sort [id#106 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#106, 200), ENSURE_REQUIREMENTS, [plan_id=241]
+- Filter isnotnull(id#106)
+- Scan hive default.score [id#106, score#108], HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
说明:spark-sql 中,join 操作是根据各种条件选择不同的 join 策略,包括 BroadcastHashJoin、SortMergeJoin 和 ShuffleHashJoin
网友评论