参考:
how-to-tune-your-apache-spark-jobs-part-1
how-to-tune-your-apache-spark-jobs-part-2
tuning_spark_streaming
Spark Streaming性能调优详解
Spark性能优化:shuffle调优
Spark性能优化:数据倾斜调优
Spark性能优化:开发调优篇
top-5-mistakes-when-writing-spark-applications 强力推荐
一 基础说明
- job-->stage-->task
job划分为stage,stage划分为Task,一个Task运行在一个core上 - executor-->core
The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage.
二 Tuning Resource Allocation
Spark应用的GC调优 -->重点讲解了G1垃圾回收器的调优工作
Spark性能优化:资源调优篇
Every Spark executor in an application has the same fixed number of cores and same fixed heap size.
--executor-cores/ spark.executor.cores 提交时通过该参数设置每个executor的core数量,决定了Task的并行度
--executor-memory/spark.executor.memory 设置executor的JVM memory
--num-executors/spark.executor.instances 设置executor的数量
spark.dynamicAllocation.enabled 设置动态申请资源(value设为true),此时不要设置num-executors
spark.yarn.executor.memoryOverhead 设置堆外的memory大小
spark.dynamicAllocation.enabled
executor空闲超时后,会被移除
对于Spark Streaming,数据按时间段到达,为了防止executor频繁出现添加移除现象,应该禁用该功能。
内存格局
spark-memory.png说明:
-
The application master, which is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to a 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it’s often useful to bolster its resources with the --driver-memory and --driver-cores properties.
-
Running executors with too much memory often results in excessive garbage collection delays. 64GB is a rough guess at a good upper limit for a single executor.最多4G内存,防止GC压力过大。
-
I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number. 最多5个Task可以同时达到最高的HDFS写入带宽
-
Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once on each executor, so many small executors will result in many more copies of the data.
注意事项:
保留内存和core给hadoop ,yarn等系统运行
Slimming Down Your Data Structures
定制序列化方法,减少序列化后的存储占用
spark.serializer=org.apache.spark.serializer.KryoSerializer
三 Tuning Parallelism
分区过少时,Task数量有限,无法充分利用机器资源。
方法:
- Use the repartition transformation, which will trigger a shuffle.
- Configure your InputFormat to create more splits.
- Write the input data out to HDFS with a smaller block size.
3.1 参数spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。
减少shuffle以及shuffle的数据量
-
操作repartition , join, cogroup, and any of the *By or *ByKey transformations can result in shuffles.
-
Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) will produce the same results as rdd.reduceByKey(_ + _)
reduce_by.png
However, the former will transfer the entire dataset across the network, while the latter will compute local sums for each key in each partition and combine those local sums into larger sums after shuffling.
group_by.png
以下函数应该优先于 groupByKey :
- combineByKey组合数据,但是组合之后的数据类型与输入时值的类型不一样。
- foldByKey 合并每一个 key 的所有值,在级联函数和“零值”中使用。
- Avoid reduceByKey When the input and output value types are different.
rdd.map(kv => (kv._1, new Set[String]() + kv._2))
.reduceByKey(_ ++ _)
This code results in tons of unnecessary object creation because a new set must be allocated for each record. It’s better to use aggregateByKey, which performs the map-side aggregation more efficiently:
val zero = new collection.mutable.Set[String]()
rdd.aggregateByKey(zero)(
(set, v) => set += v,
(set1, set2) => set1 ++= set2)
- Avoid the
flatMap-join-groupBy
pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, you can just usecogroup
. That avoids all the overhead associated with unpacking and repacking the groups. join数据源时直接使用cogroup
四 shuffle不发生的情况
- 两个数据源进行join时,已经进行group分组后,如果分组时使用的是同样的partitioner,那么进行join时是不需要进行shuffle的。
- 当数据量较少时,使用广播变量,不需要shuffle
When More Shuffles are Better
当数据partition较少,数据量较大时,进行shuffle可以提高partition数量,提高并行度,从而达到提高效率的目的。
五 RDD
- 原则一:避免创建重复的RDD
- 原则二:尽可能复用同一个RDD
- 原则三:对多次使用的RDD进行持久化 cache persist
- 原则四:尽量避免使用shuffle类算子 广播大变量
- 原则五:使用map-side预聚合的shuffle操作
- 原则六:使用高性能的算子
- 使用reduceByKey/aggregateByKey替代groupByKey
- 使用mapPartitions替代普通map(mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。)
- 使用foreachPartitions替代foreach(一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据)
- 使用filter之后进行coalesce操作(通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。)
- 原则七:广播大变量
- 原则八:使用Kryo优化序列化性能
- 原则九:优化数据结构
5.1 不要将大型RDD中所有元素发送到Driver端
慎重使用collect countByKey countByValue collectAsMap
等函数,使用take或者takeSample
来限制数据大小的上限
六 其他
6.1 Spark优化:禁止应用程序将依赖的Jar包传到HDFS
Spark优化:禁止应用程序将依赖的Jar包传到HDFS
编辑spark-default.conf文件,添加以下内容:
spark.yarn.jar=hdfs://my/home/iteblog/spark_lib/spark-assembly-1.1.0-hadoop2.2.0.jar
也就是使得spark.yarn.jar指向我们HDFS上的Spark lib库。
网友评论