美文网首页
Spark 性能优化 高级篇

Spark 性能优化 高级篇

作者: 博弈史密斯 | 来源:发表于2018-05-17 15:19 被阅读0次

本篇文章的上篇是:Spark 性能调优 基础篇

1. 数据倾斜调优

有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

数据倾斜,本质上就是:分配给 Task 的数据 不均衡的现象

1.1 数据倾斜发生时的现象

  • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。
  • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

1.2 数据倾斜发生的原理

在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。

数据量大的 Task会比其他 Task 执行时间更长,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

1.3 如何定位导致数据倾斜的代码

数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:**ByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

1.3.1 某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中。

可以通过Spark Web UI来查看当前运行到了第几个stage。
在启动spark-shell时,调试信息会给出spark driver’s UI的入口,如:
Spark context Web UI available at http://192.168.56.156:4040
在浏览器输入 ip:端口号 进入 Spark Web UI 界面,点击 Stages,下面看到 Tasks:



倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

数据倾斜只发生在 Shuffle,通过Web UI,可确定数据倾斜发生在哪个 Stage,然后再定位代码,确定是哪个 Shuffle类算子。

1.3.2 某个task莫名其妙内存溢出的情况

内存溢出 可能是 task 数据量任务过大,可能发生了数据倾斜。如果运行在 YARN 上,可查看 yarn-cluster 模式下的log中的异常栈,定位到你的代码中哪一行发生了内存溢出,可能会看到 Shuffle 类算子。通过 Web UI 辅助查看报错的那个stage 的各个 task 的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

1.4 查看导致数据倾斜的key的数据分布情况

根据你执行操作的情况不同,可以有很多种查看key分布的方式:

  1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
  2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

1.5 数据倾斜的解决方案

解决方案一:使用Hive ETL预处理数据

导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

可以通过Hive来进行数据预处理然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

但是这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

解决方案二:过滤少数导致倾斜的key

如果发现导致倾斜的key就少数几个,而且对数据分析结果影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

比如,在Spark SQL 中可以使用 where子 句过滤掉这些key,或者在Spark Core 中对执行filter算子过滤掉这些key。

解决方案三:提高shuffle操作的并行度

这种方案是处理数据倾斜最简单的一种方案。

在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。

增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,这5个key都是分配给一个task的。而增加了shuffle read task以后,每个 task 就分配到一个key,那么自然每个task的执行时间都会变短了。

该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理。

解决方案四:两阶段聚合(局部聚合+全局聚合)

这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

将原本相同的 key 通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task 处理的数据分散到多个 task 上去做局部聚合,进而解决单个task处理数据量过多的问题。

对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。但如果是join类的shuffle操作,还得用其他的解决方案。

解决方案五:将reduce join转为map join

在对RDD使用 join 类操作,发生了数据倾斜,而且join操作中的一个 RDD数据量比较小(比如几百M或者一两G),比较适用此方案。

不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着另外一个RDD从 Broadcast变量中获取较小RDD的全量数据,将两个RDD的数据连接起来。

普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个RDD是比较小的,则可以采用广播小 RDD 全量数据 + map算子来实现与 join 同样的效果,也就是map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。具体原理如下图所示。


image.png

这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果广播的RDD数据比较大就可能发生内存溢出了。因此并不适合两个都是大表的情况。

解决方案六:采样倾斜key并分拆join操作

两个RDD 进行 join的时候,如果数据量都比较大,如果 RDD 的少数几个 key 数据量很大,可以采取类似 解决方案四 的方式,对数据量大的 key 打上一个随机数,分成了若干个小数据量的 key,然后再进行 join。

如果导致倾斜的key特别多的话,这种方式也不适合。

解决方案七:使用随机前缀和扩容RDD进行join

如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

该方案的实现思路基本和“解决方案六”类似:对两个要 join 的RDD 的每条数据,都进行 n倍扩容(每条数据 随机打上 0~n 的前缀)。

这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

解决方案八:多种方案组合使用

在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。

比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。

大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

2. shuffle调优

大多数Spark作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理:Spark Shuffle,以及相关参数的说明,同时给出各个参数的调优建议。

2.1 shuffle相关参数调优

以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer
  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight
  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries
  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性
spark.shuffle.io.retryWait
  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性(时间有点长吧。。)。
spark.shuffle.memoryFraction
  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
spark.shuffle.manager
  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
spark.shuffle.sort.bypassMergeThreshold
  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
spark.shuffle.consolidateFiles
  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

参考

https://tech.meituan.com/spark-tuning-pro.html

相关文章

  • spark性能调优

    [Spark性能优化指南——基础篇][Spark性能优化指南——高级篇]

  • 美团关于大数据技术的文章

    Spark性能优化指南——基础篇Spark性能优化指南——高级篇Spark在美团的实践Kafka文件存储机制那些事...

  • Spark性能调优

    《Spark性能优化:开发调优篇》《Spark性能优化:资源调优篇》《Spark性能优化:数据倾斜调优》《Spar...

  • Spark性能优化:数据倾斜调优(转)

    《Spark性能优化:开发调优篇》《Spark性能优化:资源调优篇》《Spark性能优化:数据倾斜调优》《Spar...

  • Spark性能优化:开发调优篇(转)

    《Spark性能优化:开发调优篇》《Spark性能优化:资源调优篇》《Spark性能优化:数据倾斜调优》《Spar...

  • Spark性能优化:资源调优篇(转)

    《Spark性能优化:开发调优篇》《Spark性能优化:资源调优篇》《Spark性能优化:数据倾斜调优》《Spar...

  • Awesome Extra

    性能优化 性能优化模式 常见性能优化策略的总结 Spark 性能优化指南——基础篇 Spark 性能优化指南——高...

  • 目录

    Spark之参数介绍 Spark之性能优化2.1. 官方性能优化指南2.2. Spark性能优化指南——基础篇2....

  • Spark 性能优化 高级篇

    本篇文章的上篇是:Spark 性能调优 基础篇 1. 数据倾斜调优 有的时候,我们可能会遇到大数据计算中一个最棘手...

  • Spark性能优化篇三:数据倾斜调优

    前言 继Spark性能优化篇二: 开发调优和Spark性能优化篇一:资源调优讲解了每个Spark开发人员都必须熟...

网友评论

      本文标题:Spark 性能优化 高级篇

      本文链接:https://www.haomeiwen.com/subject/mowmdftx.html