美文网首页大数据
Spark应用调优

Spark应用调优

作者: GoofyWang | 来源:发表于2018-08-08 13:32 被阅读62次

调整Spark应用

        由于Spark计算默认是在内存完成的,所有很多集群内的很多资源都会成为Spark应用的瓶颈:CPU,网络带宽,内存。一般情况下,如果数据都在内存里, 那么网络带宽是最常见的瓶颈,但有些时候,我们需要做些调整,比如RDD被序列化保存来降低内存使用量。这篇短文会包含两个主题:数据序列化(这可以优化网络使用量同时减少内存使用量),内存调整。当然还有一些小的零碎主题。

数据序列化

        分布式应用程序性能调优中数据序列化是一个很重要的角色。序列化行为会降低降低程序执行效率并提升空间消耗,同时也会降低计算耗时。调整序列化方式一般是在调整Spark应用时优先被考虑的选项。Spark希望在操作遍历性和性能之间取一个平衡,提供了两种序列化框架:

    1. Java序列化框架:默认情况下,Spark使用Java的ObjectOutputStream框架对数据做序列化和反序列化的工作,它可以支持所有实现了java.io.Serializable接口的类。我们也可以使用扩展的java.io.Externalizablek接口让性能更接近系统性能。Java的序列化框架很灵活,但也很慢,而且会导致大量的class的格式信息被存储了下来。

    2. Kryo序列化框架:Spark也支持Kryo框架(version 2)来更快的序列化。Kryp相比Java的序列化框架一般快10倍,但并不支持所有的Serializable类型,并且需要在使用的时候把需要做序列化的类注册一下。我们可以通过调用conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 把Spark应用的序列化框架切换到Kryo。这个配置除了会把shuffle阶段的数据使用Kryo之外,我们把RDD序列化到磁盘的时候也会用它来实现。Kryo不作为Spark默认的序列化框架的唯一原因是它需要用户在代码里手动配置可以使用Kryo框架序列化的类。但我们还是建议在有网络交互操作的时候使用这个框架。从Spark2.0开始,我们在对一些简单类型的RDD做Shuffle的时候,已经默认使用了Kryo框架,比如简单类型(Java8中基本类型),简单类型的数组,以及字符串类型。Spark自动对一些常用的scala类提供了Kryo框架支持。这些类包括在AllScalaRegistrar 中,它来自Twiiter chill框架把自己的类注册使用Kryo序列化,需要做下面的动作

     val conf = new SparkConf().setMaster(...).setAppName(...)

     conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

     val sc = new SparkContext(conf)

    Kryo框架的文档描述了更多高级的注册选项,比如添加自定义序列化代码。如果我们的类的对象很大,我们还需要提高spark.kryoserializer.buffer这个属性的配置。这个值需要足够的大到把最大的那个对象hold住才行。(听起来这东西是空间换时间)

    如果我们没用上面的代码去做registerKryoClasses去注册类,其实Kryo也可以正常工作,只是会为每个对象存储更多类信息这实在太浪费空间了。

调整内存

    调整内存有3个方面:我们的数据对象的内存占用(一般情况下我们肯定希望所有的数据都在内存里),操作这些数据的时间与空间开销,还有就是内存回收的消耗。一般情况下,Java数据对象操作都很快,但是一般会占用实际数据量的2-5倍的内存空间,原因主要是如下几点:

    1. 每个独立的Java对象都有一个所谓的“头”信息,头信息大概需要16分字节,保存了诸如这个对象所属类的指针。如果一个对象中只有少量的数据(比如就一个Int数据),那么实际上这类对象在内存中头信息就比对象本身代表的信息要大得多。Java的String包含了比字符串内存大概多了40个字节的数据(内部存储是char数组,然后还要有一些额外的信息比如长度之类的),并且Java中一个字符是两个字节(UTF-16)。所以一个10个字符的String可以轻松占60个字节内存。

    2. 常用的集合类,比如HashMap(这个人不了解集合类,Map没有实现Collection接口)或者LinkedList,这些类时数据对象的封装,每个封装后的对象都会有头信息,并且除此之外LinkedList还会有指向下一个元素的指针(一般8个字节)。这些集合类还经常使用基本类型的包装类,也会占更多的存储空间。

    这一节会开始讨论Spark的内存管理,然后会讨论些特殊场景下的策略来让我们的应用内存利用率更高。 尤其是我们会讨论如何确认我们的对象的内存占用情况,并且讨论如何改进我们的数据结构(或者是用序列化方式存储数据迹象)。所以会有调整Spark的cache size和Java内存回收方面的内容。

