https://blog.csdn.net/u013332124/article/details/90677676
一、Spark 目前现有的一些问题
问题一:Shuffle partition数量没有达到最优
在Spark SQL中,我们可以通过spark.sql.shuffle.partition来设置shuffle后的partition数量,默认值是200。shuffle partition的数量等同于下一Stage的Reduce Task的数量。因为shuffle的原因,这些Task处理的数据量残差不齐,大的可能很大,小的可能很小。而Stage的完成又取决于最慢的那个Task,其他的Task可能早早完成,在那等待。如果没有开启动态资源,这势必会造成集群资源上的浪费。即使开启了动态资源,频繁的kill Executor和申请新的Executor一样可能会带来性能损耗。
虽然说我们可以认为设置shuffle partition数量,但是我们还是无法给出一个对所有任务来说都是最优的值,因为每个任务的数据和shuffle情况都不一样。
- 如果这个值调整的太大可能会导致大量的Task,大量的Task就意味着Task调度开销以及资源调度开销(如果开启了动态资源)。另外,如果这个Stage最后要输出,也会造成大量的小文件存在hdfs上。大量的小文件就意味着集群的namenode需要承受更大的压力
- 如果这个值调整的太小,就会导致每个Task处理的数据量变大,可能会导致OOM的问题。就算不发生OOM,Task的处理性能我们也不能接受
因此,现阶段Shuffle partition数量只能针对不同的任务不断的去优化调整,才能得到一个针对这个任务的最优值。但这个在实际的开发中是很难做到的(除非性能太差,否则大多数的spark job开发人员并不会主动去做这种优化)。
所有,有没有一种办法,可以让我们在执行过程中动态的设置shuffle partition数量,让其达到一个近似最优值呢?
问题二:现有执行计划的一些不足
我们都知道,shuffle是一个很耗性能的操作。通过避免不必要的shuffle也能带上一定的性能提升。最常见的做法就是在大小表做Join时,将小表提前加载进内存,之后直接使用内存的数据进行join,这样就少了shuffle带来的性能损耗了。这种做法就是MapJoin,在Spark中,也叫做BroadcastHashJoin。原理是将小表数据以broadcast变量加载到内存,然后广播到各个Executor上,直接在map中做join。在Spark中,可以通过spark.sql.autoBroadcastJoinThreshold来设置启动BroadcastHashJoin的阀值,默认是10MB。
SparkSQL在执行过中,在经过逻辑优化时,会估算是否要开启BroadcastHashJoin。但是这种优化对于复杂的SQL效果并不明显,因为复杂SQL会产生大量的Stage,spark优化程序很难准确的估算各个Stage的数据量来判断是否要开启BroadcastHashJoin。下面是网上的一张图:
在这里插入图片描述图中左边的Stage的数据量只有46.9KB,完全可以优化成BroadcastHashJoin。然而Spark使用的还是常规的SortMergeJoin(也就是Shuffle)。
这个问题主要还是在逻辑优化时无法准确的估算数据量导致的,那么我们是否可以在执行过程中根据数据量动态的去调整执行计划来解决这个问题呢?
问题三:数据倾斜的问题
不管是mapreduce还是spark,都可能存在数据倾斜问题。数据倾斜是某一些partition的数据量远大于其他的partition,数据量大的那个partition处理速度就会拖慢整个任务的处理速度(很可能所有的task都处理完了,只剩下一个task还在处理)。对于数据倾斜问题,我们也有多种解决办法。比如:
- 如果partition数据从外界获取,就保证外界输入的数据是可以Split的,并保证各个Split后的块是均衡的。比如保证Kafka的各个partition数据均衡,读取一个目录时,保证下面的文件大小是均衡的等等
- 如果是shuffle partition,可以通过调整shuffle partition数量来避免某个shuffle partition数据量特别大
- 如果存在一个Key的数据量非常大,调整shuffle partition数量也没办法很好的规避数据倾斜问题。就可以对Key加一些前缀或者后缀来分散数据
- 从shuffle的角度出发,如果两个join的表中有一个表是小表,可以优化成BroadcastHashJoin来消除shuffle从而消除shuffle引起的数据倾斜问题
但是上面这些解决方案都是针对单一任务进行调优,没有一个解决方案可以有效的解决所有的数据倾斜问题。
对于这种问题,我们是不是可以在执行过程中,通过判断shuffle write后各个partition的数据量,动态的调整后面的执行计划。比如对于存在数据倾斜的分区,我们是否可以开启多个task处理,之后再将处理的结果做union?
二、Spark Adaptive Execution提出的相关解决方案
1、自动设置Shuffle Partition数量
Shuffle的过程是先通过Shuffle Write将各个分区的数据写到磁盘,之后另外一个Stage通过Shuffle Read来读取这些数据。那么我们其实可以在开启下一个Stage前先计算好Shuffle Write产生的各个分区的数据量是多少,之后对于那些比较小的分区,将它们当成一个分区来处理。
一般情况下,一个分区是由一个task来处理的。经过优化,我们可以安排一个task处理多个分区,这样,我们就可以保证各个分区相对均衡,不会存在大量数据量很小的partitin了。
比如Shuffle Write外我们检测到有5个partition,数据量大小分别是64M、1M、2M、20M、4M。如果没有进行优化,会开启5个task来处理,要等64M的那个partiiton处理完后整个Stage才算完成。经过优化后,我们可以1M、2M、20M、4M这些分区都交给一个task来处理。这样,总共就只有两个task,但是整个stage的处理速度并不会比之前的慢,还少了3个task所需要的资源损耗。
一些关键点:
- 目前只会合并连在一起的那些partition,主要是为了保证顺序读,提高磁盘IO性能
- 可以通过配置spark.sql.adaptive.shuffle.targetPostShuffleInputSize来设置合并的阀值,默认为64M
- 只会合并小的分区,太大的分区并不会进行拆分
开启方式:
spark.sql.adaptive.enabled=true:启动Adaptive Execution。
通过spark.sql.adaptive.shuffle.targetPostShuffleInputSize可以设置shuffle后每个partition的目标数据量。一个Task加起来处理的所有分区的数据量不会超过个阀值。
2、执行过程中动态调整执行计划
还是在Shuffle Write之后,我们可以观察两个Stage输出的数据量。如果有一个Stage数据量明显比较小,可以转换成BroadcastHashJoin,那么我们就可以动态的去调整执行计划。
虽然shuffle write的数据已经输出到磁盘上,这时候我们如果开启了动态调整执行计划,shuffle read改成BroadcastHashJoin。假设表A(1M)和表B(4G)做join时,并已经进行了Shuffle Write,转换成BroadcastHashJoin的过程如下:
- 将表A的数据加载成broadcast
- 假设上游表B有5个partition,那么此时下游Stage也创建对应5个reduce task,每个reduce task都读取对应上游partition的shuffle write生成的文件,然后在读取过程中从内存读取表A的数据进行join
因为下游的Reduce Task可以直接发到表B Shuffle Write文件所在的Executor上,此时读取数据是直接读取磁盘文件了,避开了网络IO的开销,性能会比原先的shuffle read快很多。
开启方式:
- spark.sql.adaptive.enabled和
spark.sql.adaptive.join.enabled
都设置为true。
-
spark.sql.adaptiveBroadcastJoinThreshold
设置了 SortMergeJoin 转 BroadcastJoin 的阈值。如果不设置该参数,该阈值与spark.sql.autoBroadcastJoinThreshold
的值相等
3、自动处理数据倾斜
还是在Shuffle Write之后解决问题。一样是获取到shuffle Write后各个partition的数据量,根据一定算法算出哪些partition数据超标,出现倾斜。
对于那些存在大量小数据的partiiton,我们可以通过合并来解决问题(一个task处理多个partition的数据)。那对于这种数据量特别大的partition,我们完全可以反其道而行,用多个task来处理这个partition。
开启自动处理数据倾斜后,在执行过程中,spark会自动找出那些出现倾斜的partiiton,然后用多个task来处理这个partition,之后再将这些task的处理结果进行union。
比如表A和表B做join,表A在shuffle write完,partition 0有4G的数据,其他partition都只有1,200M。这时候我们可以开启多个task,每个task读取几个上游mapper生成的partition 0的数据,然后和表B的partition 0做join,最后这个几个task再进行union。这样虽然表B的partition 0要被多次读取,但是并行处理带来的收益还是要高过这些消耗的。
开启方式:
-
spark.sql.adaptive.skewedJoin.enabled
设置为 true -
spark.sql.adaptive.skewedPartitionMaxSplits
控制处理一个倾斜 Partition 的 Task 个数上限,默认值为 5 - spark.sql.adaptive.skewedPartitionRowCountThreshold ,partition的条数如果少于这个值,数据量再大也不会被当成是倾斜的partition。默认是1000W
- spark.sql.adaptive.skewedPartitionSizeThreshold,被认定为是倾斜partiiton的大小下限。默认是64M
- spark.sql.adaptive.skewedPartitionFactor,倾斜因子。如果一个 Partition 的大小大于
spark.sql.adaptive.skewedPartitionSizeThreshold
的同时大于各 Partition 大小中位数与该因子的乘积,或者行数大于spark.sql.adaptive.skewedPartitionRowCountThreshold
的同时大于各 Partition 行数中位数与该因子的乘积,则它会被视为倾斜的 Partition
三、总结
Adaptive Execution是英特尔大数据技术团队和百度大数据基础架构部工程师在Spark 社区版本的基础上,改进并实现的自适应执行引擎。目前代码在https://github.com/Intel-bigdata/spark-adaptive里,并没有全部merge到spark中。相关的issue也还是In proccess状态:https://issues.apache.org/jira/browse/SPARK-23128。
目前就看到有博客说2.3.1版本中已经有了"自动设置Shuffle Partition数量"的特性,我在spark-2.2之后的代码中也可以搜到spark.sql.adaptive.enabled和spark.sql.adaptiveBroadcastJoinThreshold配置。截止到2.4.3版本,另外两个特性应该还没真正发布,可以期待一下。
参考资料
http://www.jasongj.com/spark/adaptive_execution/
网友评论