1.RDD重新分区
针对大量小分区的RDD,使用RDD重分区函数coalesce将小分区合并成大分区;同样当分区数据量过大也可以使用重新分区,增加分区数量,提高并行计算能力。
2.并行度
通过配置和代码来设置task数量。task数量太多导致task的启动和切换开销;task数量太小,无法利用并行计算的能力。
3.DAG优化
a.一个Stage尽量容纳更多的算子,减少Shuffle的发生;
b.将经常用到的数据缓存到内存,然后复用已经cache的数据。
4.倾斜问题
倾斜问题是指在个别分区上,执行时间过长,最长时长远大于平均时长。
数据倾斜(比如分区key或者分区函数设计的不好),task倾斜(数据倾斜导致任务倾斜),task执行速度倾斜(解决方式:去掉执行过长的task任务节点,重新分配调度)。
解决方式:
4.1.增大task数量,减少每个分区的数据量;
4.2.对特殊的key进行处理,如空值映射为特定key,分发到不同节点进行特殊处理;
4.3.将数据量大的表划分成多个小表,分成多个阶段进行;
4.4.拆分RDD:将倾斜数据与元数据分离,分成2个job进行计算。
4.5.优化聚集操作。
5.JVM调优
5.1 设计好的数据结构,避免使用链式集合;
5.2 减少对象嵌套;
5.3 尽量使用数字ID或者枚举对象作为key,尽量避免使用字符串;
5.4 JVM的GC调优;
5.5 磁盘临时空间优化,当执行Shuffle或者内存不够缓存RDD时,RDD的数据会写到磁盘临时目录中;
6.网络传输优化
6.1 增大task的分发缓存大小,避免过大的task造成缓存区溢出;
6.2 小数据量表数据直接广播到各个节点上,这样该节点上的task共享广播数据;
6.3 Collect结果过大时要使用序列化,因为collect函数将每个分区变为数组,返回主节点,并将所有分区的数组合并成一个分组。
7.序列化和压缩
7.1 序列化存储RDD,官方推荐kyro序列化方式;
7.2 压缩存储RDD,spark支持LZF和snappy两种压缩方式。
8.方法的优化
8.1 尽量避免使用reduce方法,而使用reduceByKey,因为reduce是Action算子,是聚集函数,会导致结果汇聚到主节点;而reduceByKey是transformation算子,使计算变成分布式计算。
8.2 针对shuffle操作(如sortByKey,groupByKey,reduceByKey,join等),可以通过增加并行度,这样每个task处理的数据量就会相对减少,占用内存就会减少,避免OutOfMemory产生。
网友评论