调优目的
1、提高cpu利用率
2、缩短作业执行时间
详细调优步骤
1、增加资源分配
在一定范围内,增加资源和性能提升成正比,给足资源,再考虑其它优化步骤
executor、cpu per executor、memory per executor、driver memory
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的数量
--driver-memory 100m \ 配置driver的内存(影响不大)
--executor-memory 100m \ 配置每个executor的内存大小
--executor-cores 3 \ 配置每个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
2、调整并行度
cpu核心数的2-3倍
spark.default.parallelism
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "500")
3、RDD 重用和优化
对于重复使用的RDD一定要进行持久化(可以使用序列化方式,更加节省内存)
注意:一般spark作业经过以上三步(三板斧)优化性能已经得到大幅度提升,没有经过任何调优手段的spark作业,16个小时,就可以到5个小时
4、广播大变量
举例来说,(虽然是举例,但是基本都是用我们实际在企业中用的生产环境中的配置和经验来说明的)。50个executor,1000个task。一个map,10M。
默认情况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
如果使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executor的bockmanager上拉取变量副本,网络传输速度大大增加;500M的内存消耗。
10000M,500M,20倍。20倍~以上的网络传输性能消耗的降低;20倍的内存消耗的减少。对性能的提升和影响,还是很客观的。
- 比如运行30分钟的spark作业,可能做了广播变量以后,速度快了2分钟,或者5分钟
5、fastutil调优
fastutil其实没有你想象中的那么强大,也不会跟官网上说的效果那么一鸣惊人。广播变量、Kryo序列化类库、fastutil,都是之前所说的,对于性能来说,类似于一种调味品,烤鸡,本来就很好吃了,然后加了一点特质的孜然麻辣粉调料,就更加好吃了一点。分配资源、并行度、RDD架构与持久化,这三个就是烤鸡;broadcast、kryo、fastutil,类似于调料。
6、kyro序列化
Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。
所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。
在spark中会自动启用
1、算子函数中使用到的外部变量
2、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER
3、shuffle
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
7、数据本地化
观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。
日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL
观察大部分task的数据本地化级别
如果大多都是PROCESS_LOCAL,那就不用调节了
如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长
调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志
看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短
别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了
new SparkConf()
.set("spark.locality.wait", "10")
shuffle调优
JVM调优
算子调优
1、filter算子之后使用coalesce算子对数据进行风区数据合并,使数据尽量均匀紧凑
数据倾斜产生的可能原因之一
例如:第二个partition的数据量才100;但是第三个partition的数据量是900;那么在后面的task处理逻辑一样的情况下,不同的task要处理的数据量可能差别达到了9倍,甚至10倍以上;同样也就导致了速度的差别在9倍,甚至10倍以上。
coalesce(numPartitions: Int, shuffle: Boolean = false)
2、map->MapPartitions
如果是普通的map,比如一个partition中有1万条数据;ok,那么你的function要执行和计算1万次。
但是,使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。
可能会造成OOM, 100万数据,一次传入一个function以后,那么可能一下子内存不够
3、groupByKey->reduceByKey
reduceByKey,相较于普通的shuffle操作(比如groupByKey),会进行map端的本地聚合。
4、foreach->foreachpartition
原理同2
5、spark sql 并行度底使用repartition进行调整
网友评论