内存管理概览

    Spark对内存的使用一般会涉及到两个方面:执行和存储。执行内存指那些用来在shuffle、join、sort、aggregation操作做计算时使用的内存存储内存指用来缓存数据或者在集群各个节点间收发数据使用的内存。在Spark中,执行内存和存储内存使用同一个区域(M)。当不需要执行内存的时候,存储内存可以占所有内存,反之亦然。如果有必要,执行内存优先级会更高,当执行内存不够的时候,存储内存会被降低(数据应该是会刷到磁盘上),降低到存储内存占用量低于某个阈值(R)的时候。也就是说,M中必定有R这么大的区域是用来做存储的,不会被执行内存挤占。这个设计确认了如下几点:首先,不用缓存的Spark应用可以使用全部的空间来做执行(这个需要配置),这样也避免了写磁盘。其次使用缓存的Spark应用可以有一个最低的空间(R)来保存数据,且可以确认这部分不会被执行内存挤占。最后,这样也为大多数的Spark应用提供了默认的配置不需要用户知道内存管理的相关信息。有两个相关的配置项,一般用户无需关心。spark.memory.fraction 这个是M的大小,配置里这是个小数,Spark用来换算成百分数表示当前executor中,虚拟机内存的百分之多少用于配置给M。默认是0.6,剩下的40%用来存储用户数据结构,Spark的内部元数据,以及为不一些稀疏数据和大体积的记录预防OOM。spark.memory.storageFraction 这个就是R的大小,同样是个小数,Spark用来换算成百分数,表示M中有多大的空间用来保存数据,且这部分空间不可以被挤占。spark.memory.fraction设置的比例要和JVM中的老年代或者"tenured"区域匹配,避免所有数据老年代装不下(但总空间是够用的),但数据还在进JVM内存,从而造成的OOM。

确认内存消耗

    确认计算一个数据集需要多少内存最好的办法就是创建一个RDD,缓存到cache里,然后去WEB UI的 Storage页面去看一下,这个页面会告诉我们RDD 到底占了多少内存。评估某些特殊对象的内存消耗可以使用 SizeEstimator类的estimate方法。这个方法在我们尝试去缩小内存占用的时候很有用,广播变量占用内存的确认也可以使用这个方法。

