美文网首页
spark3.x 生产调优笔记

spark3.x 生产调优笔记

作者: 架构师老狼 | 来源:发表于2021-11-05 16:52 被阅读0次

1 spark sql写入mysql非常慢

  • 有这样一个业务场景:需要将通过Spark处理之后的数据写入MySQL,并在在网页端进行可视化输出。Spark处理之后有大概40万条数据,写入MySQL却要耗费将近30分钟,这也太慢了!
  • 后来翻看了Spark向JDBC数据源写数据的那部分源码,虽然源码中的实现使用的确实是 PreparedStatement 的addBatch()方法和executeBatch()方法,但是我们再去翻看executeBatch()方法的实现后发现,它并不是每次执行一批插入,而是循环的去执行每条insert插入语句,这就造成只插入一条数据,而不是一批数据,导致大多数的时间都耗费在了与数据库的交互连接上了
  • 解决方法:
jdbc.saas.url=jdbc:mysql://172.25.1.*/saas-hospital?characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true

2 spark sql jdbc并发分区

  • jdbcDF.rdd.partitions.size # 结果返回 1。该操作的并发度为1,你所有的数据都会在一个partition中进行操作,意味着无论你给的资源有多少,只有一个task会执行任务,执行效率可想而之,并且在稍微大点的表中进行操作分分钟就会OOM。
def jdbc(
  url: String,
  table: String,
  columnName: String,    # 根据该字段分区,需要为整形,比如id等
  lowerBound: Long,      # 分区的下界
  upperBound: Long,      # 分区的上界
  numPartitions: Int,    # 分区的个数
  connectionProperties: Properties): DataFrame

# 指定字段区间分区
val predicates =
    Array(
      "2015-09-16" -> "2015-09-30",
      "2015-10-01" -> "2015-10-15",
      "2015-10-16" -> "2015-10-31",
      "2015-11-01" -> "2015-11-14",
      "2015-11-15" -> "2015-11-30",
      "2015-12-01" -> "2015-12-15"
    ).map {
      case (start, end) =>
        s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
    }

// 取得该表数据
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)

3 SparkSQL与Parquet格式兼容性

  • spark.sql.parquet.writeLegacyFormat 默认是false。如果设置为true 数据会以spark 1.4和更早的版本的格式写入。比如,decimal类型的值会被以apache parquet的fixed-length byte array格式写出,该格式是其他系统例如hive,impala等使用的。如果是false,会使用parquet的新版格式。例如,decimals会以int-based格式写出。

4 RDD复用并序列化存储

  • 必须对多次使用的 RDD 进行持久化,通过持久化将公共 RDD 的数据
    缓存到内存/磁盘中,之后对于公共 RDD 的计算都会从内存/磁盘中直接获取 RDD 数据
  • RDD 的持久化是可以进行序列化的,当内存无法将 RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中
    public static final StorageLevel MEMORY_ONLY_SER_2 
    public static final StorageLevel MEMORY_AND_DISK_SER_2 

5 RDD合理设置并行度

  • Spark 官方推荐,task 数量应该设置为 Spark 作业总 CPU core 数量的 2~3 倍。之所以没有推荐 task 数量与 CPU core 总数相等,是因为 task 的执行时间不同,有的 task 执行速度快而有的 task 执行速度慢,如果 task 数量与 CPU core 总数相等,那么执行快的 task 执行完成后,会出现 CPU core 空闲的情况。如果 task 数量设置为 CPU core 总数的 2~3 倍,那么一个task 执行完毕后,CPU core 会立刻执行下一个 task,降低了资源的浪费,同时提升了 Spark作业运行的效率。
val conf = new SparkConf().set("spark.default.parallelism", "500")

6 广播大变量

  • 默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。

7 Kryo 序列化

  • Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用
    Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注
    册需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字
    符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式了
public class MyKryoRegistrator implements KryoRegistrator {
     @Override
     public void registerClasses(Kryo kryo){
        kryo.register(StartupReportLogs.class);
     }
 }

//创建 SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化库,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
//在 Kryo 序列化库中注册自定义的类集合,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "MyKryoRegistrator");

8 foreachPartition 优化数据库操作

  • 在生产环境中,通常使用 foreachPartition 算子来完成数据库的写入,通过 foreachPartition
    算子的特性,可以优化写数据库的性能。
  • 对于我们写的 function 函数,一次处理一整个分区的数据;
  • 对于一个分区内的数据,创建唯一的数据库连接;
  • 只需要向数据库发送一次 SQL 语句和多组参数;

