方法一 :避免shuffle
对于可以避免进行shu的要避免进行shuffle这样就从根本上解决数据倾斜,大致思路就是如果直接将同一个key对应的多个value进行合并也可以完成需要的话,那就直接进行合并,比如说对应一个key,他的value是一个数字那我们就可以直接使用累加器进行累加,对于string类型的value我们可以进行拼接,最后使用的时候再截取开就行了,这样 就 从根本上避免了数据倾斜。
方法二 :过滤导致倾斜的key
如果在Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key 进行过滤,滤除可能导致数据倾斜的key 对应的数据,这样,在Spark 作业中就不会发生数据倾斜了。
方法三:提高 shuffle 操作中的 reduce 并行度
当方案一和方案二对于数据倾斜的处理没有很好的效果时, 可以考虑提高shuffle 过程中的 reduce 端并行度, reduce 端并行度的提高就增加了 reduce 端 task的数量,那么每个 task 分配到的数据量就会相应减少, 由此缓解数据倾斜问题。
1.reduce 端并行度的设置
在 大 部 分 的 shuffle 算 子 中 , 都 可 以 传 入 一 个 并 行 度 的 设 置 参 数 , 比如reduceByKey(500),这个参数会决定 shuffle 过程中 reduce 端的并行度,在进行 shuffle操作的时候,就会对应着创建指定数量的 reduce task。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200, 对于很多场景来说都有点过小。增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据。举例来说,如果原本有 5 个 key, 每个 key 对应 10 条数据,这 5 个 key 都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了 shuffle read task 以后,每个 task 就分配到一个 key,即每个 task 就处理 10 条数据, 那么自然每个 task 的执行时间都会变短了。
2.reduce 端并行度设置存在的缺陷
提高 reduce 端并行度并没有从根本上改变数据倾斜的本质和问题( 方案一和方案二从根本上避免了数据倾斜的发生) , 只是尽可能地去缓解和减轻 shuffle reduce task 的数据压力,以及数据倾斜的问题, 适用于有较多 key 对应的数据量都比较大的情况。
该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段, 尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。在理想情况下,reduce 端并行度提升后,会在一定程度上减轻数据倾斜的问题, 甚至基本消除数据倾斜;但是,在一些情况下,只会让原来由于数据倾斜而运行缓慢的 task 运行速度稍有提升, 或者避免了某些 task 的 OOM 问题, 但是, 仍然运行缓慢,此时,要及时放弃方案三,开始尝试后面的方案。
方法四:使用随机 key 实现双重聚合
当使用了类似于 groupByKey、reduceByKey 这样的算子时,可以考虑使用随机
key 实现双重聚合,如图 3-1 所示:
图 3-1 随机 key 实现双重聚合首先, 通过 map 算子给每个数据的 key 添加随机数前缀, 对 key 进行打散, 将原先一样的 key 变成不一样的 key, 然后进行第一次聚合, 这样就可以让原本被一个 task处理的数据分散到多个 task 上去做局部聚合;随后,去除掉每个 key 的前缀, 再次进行聚合。
此方法对于由 groupByKey、reduceByKey 这类算子造成的数据倾斜由比较好的效果,仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,还得用其他的解决方案。
此方法也是前几种方案没有比较好的效果时要尝试的解决方案。
方法五:将 reduce join 转换为 map join
正常情况下, join 操作都会执行 shuffle 过程,并且执行的是 reduce join,也就是先将所有相同的 key 和对应的 value 汇聚到一个 reduce task 中,然后再进行 join。
普通 join 的过程如下图所示:
图 3-2 普通 join 过程普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join, 此时就是 reduce join。但是如果一个
RDD 是比较小的,则可以采用广播小 RDD 全量数据+map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。
( 注意,RDD 是并不能进行广播的,只能将 RDD 内部的数据通过 collect 拉取
到 Driver 内存然后再进行广播)
1.核心思路:
不使用 join 算子进行连接操作, 而使用 Broadcast 变量与 map 类算子实现 join
操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。将较小
RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个 RDD 执行 map 类算子, 在算子函数内, 从
Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接
key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。
根据上述思路,根本不会发生 shuffle 操作,从根本上杜绝了 join 操作可能导致的数据倾斜问题。
当 join 操作有数据倾斜问题并且其中一个 RDD 的数据量较小时, 可以优先考虑这种方式,效果非常好。map join 的过程如图 3-3 所示:
图 3-3 map join 过程2.不适用场景分析:
由于 Spark 的广播变量是在每个 Executor 中保存一个副本,如果两个 RDD 数据量都比较大, 那么如果将一个数据量比较大的 RDD 做成广播变量,那么很有可能会造成内存溢出。
方法六:sample 采样对倾斜key 单独进行join
在Spark 中,如果某个RDD 只有一个key,那么在shuffle 过程中会默认将此key 对应的数据打散,由不同的reduce 端task 进行处理。
当由单个key 导致数据倾斜时,可有将发生数据倾斜的key 单独提取出来,组成一个RDD,然后用这个原本会导致倾斜的key 组成的RDD 根其他RDD 单独join,此时,根据Spark 的运行机制,此RDD 中的数据会在shuffle 阶段被分散到多个task 中去进行join 操作。倾斜key 单独join 的流程如图3-4 所示:
图3-4 倾斜key 单独join 流程1.适用场景分析:
对于RDD 中的数据,可以将其转换为一个中间表,或者是直接使用countByKey() 的方式,看一个这个RDD 中各个key 对应的数据量,此时如果你发现整个RDD 就一个key 的数据量特别多,那么就可以考虑使用这种方法。
当数据量非常大时,可以考虑使用sample 采样获取10%的数据,然后分析这
10%的数据中哪个key 可能会导致数据倾斜,然后将这个key 对应的数据单独提取出来。
2. 不适用场景分析:
如果一个RDD 中导致数据倾斜的key 很多,那么此方案不适用。
方案七:使用随机数以及扩容进行join
简单理解原理:简单理解就是对于两个rdd,如果一个rdd的的某些key出现频率很高,那么可以在他的key前面随机的加上1~N之间的一个数字,这样再shuffle的时候就会将同一个key分散开,但是如果只对一侧的rdd进行这样的操作那么会产生这样的情况左侧的key在右侧的rdd中没有与之对应的key了,所以右侧的key要进行扩容,因为左侧的key加的数字是1~N随机的,所以右侧的rdd,要从1~N各加一次,这样才会让左侧的新key在右侧都有与之对应的key。
如果在进行join 操作时,RDD 中有大量的key 导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了,对于join 操作,我们可以考虑对其中一个RDD 数据进行扩容,另一个RDD 进行稀释后再join。
我们会将原先一样的key 通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task 中去处理,而不是让一个task 处理大量的相同key。这一种方案是针对有大量倾斜key 的情况,没法将部分key 拆分出来进行单独处理,需要对整个RDD 进行数据扩容,对内存资源要求很高。
核心思想:
选择一个RDD,使用flatMap 进行扩容,对每条数据的key 添加数值前缀( 1~N
的数值),将一条数据映射为多条数据;(扩容)
选择另外一个RDD,进行map 映射操作,每条数据的key 都打上一个随机数作为前缀( 1~N 的随机数) ;(稀释)
将两个处理后的RDD,进行join 操作。
局限性:
图3-6 使用随机数以及扩容进行join如果两个RDD 都很大,那么将RDD 进行N 倍的扩容显然行不通;使用扩容的方式只能缓解数据倾斜,不能彻底解决数据倾斜问题。
使用方案七对方案六进一步优化分析:
当RDD 中有几个key 导致数据倾斜时,方案六不再适用,而方案七又非常消耗资源,此时可以引入方案七的思想完善方案六:对包含少数几个数据量过大的key 的那个RDD,通过sample 算子采样出一份样本来,然后统计一下每个key 的数量,计算出来数据量最大的是哪几个key。然后将这几个key 对应的数据从原来的RDD 中拆分出来,形成一个单独的RDD,并给每个key 都打上n 以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。接着将需要join 的另一个RDD,也过滤出来那几个倾斜key 对应的数据并形成一个单独的RDD,将每条数据膨胀成n 条数据,这n 条数据都按顺序附加一个0~n 的前缀,不会导致倾斜的大部分key 也形成另外一个RDD。再将附加了随机前缀的独立RDD 与另一个膨胀n 倍的独立RDD 进行join,此时就可以将原先相同的key 打散成n 份,分散到多个task 中去进行join 了。而另外两个普通的RDD 就照常join 即可。最后将两次join 的结果使用union 算子合并起来即可,就是最终的join 结果。
方法一 :避免shuffle
方法二 :过滤导致倾斜的key
方法三:提高 shuffle 操作中的 reduce 并行度
方法四:使用随机 key 实现双重聚合
方法五:将 reduce join 转换为 map join
方法六:sample 采样对倾斜key 单独进行join
方案七:使用随机数以及扩容进行join
网友评论