调整数据结构

    节省内存的第一步是避免前面提到的Java中的某些特性,比如基于指针的数据结构(LinkedList)和一些包装类。下面是常用的办法:

        1. 设计的时候更多倾向使用数组和简单数据类型。fastutil 框架(http://fastutil.di.unimi.it/)提供了方便的集合类和基本类型,来替代Java原生的那些集合类和简单数据类型。

         2. 尽量避免嵌套数据结构,尤其是很多小对象的嵌套。

         3. KV类型数据中,尽量使用数值类型ID或者可迭代的对象做Key

        4. 如果每个work内存不到32G,设置JVM参数  -XX:+UseCompressedOops 让指针大小从8字节改为4字节。这个参数可以放到 spark-env.sh里,做为默认配置。

序列化存储RDD

        如果我们现在感觉数据对象还是太大了影响性能,一个简单的降低内存使用量办法,就是使用RDD持久化API的序列化方式来存储他们(比如MEMROY_ONLY_SER)。Spark会把RDD的每个partition存储一个大的字节数组。这样的操作为宜的缺点是慢,因为需要动态的反序列化每个对象。强烈建议使用Kryo框架,它的序列化结果比Java序列化框架的结果小很多,比原始的Java对象也小很多。

调整垃圾回收机制

        当我们的Spark应用超级大的时候,内存回收机制也有可能成为问题(简单的加载一下,然后来几个transformation再来个action一般没啥问题)。当JVM需要剔除旧对象来给新对象腾出空间的时候,它会遍历所有的Java对象找到没用的。这里需要记住的一点是:垃圾回收的时间消耗依赖于Java对象的数量,所以使用尽量少一些对象的数据结构(数组来替代LinkedList)会显著的降低GC消耗。更好的办法就是用序列化的方式持久化RDD,这样一个分区是一个大对象(字节数组) 垃圾回收的时候,一下就过去了。所以,在我们尝试调整GC之前,首先要想到的一个方案就是序列化存储机制。

度量GC的影响

        GC调整的第一步是要统计下GC的频率和总共花费在GC上的时间。这个信息可以通过添加 -verbose:gc -XX:+PrintGCDetails-XX:+PrintGCTimeStamps 这两个配置到Java 属性中得到。(查看配置向导页面可以得到更多关于Spark应用的Java配置信息)下次再执行Spark应用的时候,这些信息会被打印到Worker的log里。注意这些log是在集群中的worker节点上的(在stdout文件中),而不是在driver program中。

进阶GC技巧

        要调整GC参数,我们需要先知道一些关于JVM内存管理的基础知识。(关于内存分代这块我实在不想写了,随便百度下都能知道。)Spark中GC策略调整的目的是确保RDD中长期存储的数据进老年代,同时保证新生代有足够的空间存储“短命”对象。这样可以避免在Task执行时因为创建临时对象带来的出现FullGC。下面的信息可以有些帮助:

    1. 是否有太多的GC过程。如果在任务完成前出现了多次Full GC,说明当前Task分配的内存不够多。

    2. 是否有太多的Minor GC,但没有Full GC。这就需要给Eden空间多分点内存了。我们可以谁Eden空间设置一个比预估的总内存值要高的值。如果Eden空间大小为E,则  使用-Xmn=(4/3)*E,来设置新生代内存大小。

    3. 如果发现老年代使用率很高,可以降低spark.memory.fraction来减少缓存的内存量(这个降低了,对应的R也会降低),少缓存数据比频繁GC的效果要好。或者考虑减小-Xmn新生代大小,也可以调整JVM的 NewRatio参数达到减少新生代的目的。大多数JVM默认这个值是2,也就是说老年代会占堆内存的三分之二。把这个比值调大到比spark.memory.fraction计算结果大。

    4. 可以尝试下G1回收器, 设置的参数是-XX:+UseG1GC。G1回收器在某些场景下可以提升GC的性能。需要注意的是:executor分配了大量内存的话,增加G1Region Size十分重要,参数是-XX:G1HeapRegionSize。

    5. 当数据是从HDFS中读取到的时候,内存的占用量可以通过文件块大小算出来。注意从HDFS解压出来的文件块一般是文件块大小的2-3倍。所以如果有4个task,HDFS的文件块大小设置是128M,读到内存里的话就是4 * 3 * 128M我们的经验是建议GC的调整依赖的是Spark应用和申请的内存数量。所以更多的JVM调优经验可以在这里得到(官网上给的链接是这个http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html),但在高层次的降低Full GC频率可以帮助减少开销。GC调整的参数配置是在 spark.executor.extraJavaOptions 参数上配置的。

其他意见

并行度

        集群一般不会被占满,除非我们给所有的操作都设置了非常高的并行度。Spark根据每个文件的大小自动设置了maptask的并行度,而对于reduce类的操作,比如reducebykey、groupbykey之类的操作,因为要用一个RDD的多个Partition做shuffle,我们可以在调用这个方法的时候指定并行度。或者在spark-env.sh中指定spark.default.parallelism来设置一个默认的并行度。一般情况下,我们推荐一个CPU跑2-3个Task。

Reduce类任务的内存使用

        有些时候,我们会得到OOM异常,但这不是因为RDD太大了内存不够造成的,而是因为我们的某些Task做类似groupbykey的时候,这个数据太大了造成的。Spark的shuffle动作在每个task里构建了一个hash table,来执行分区操作,这个hash table经常比较大。最简单的办法就是提高并行度,让每个task的输入小一点。Spark可以很好的支持执行时间小于200毫秒的task,因为这些task会重用一个JVM上的executor,减少了task启动分配资源之类的工作(和线程池类似)

大的广播变量

    使用广播变量可以很大的降低应用程序的内存消耗,尤其是广播变量本身就比较大的时候。Spark会把每个task中的序列化结果的大小打印出来,所以我们可以 查看这个数值,判断task是否过大,一般情况下超过20Kb就值得优化了。

数据本地化

    数据本地化程度对Spark应用的调优很重要。如果数据和对数据的操作在一起(同一台机器上)的话,计算过程通常都会很快。但如果他们分开了,其中一个肯定要移动到另外一个所在的机器上。一般情况下,移动操作都比移动数据要开销更小。Spark根据数据本地化的一般原则来构建任务的调度过程。数据本地化程度是衡量数据与处理数据的代码的接近程度,根据数据的当前位置,有多个级别,从最近到最远的顺序是:

    1. PROCESS_LOCAL 数据与正在运行的代码在同一个JVM中,这个是最好的结果

    2. NODE_LOCAL 数据和运行的代码在同一台机器上

    3. NODE_PREF 可以从任意机器访问的高速存储中

    4. RACK_LOCAL 数据与运行代码在同一机架上(只过单个交换机)

    5. ANY 数据在网络上的其他位置上

    Spark更偏向在最佳位置执行所有任务,但这并非总是可以做到的。当空闲的executor都没有未执行的数据时,Spark会降低数据本地化级别。一般有两个选项:

    1. 一直等到数据所在的机器有CPU被释放出来,才去那里执行任务

    2. 在空闲的机器上立即启动一个task(同机架优先)

    一般Spark会尝试等待一小段时间,希望数据所在的机器的Executor上能有CPU释放出来。一旦超过了这个时间,Spark就是开始尝试移动数据到空闲的机器上了。各个级别间的等待时间可以分别被配置,或者是全都在一个参数里配置好。查看 spark.locality参数配置可以得到更多信息(http://spark.apache.org/docs/2.1.2/configuration.html#scheduling)当我们的Spark应用要跑很久或者本地资源有限的情况下,可以试着去增加这些参数的值,但是默认参数也是可以的。

小结

    这篇文章只是Spark应用调优相关信息的一部分,也是我们最应该知道的部分——数据序列化和内存调整。一般情况下,使用Kryo序列化框架对数据做序列化和持久化就可以解决大部分性能问题。其他调优问题可以发邮件去问下Spark开发人员(https://spark.apache.org/community.html)

相关文章

网友评论

    本文标题:Spark应用调优

    本文链接:https://www.haomeiwen.com/subject/rgntbftx.html