9 repartition 解决 SparkSQL 低并行度问题

  • Spark SQL 的并行度不允许用户自己指定,Spark SQL 自己会默认根据 hive 表对应的
    HDFS 文件的 split 个数自动设置 Spark SQL 所在的那个 stage 的并行度,用户自己通
    spark.default.parallelism 参数指定的并行度,只会在没 Spark SQL 的 stage 中生效。
  • Spark SQL 这一步的并行度和 task 数量肯定是没有办法去改变了,但是,对于
    Spark SQL 查询出来的 RDD,立即使用 repartition 算子,去重新进行分区,这样可
    以重新分区为多个 partition,从 repartition 之后的 RDD 操作,由于不再设计 Spark
    SQL,因此 stage 的并行度就会等于你手动设置的值,这样就避免了 Spark SQL 所在
    的 stage 只能用少量的 task 去处理大量数据并执行复杂的算法逻辑。


    spark sql重分区

10 reduceByKey 预聚合

  • 本地聚合后,在 map 端的数据量变少,减少了磁盘 IO,也减少了对磁盘空间的占用;
  • 本地聚合后,下一个 stage 拉取的数据量变少,减少了网络传输的数据量;
  • 本地聚合后,在 reduce 端进行数据缓存的内存占用减少;
  • 本地聚合后,在 reduce 端进行聚合的数据量减少。
  • 基于 reduceByKey 的本地聚合特征,我们应该考虑使用 reduceByKey 代替其他的 shuffle 算
    子,例如 groupByKey。

11 故障:shuffle file not found

  • 原因:Shuffle 操作中,后面 stage 的 task 想要去上一个 stage 的
    task 所在的 Executor 拉取数据,结果对方正在执行 GC,执行 GC 会导致 Executor 内所有的工作现场全部停止,比如 BlockManager、基于 netty 的网络通信等,这就会导致后面的 task拉取数据拉取了半天都没有拉取到,就会报出 shuffle file not found 的错误,而第二次再次执行就不会再出现这种错误。
# 调整 reduce 端拉取数据重试次数和 reduce 端拉取数据时间间隔这两个参数
val conf = new SparkConf()
 .set("spark.shuffle.io.maxRetries", "60")
 .set("spark.shuffle.io.retryWait", "60s")

12 故障:OOM

  • Shuffle map端:map 端缓冲的默认配置是 32KB,如果每个 task 处理 640KB 的数据,那么会发生 640/32 = 20 次溢写,如果每个 task 处理 64000KB 的数据,机会发生 64000/32=2000 此溢写,这对于性能的影响是非常严重的。
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
  • shuffle reduce端: reduce task 的 buffer 缓冲区大小决定了 reduce task 每次能够缓冲的数据量,也就是每次能够拉取的数据量。reduce 端数据拉取缓冲区的大小可以通过 park.reducer.maxSizeInFlight 参数进行设置,默认为 48MB
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")

13 Executor 堆外内存故障:OOM与task lost

  • Spark 作业处理的数据量非常大,达到几亿的数据量,此时运行 Spark作业会时不时地报错,例如 shuffle output file cannot find,executor lost,task lost,out of memory等,这可能是 Executor 的堆外内存不太够用,导致 Executor 在运行的过程中内存溢出。
  • Executor 的堆外内存主要用于程序的共享库、Perm Space、 线程 Stack 和一些 Memory mapping 等, 或者类 C 方式 allocate object。
  • stage 的 task 在运行的时候,可能要从一些 Executor 中去拉取 shuffle map output 文件,但是 Executor 可能已经由于内存溢出挂掉了,其关联的 BlockManager 也没有了,这就可能会报出 shuffle output file cannot find,executor lost,task lost,out of memory 等错误
# Executor 堆外内存的配置需要在 spark-submit :
--conf spark.yarn.executor.memoryOverhead=2048

14 spark数据倾斜

1. 数据倾斜的表现

  • Spark 作业的大部分 task 都执行迅速,只有有限的几个 task 执行的非常慢,此时可能出
    现了数据倾斜,作业可以运行,但是运行得非常慢;
  • Spark 作业的大部分 task 都执行迅速,但是有的 task 在运行过程中会突然报出 OOM,
    反复执行几次都在某一个 task 报出 OOM 错误,此时可能出现了数据倾斜,作业无法正
    常运行。

2. 定位数据倾斜问题

  • 查阅代码中的 shuffle 算子,例如 reduceByKey、countByKey、groupByKey、join 等算子,根据代码逻辑判断此处是否会出现数据倾斜;
  • 查看 Spark 作业的 log 文件,log 文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个 stage,对应的 shuffle 算子是哪一个;

