序言
-
spark1.6之后引入DataSet,一种基于RDD的高级抽象,在RDD之上加入了scheme信息,给RDD的元素的每一列提供了名称和数据类型的标志。
-
同时DataSet还提供了更多的api,可以实现类似于sql的操作,而且在catalyst优化器的优化下我们的代码将更加高效。
-
其实sql最最厉害的就是将逻辑和物理执行分开,上层专注于让程序员更好的表达数据的处理逻辑,下层专注于把逻辑执行的更高效。而DataSet就是这上层用于表达数据处理逻辑的API的封装。
一些术语约定(下文中有不理解再来看)
逻辑计划:LogicalPlan --指生成的逻辑树
物理计划:这里指SparkPlan --对接了rdd,可以直接执行,也是用树表示的。
解析:parse --指对sql进行解析生成逻辑计划
分析:analyze --指对逻辑计划进行分析
整体执行流程
整体执行流程图大概经过下面这几个阶段:
- 逻辑计划生成:
- spark.sql() 这种方式的话会涉及到sql的解析,解析之后就会生成逻辑计划
- 如果是直接在DataFrame的api上直接操作的话,使用的api将直接生成逻辑计划
- 分析:生成的逻辑计划将会通过解析器结合元数据信息(catalog)进行分析,比如识别到你读的表是什么文件放在哪儿。
- 优化:解析后的逻辑计划会经过优化器进行优化。 例如谓词下推,比如将你的过滤条件下推到下一层,这样就让上一层少算点数据了。
- 生成物理计划:将优化后的逻辑计划转化成能执行的物理计划,也就是转化成rdd的操作。比如你的inner join到底是用broadcatJoin还是用shuffleJoin。这里传说会使用基于代价的优化(RBO)对来进行优化,但是源码中只看到了收集统计信息。
逻辑的表示
- 在spark里面是如何用代码来表示逻辑的呢 答案就是用树结构来表示。也就是你写的sql或者dataframe的操作,spark都会转化成一颗逻辑树,来看两个例子。
例子
首先我们先伪造两张表 people score还有对映的两个DataFrame变量
# 首先我们先伪造两张表 people score还有对映的两个DataFrame变量
from pyspark.sql.functions import sum
people = spark.range(100).selectExpr("id","id+100 age")
score = spark.range(100).selectExpr("id","id+1 math_score","id+2 english_score")
score.cache().count()
people.cache().count()
spark.sql("use tmp")
people.write.saveAsTable("people")
score.write.saveAsTable("score")
DataFrame的操作
people = spark.table("people")
score = spark.table("score")
tmp = people.join(score,score.id == people.id ).filter(people.age > 10)
tmp2 = tmp.select(score.id, (score.math_score+100+80+score.english_score).alias("v") )
res = tmp2.select(sum(tmp2.v))
sql语句的操作
res = spark.sql("""
select sum(v)
from (
select score.id
,100+80+score.math_score+score.english_score as v
from people
join score
on people.id=score.id
where people.age>10
)tmp""")
解释:
以上两种虽然是不同的代码写出来的但是表达的逻辑其实都是一样的,在spark里面就长下图这样。这样一来就将我们通过代码写出来的逻辑转化成了一棵逻辑树,每一个节点都是一步操作。
解析之后的计划
逻辑树在 spark 中的实现
上面的例子我们用图完成了数据操作逻辑的表示,树中的每个节点都相当于一步操作,只要我们获取了最上层的那个源节点我们就能遍历整棵树了。也可以在这个源节点上层增加各种操作,形成更大的一棵树。
spark里面逻辑节点都是TreeNode 主要用在三个地方
- 表达式的表示:用在解析sql的时候,用于表示sql。
- 逻辑计划的表示:用来分析逻辑计划 和 优化逻辑计划的时候
- 物理计划(sparkplan)的表示: 用于执行
逻辑计划和物理计划都继承自QueryPlan,节点有三种类型
- 有两个子节点的叫binaryNode 例如join这样的操作。
- 只有一个子节点的叫unaryNode,例如filter操作。
- 没有子节点的叶子节点称为leafNode,例如读取数据的操作。
表达式可能会有三个的节点。
分析与优化
分析和优化spark里都是使用的规则来进行的,所有抽象出了一个规则执行器的类(RuleExecutor)然后分析器(Analyzer)与优化器(Optmizer)都是它的子类。看图:
- Rule : 具体的规则,把一个LogicalPlan转化成另一个LogicalPlan,实现的过程就大量利用了scala模式匹配的优势。比如PrushDownPredicate(谓词下推)。
- Batch: 一批规则,有些规则需要结合起来使用的,所以规则都统一封装成Batch。
- Strategy:每个Batch都有自己的执行策略,比如有的只执行一次,有个可能需要一直迭代执行到结果不在改变为止。
-
规则执行器 (RuleExecutor):这个类里面就包含了很多Batch,用于使用这些Batch
规则执行器
转化成物理计划
上文说到物理计划在spark里面使用SparkPlan这个类进行表示,它使用rdd来完成各项操作,是可执行的计划。那么LogicalPlan 是如何转化成SparkPlan来进行执行的呢。
GenericStrategy: 这个类就是用来把LogicalPlan 转化成SparkPlan的。他也是基于规则的,子类就是不同规则的实现,比如DataSourceStrategy就是用来处理一些数据源相关的LogicalPlan变成SparkPlan的。
QueryPlanner:计划的执行者,手上拥有很多LogicalPlan转SparkPlan的策略集合。具体的实现是SparPlanner去干的。
例子 spark.read.jdbc().write.jdbc()
广告
欢迎加我微信 Zeal-Zeng
网友评论