美文网首页大数据相关sparkSpark & Flink
Spark 性能调优--开发阶段

Spark 性能调优--开发阶段

作者: Alex90 | 来源:发表于2018-01-05 14:44 被阅读31次

    开发阶段调优

    原则一:避免创建重复的RDD

    一个Spark作业时,首先是基于某个数据源创建一个初始的RDD,接着对这个RDD执行某个算子操作,得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,就是RDD lineage,也就是“RDD的血缘关系链”

    对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据

    val rdd1 = sc.textFile("hdfs://master_ip:9000/hello.txt")
    rdd1.map(...)
    val rdd2 = sc.textFile("hdfs://master_ip:9000/hello.txt")
    rdd2.reduce(...)
    
    val rdd1 = sc.textFile("hdfs://master_ip:9000/hello.txt")
    rdd1.map(...)
    rdd1.reduce(...)
    

    这种写法很明显比上一种写法要好多了,对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作,但是由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销



    原则二:尽可能复用同一个RDD

    除了要避免对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。

    比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数

    // 有一个<Long, String>格式的RDD,即rdd1。
    // 对rdd1执行了一个map操作,创建了一个rdd2,rdd2是rdd1的子集
    JavaPairRDD<Long, String> rdd1 = ...
    JavaRDD<String> rdd2 = rdd1.map(...)
    // 分别对rdd1和rdd2执行了不同的算子操作。
    rdd1.reduceByKey(...)
    rdd2.map(...)
    
    JavaPairRDD<Long, String> rdd1 = ...
    rdd1.reduceByKey(...)
    rdd1.map(tuple._2...)
    



    原则三:对多次使用的RDD进行持久化

    尽可能复用RDD,在这个基础之上,进行第二步优化,保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次

    对于一个RDD执行多次算子的默认原理是这样的:
    每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的
    对于这种情况,对多次使用的RDD进行持久化。Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子

    // 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。
    // cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
    // 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
    // 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
    val rdd1 = sc.textFile("hdfs://master_ip:9000/hello.txt").cache()
    rdd1.map(...)
    rdd1.reduce(...)
    
    // persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
    // 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
    val rdd1 = sc.textFile("hdfs://master_ip:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
    

    持久化级别



    原则四:尽量减少使用shuffle类算子

    Spark作业运行过程中,最消耗性能的地方就是shuffle过程,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作

    shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key,而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中

    因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作,也是shuffle性能较差的主要原因。

    // 传统的join操作会导致shuffle操作。
    // 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
    val rdd3 = rdd1.join(rdd2)
    
    // Broadcast+map的join操作,不会导致shuffle操作
    // 使用Broadcast将一个数据量较小的RDD作为广播变量。
    val rdd2Data = rdd2.collect()
    val rdd2DataBroadcast = sc.broadcast(rdd2Data)
    // 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据
    // 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
    // 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
    val rdd3 = rdd1.map(rdd2DataBroadcast...)
    
    // 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
    // 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
    



    原则五:使用map-side预聚合的shuffle操作

    无法用map类的算子来替代shuffle操作,那么尽量使用可以map-side预聚合的算子

    每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了,其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销

    通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合



    原则六:使用高性能的算子

    使用 reduceByKey / aggregateByKey 替代 groupByKey

    使用 mapPartitions 替代普通 map ,有的时候使用 mapPartitions 会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个 partition 所有的数据

    使用 foreachPartitions 替代 foreach ,对于写数据库操作,能够减少连接数

    使用 filter 之后进行 coalesce 操作,对一个RDD执行 filter 算子过滤掉RDD中较多数据后,建议使用 coalesce 算子,手动减少RDD的partition数量,因为 filter
    之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费

    使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作,官方建议,如果需要在 repartition 重分区之后,还要进行排序,建议直接使用
    repartitionAndSortWithinPartitions 算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序



    原则七:广播大变量

    有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能

    在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存

    如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中



    原则八:使用Kryo优化序列化性能

    在Spark中,主要有三个地方涉及到了序列化
    - 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输
    - 自定义的类型作为RDD的泛型类型时
    - 使用可序列化的持久化策略时

    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能



    原则九:优化数据结构

    有三种类型比较耗费内存
    - 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间
    - 字符串,每个字符串内部都有一个字符数组以及长度等额外信息
    - 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry

    算子函数中的代码,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用

    相关文章

      网友评论

        本文标题:Spark 性能调优--开发阶段

        本文链接:https://www.haomeiwen.com/subject/kbfsnxtx.html