The Catalyst optimizer is a core part of Spark SQL: it takes a computational query and converts it into an execution plan. It goes through four transformational phases:
- Analysis
- Logical optimization
- Physical planning
- Code generation
Example:
The M&Ms example
The two snippets below, written in different languages, end up with the same underlying execution. So no matter which language you use, your query and computation go through the same journey.
# In Python
from pyspark.sql.functions import count  # needed for count(); mnm_df is assumed to be loaded already

count_mnm_df = (mnm_df
  .select("State", "Color", "Count")
  .groupBy("State", "Color")
  .agg(count("Count")
  .alias("Total"))
  .orderBy("Total", ascending=False))
-- In SQL
SELECT State, Color, Count, sum(Count) AS Total
FROM MNM_TABLE_NAME
GROUP BY State, Color, Count
ORDER BY Total DESC
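As a quick check of that claim, the SQL version can be run through spark.sql and explained the same way as the DataFrame version. This is a minimal sketch, not from the book; it assumes mnm_df has already been loaded and registers it under the hypothetical view name mnm_table.
# In Python
# Register the DataFrame so the SQL version can refer to it (the view name is ours).
mnm_df.createOrReplaceTempView("mnm_table")

sql_mnm_df = spark.sql("""
  SELECT State, Color, Count, sum(Count) AS Total
  FROM mnm_table
  GROUP BY State, Color, Count
  ORDER BY Total DESC
""")

# The parsed/analyzed/optimized/physical plans printed here come from the
# same Catalyst pipeline as the plans of the DataFrame query below.
sql_mnm_df.explain(True)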
Calling count_mnm_df.explain(True) lets you see the stages the Python code goes through. (We will look at this in more depth later, when we discuss debugging.)
count_mnm_df.explain(True)
== Parsed Logical Plan ==
'Sort ['Total DESC NULLS LAST], true
+- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]
+- Project [State#10, Color#11, Count#12]
+- Relation[State#10,Color#11,Count#12] csv
== Analyzed Logical Plan ==
State: string, Color: string, Total: bigint
Sort [Total#24L DESC NULLS LAST], true
+- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]
+- Project [State#10, Color#11, Count#12]
+- Relation[State#10,Color#11,Count#12] csv
== Optimized Logical Plan ==
Sort [Total#24L DESC NULLS LAST], true
+- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]
+- Relation[State#10,Color#11,Count#12] csv
== Physical Plan ==
*(3) Sort [Total#24L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(Total#24L DESC NULLS LAST, 200)
+- *(2) HashAggregate(keys=[State#10, Color#11], functions=[count(Count#12)], output=[State#10, Color#11, Total#24L])
+- Exchange hashpartitioning(State#10, Color#11, 200)
+- *(1) HashAggregate(keys=[State#10, Color#11], functions=[partial_count(Count#12)], output=[State#10, Color#11, count#29L])
+- *(1) FileScan csv [State#10,Color#11,Count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jules/gits/LearningSpark2.0/chapter2/py/src/... dataset.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<State:string,Color:string,Count:int>
The four phases
Phase 1: Analysis
Spark SQL begins by generating an abstract syntax tree (AST) for the SQL or DataFrame query. In this phase, any column or table names are resolved by consulting an internal Catalog, a programmatic interface to Spark SQL that holds the column names, table names, data types, functions, tables, databases, and so on. Once they have all been successfully resolved, the query proceeds to the next phase.
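Here is a small sketch (assuming the mnm_table view registered earlier and an active SparkSession named spark) of the Catalog interface the analyzer consults when it resolves names:
# In Python
# The analyzer resolves identifiers such as State, Color, and mnm_table against
# this catalog; listColumns shows the names and data types it sees.
print(spark.catalog.listDatabases())
print(spark.catalog.listTables())
print(spark.catalog.listColumns("mnm_table"))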
Phase 2: Logical Optimization
As shown in the figure above, logical optimization happens in two internal stages. Applying a standard rule-based optimization approach, the Catalyst optimizer first constructs a set of multiple plans, and then its cost-based optimizer (CBO) assigns a cost to each plan. These plans are laid out as operator trees.
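A tiny, self-contained sketch (not from the book) makes a couple of these rule-based rewrites visible: the constant expression 1 + 1 is folded to 2, and the two chained filters are combined in the optimized logical plan.
# In Python
df = spark.range(1000)

q = (df
  .filter("id > 1 + 1")   # constant folding turns 1 + 1 into 2
  .filter("id < 500")     # the two filters are merged into one predicate
  .select("id"))

# Compare the Parsed/Analyzed plans with the Optimized Logical Plan.
q.explain(True)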
Phase 3: Physical Planning
Spark SQL generates an optimal physical plan for the selected logical plan, using physical operators that match those available in the Spark execution engine.
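As a rough illustration (again not from the book), the physical plan of a join shows which execution-engine operator Catalyst selected; with the default broadcast threshold, the small side below would typically be broadcast rather than sort-merge joined.
# In Python
small_df = spark.range(100).withColumnRenamed("id", "k")
large_df = spark.range(1_000_000).withColumnRenamed("id", "k")

# explain() with no arguments prints only the chosen physical plan;
# look for BroadcastHashJoin vs. SortMergeJoin in the output.
large_df.join(small_df, "k").explain()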
Phase 4: Code Generation
The final phase involves generating efficient Java bytecode to run on each machine. Because Spark SQL can operate on data sets loaded in memory, Spark can use state-of-the-art compiler technology to speed up execution; in other words, it acts as a compiler. The numbered stages marked with an asterisk in the physical plan above (e.g. *(1), *(2)) are the stages fused by whole-stage code generation.
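The generated code itself can be inspected. A sketch, assuming Spark 3.x (where DataFrame.explain accepts a mode argument); spark.sql.codegen.wholeStage is the configuration that toggles whole-stage code generation.
# In Python
# Print the Java code produced by whole-stage code generation for this query.
count_mnm_df.explain(mode="codegen")

# Whole-stage codegen can be switched off for comparison; a query planned
# after this change shows no *(n) markers in its physical plan.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
mnm_df.groupBy("State", "Color").count().explain()
spark.conf.set("spark.sql.codegen.wholeStage", "true")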
Reference
Learning Spark: Lightning-Fast Data Analytics, 2nd Edition, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee (O'Reilly, 2020)