spark 3.0 亮点
the cost-based optimization framework
基于成本的优化框架
该框架收集并利用各种数据统计信息(如行数,不同值的数量,NULL 值,最大/最小值等)来帮助 Spark 选择更好的计划。这些基于成本的优化技术很好的例子就是选择正确的 Join 类型(broadcast hash join vs. sort merge join),在 hash join 的时候选择正确的连接顺序,或在多个 join 中调整 join 顺序。然而,过时的统计信息和不完善的基数估计可能导致次优查询计划
spark sql join 原理
喊三遍 流式遍历表
,查找表
,流式遍历表
,查找表
,流式遍历表
,查找表
,
Spark将参与Join的两张表抽象为流式遍历表(streamIter
)和查找表(buildIter
),通常streamIter
为大表,buildIter
为小表,我们不用担心哪个表为streamIter
,哪个表为buildIter
,这个spark会根据join语句自动帮我们完成。但是在其它的sql执行引擎中,通常是左表作为流失遍历表,右表作为查找表,也就是小表放在前面的说法,优化sql执行性能,减少遍历次数
- spark提供了三种join实现:sort merge join、broadcast join以及hash join。
-
sort merge join
实现 SerializedShuffleHandle
image.png
spark 2.4 源码 SparkStrategy.scala#SortMergeJoin
// --- SortMergeJoin ------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
采用sort merge join,spark在shuffle write
输出sort file,在shuffle read
阶段,分别对streamIter
和buildIter
进行merge sort
,在遍历streamIter
时,对于每条记录,都采用顺序查找的方式从buildIter
查找对应的记录,由于两个表都是排序的,每次处理完streamIter的一条记录后,对于streamIter的下一条记录,只需从buildIter中上一次查找结束的位置开始查找,所以说每次在buildIter中查找不必重头开始,整体上来说,查找性能还是较优的。
-
broadcast join
就是map端join,小表直接放在内存
spark 2.4 源码 SparkStrategy.scala#BroadcastHashJoin
// --- BroadcastHashJoin --------------------------------------------------------------------
// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// 判断广播小的表
private def broadcastSide(
canBuildLeft: Boolean,
canBuildRight: Boolean,
left: LogicalPlan,
right: LogicalPlan): BuildSide = {
def smallerSide =
if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
if (canBuildRight && canBuildLeft) {
// Broadcast smaller side base on its estimated physical size
// if both sides have broadcast hint
smallerSide
} else if (canBuildRight) {
BuildRight
} else if (canBuildLeft) {
BuildLeft
} else {
// for the last default broadcast nested loop join
smallerSide
}
}
可见是先根据broadcast hint来判断,其次是广播阈值
-
hash join
实现 BypassMergeSortShuffle
在shuffle read阶段不对记录排序,反正来自两格表的具有相同key的记录会在同一个分区,只是在分区内不排序,将来自buildIter的记录放到hash表中,以便查找,如下图所示。
image.png
spark 2.4 源码 SparkStrategy.scala#ShuffledHashJoin
// --- ShuffledHashJoin ---------------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
&& muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
&& muchSmaller(left, right) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件
- 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
- 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区
- 来自buildIter的记录要能放到内存中 streamIter的大小是buildIter三倍以上
spark3.0 adaptive-query-execution
- 自适应查询框架优化
时机
:在shuffle的物化阶段,完成的当前阶段检索运行时统计信息,并相应地更新逻辑,当查询开始时,自适应查询执行框架首先启动所有叶子阶段(leaf stages
)——这些阶段不依赖于任何其他阶段。一旦其中一个或多个阶段完成物化,框架便会在物理查询计划中将它们标记为完成,并相应地更新逻辑查询计划,同时从完成的阶段检索运行时统计信息 - 查询计划基于这些新的统计信息,框架将运行优化程序、物理计划程序以及物理优化规则,其中包括常规物理规则(regular physical rules)和自适应执行特定的规则,如合并分区(
coalescing partitions
)、Join 数据倾斜处理(skew join handling
)等。现在我们有了一个新优化的查询计划,其中包含一些已完成的阶段,自适应执行框架将搜索并执行子阶段已全部物化的新查询阶段,并重复上面的execute-reoptimize-execute
过程,直到完成整个查询。
Spark 3.0 的 AQE 框架带来了以下三个特性:
动态合并 shuffle 的分区(Dynamically coalescing shuffle partitions)
- 如果分区数太少,那么每个分区处理的数据大小可能非常大,处理这些大分区的任务可能需要将数据溢写到磁盘(例如,涉及排序或聚合),从而减慢查询速度。
- 如果分区数太多,那么每个分区处理的数据大小可能非常小,并且将有大量的网络数据获取来读取 shuffle 块,这也会由于低效的 I/O 模式而减慢查询速度。拥有大量的任务也会给 Spark 任务调度程序带来更多的负担。
例子:SELECT max(i)FROM tbl GROUP BY j
image.png
image.png
- tbl 表的输入数据相当小,所以在分组之前只有两个分区。我们把初始的 shuffle 分区数设置为 5,因此在 shuffle 的时候数据被打乱到 5 个分区中。如果没有 AQE,Spark 将启动 5 个任务来完成最后的聚合。然而,这里有三个非常小的分区,为每个分区启动一个单独的任务将是一种浪费。
- 以上发现分区2,3,4都非常小,开启动态合并分区策略,小分区合并为一个,因此,最终的聚合只需要执行三个任务,而不是五个。
动态调整 Join 策略(Dynamically switching join strategies)
Spark 支持许多 Join 策略,其中 broadcast hash join 通常是性能最好的,前提是参加 join 的一张表的数据能够装入内存。由于这个原因,当 Spark 估计参加 join 的表数据量小于广播大小的阈值时,其会将 Join 策略调整为 broadcast hash join。但是,很多情况都可能导致这种大小估计出错——例如存在一个非常有选择性的过滤器。
为了解决这个问题,AQE 现在根据最精确的连接关系大小在运行时重新规划 join 策略。在下面的示例中可以看到,Join 的右侧比估计值小得多,并且小到足以进行广播,因此在 AQE 重新优化之后,静态计划的 sort merge join 现在被转换为 broadcast hash join。
image.png
对于在运行时转换的 broadcast hash join ,我们可以进一步将常规的 shuffle 优化为本地化 shuffle来减少网络流量。
动态优化数据倾斜的 Join(Dynamically optimizing skew joins)
当数据在集群中的分区之间分布不均时,就会发生数据倾斜。严重的倾斜会显著降低查询性能,特别是在进行 Join 操作时。
AQE 倾斜 Join 优化从 shuffle 文件统计信息中自动检测到这种倾斜。然后,它将倾斜的分区分割成更小的子分区,这些子分区将分别从另一端连接到相应的分区。
假设表 A join 表B,其中表 A 的分区 A0 里面的数据明显大于其他分区。
image.png
skew join optimization 将把分区 A0 分成两个子分区,并将每个子分区连接到表B的相应分区B0。
image.png
如果没有这个优化,将有四个任务运行 sort merge join,其中一个任务将花费非常长的时间。在此优化之后,将有5个任务运行 join,但每个任务将花费大致相同的时间,从而获得总体更好的性能。
Enabling AQE
AQE can be enabled by setting SQL config spark.sql.adaptive.enabled to true
(default false in Spark 3.0),
满足以下条件建议启用:
It is not a streaming query
It contains at least one exchange (usually when there’s a join, aggregate or window operator) or one subquery
参考
https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
https://mp.weixin.qq.com/s/VU5oXjMqvCV7SZzfWCpfgw
网友评论