3. 解决方法1 - 过滤产生数据倾斜的key

  • 在 Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的 key 进行
    过滤,滤除可能导致数据倾斜的 key 对应的数据

4. 解决方法2 - 聚合原始数据

  • 如果 Spark 作业的数据来源于 Hive 表,那么可以先在 Hive 表中对数据进行聚合,例如按照 key 进行分组,将同一 key 对应的所有 value 用一种特殊的格式拼接到一个字符串里去, 尚硅谷大数据技术之 Spark 优化 这样,一个 key 就只有一条数据了;之后,对一个 key 的所有 value 进行处理时,只需要进行 map 操作即可,无需再进行任何的 shuffle 操作。通过上述方式就避免了执行 shuffle 操作,也就不可能会发生任何的数据倾斜问题。

5. 解决方法3 - 提高 shuffle 操作中的 reduce 并行度

  • 在大部分的 shuffle 算子中,都可以传入一个并行度的设置参数,比如 reduceByKey(500),这个参数会决定 shuffle 过程中 reduce 端的并行度,在进行 shuffle 操作的时候,就会对应着
    创建指定数量的 reduce task。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。

6. 解决方法4 - 聚合算子:使用随机 key 实现双重聚合

  • 首先,通过 map 算子给每个数据的 key 添加随机数前缀,对 key 进行打散,将原先一
    样的 key 变成不一样的 key,然后进行第一次聚合,这样就可以让原本被一个 task 处理的数
    据分散到多个 task 上去做局部聚合;随后,去除掉每个 key 的前缀,再次进行聚合。

7. 解决方法5 - join算子:将 reduce join 转换为 map join

  • 将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。

8. 解决方法6 - join算子:sample 采样对倾斜 key 单独进行 join

  • 当数据量非常大时,可以考虑使用 sample 采样获取 10%的数据,然后分析这 10%的数
    据中哪个 key 可能会导致数据倾斜,然后将这个 key 对应的数据单独提取出来。

9. 解决方法7 - join算子:使用随机数扩容进行 join

  • 我们会将原先一样的 key 通过附加随机前缀变成不一样的 key,然后就可以将这些处理
    后的“不同 key”分散到多个 task 中去处理,而不是让一个 task 处理大量的相同 key。这一种
    方案是针对有大量倾斜 key 的情况,没法将部分 key 拆分出来进行单独处理,需要对整个
    RDD 进行数据扩容,对内存资源要求很高。

相关文章

  • spark3.x 生产调优笔记

    1 spark sql写入mysql非常慢 有这样一个业务场景:需要将通过Spark处理之后的数据写入MySQL,...

  • 部门挖来了月薪80K的京东大佬,总结了堪称完美的Mysql调优笔

    因笔记内容笔记全面,篇幅过长,用以截图展示。 前文 笔记分为2个小节,分别为: 性能调优 架构设计 性能调优 影响...

  • JVM调优

    1 调优层次 性能调优包含多个层次,比如:架构调优、代码调优、JVM调优、数据库调优、操作系统调优等。架构调优和代...

  • Kafka 生产者调优

    讲解样例代码中的参数含义: buffer.memory 参数是什么意思? Kafka的客户端发送数据到服务器,一般...

  • InfluxDB生产环境配置调优

    不废话,直接上干货,本配置试用于InfluxDB version 1.6 vim /etc/influxdb/in...

  • Twitter 工程师谈 JVM 调优

    一. 调优需要关注的几个方面 内存调优 CPU 使用调优 锁竞争调优 I/O 调优 二. Twitter 最大的敌...

  • Spark性能优化-开发调优

    Spark性能优化分为四个方面: 1、开发调优2、资源调优3、数据倾斜调优4、shuffle调优 1. 开发调优 ...

  • Spark性能优化-资源调优

    Spark性能优化分为四个方面: 1、开发调优2、资源调优3、数据倾斜调优4、shuffle调优 资源调优 num...

  • 谈谈性能调优思路

    声明:本文为学习总结篇,参考资料见文末,如有侵权请联系作者,调优实践总结篇可参考以往文章:JVM学习笔记与调优实战...

  • Spark性能优化-数据倾斜调优

    Spark性能优化分为四个方面: 1、开发调优2、资源调优3、数据倾斜调优4、shuffle调优 数据倾斜调优 1...

网友评论

      本文标题:spark3.x 生产调优笔记

      本文链接:https://www.haomeiwen.com/subject/teprzltx.html