美文网首页
Spark调优

Spark调优

作者: 让我们荡起双桨呀 | 来源:发表于2020-03-18 10:18 被阅读0次

spark调优方面

数据序列化

序列化在任何分布式应用程序的性能中都起着重要的作用。将对象序列化为大量字节的格式将大大降低计算速度。因此,选择合适的序列化方式将是优化程序的第一件事。spark旨在在便利性(允许在操作中使用任何的java类型)和性能之间取得平衡,提供了两种序列化库。

Java序列化:默认情况下,spark允许使用javaObjectOutputStream框架序列化对象,并且可以与程序中创建的任何实现java.io.Serializable的类一起使用。还可以通过扩展java.io.Externalizable来更紧密地控制序列化的性能。java序列化是灵活的,但通常很慢,并导致许多大型序列化格式的类。

kryo序列化:spark还可以使用kryo库更快地序列化对象。kryojava序列化(通常高达10倍)明显更快,更紧凑,但不支持所有Serializable类型,并且要求提前注册在程序中使用的类以获得最佳性能。

通过使用sparkConf初始化作业并调用conf.set("spark.serializer", "org.apache.spark.serializer.kryoSerializer")来使用kryo序列化方式。该设置的序列化方式,不仅用于在work节点之间数据shuffle,还用于将RDD序列化到磁盘。kryo不是默认的唯一原因是因为需要自定义注册要求。从spark 2.0.0开始,在使用简单类型,简单类型数组或字符串类型对RDD进行数据shuffle,内部使用kryo进行序列化。

spark自动包含kryo序列化器,用于Twitter chill库中AllscalaRegistrar涵盖的许多常用核心scala库。

要使用kryo注册自己的自定义类,需要使用registerKryoClasses方法

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classof[MyClass1], classof[Myclass2]))
val sc = new SparkContext(conf)

如果需要注册的对象很大的时候,可以通过增加spark.kryoserializer.buffer的大小来容纳允许的序列化对象的大小。

如果没有注册自定义类,kryo仍会工作,但它必须存储每个对象的完整类名,这是一种浪费!

内存调整

内存优化有三个方面的考虑:对象所占的内存,访问对象的成本以及垃圾回收所占的开销。

默认情况下,java对象访问速度很快,但与其“原始”数据相比,可以轻松占用2-5倍的空间。这是由于一下几个原因:

  • 每个不同的java对象都有一个”对象头“,大约16个字节,并包含诸如指向该类的指针之类的信息。对于其中包含非常少数据的对象(比如一个int字段),这可能比数据大。
  • java String比原始字符串数据上有大约40个字节的开销(因为它们将它存储在一个char数组中并保留额外的数据,如长度),并且由于String内部使用UTF-16编码,一个字符需要占用两个字节,因此一个长度为10字节的字符串需要60个字节。
  • 公共集合类,例如HashMapLinkedList,使用链表数据结构,其中每个节点都有一个“包装”对象(例如Map.Entry)。此对象不仅包含“对象头”,还具有指向列表中下一个对象的指针(通常每个指针占8个字节)。
  • 原始类型的集合通常将它们存储为“装箱”对象,例如java.lang.Integer

内存管理概述

spark中的内存使用大致分为两类:executionstorageexecution内存是用于shufflejoinsortaggregation计算的内存,而storage内存是用于在集群中cachebroadcast内部数据的内存。在spark中,executionstorage共享一个统一的区域(M)。当没有使用execution内存时,storage可以获取所有可用的内存,反之亦然。如有必要,execution可以驱逐storage,但仅限于总存储内存使用量低于某个阈值(R)。换句话说,R描述了M从不驱逐缓存块的子区域。由于实现的复杂性,storage不会驱逐execution

这种设计保证了几个显著的特征。首先,不使用缓冲的应用程序可以使用整个空间执行,从而避免不必要的磁盘溢出。其次,使用缓冲的应用程序可以保留最小存储空间(R),其中数据块不受驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户内部划分内存的专业知识。

虽然有两种相关配置,但大部分用户不需要调整他们,因为默认值适用于大多数工作负载:

  • spark.memory.fraction表示大小M为(JVM堆空间 - 300MB)的一小部分(默认值为0.6)。剩余的空间(0.4)保留用于数据结构,spark中内部元数据,防止在异常大而稀疏的记录下发生OOM错误。
  • spark.memory.storageFraction表示RM(默认为0.5)的一小部分。RM缓冲块不受执行驱逐的存储空间。
  • spark.memory.fraction值的配置不仅仅调试JVM堆空间或trunred设置。还有一些GC优化

确定内存消耗

调整数据集所需内存消耗量的最佳方式就是创建RDD,将其放入缓冲中,然后查看web UI中的“storage”页面。来确定RDD占用多少内存。

为了估计特定对象的内存消耗,使用SizeEstimatorestimate方法。这对于尝试使用不同的数据布局来调整内存使用情况以及确定广播变量在每个执行程序堆上占用的空间量非常有用。

数据结构优化

减少内存消耗的第一种方法是避免增加开销的java功能,例如基于指针的数据结构和包装器对象。有很多中方法:

  1. 使用的数据结构优先选择对象数组和基本类型,而不是标准的javascala集合类(例如HashMap)。fastutil库提供了原始数据类型非常方便的集合类,同时兼容java标准类库。
  2. 尽可能避免使用包含大量小对象和指针的嵌套结构。
  3. 考虑使用数据ID或枚举对象而不是String类型的主键。
  4. 如果内存少于32GB,设置Jvm参数-XX:+UseCompressedOops来将8字节指针修改为4字节。

序列化RDD存储

当上面的优化都尝试了,但时当对象仍然太大而无法有效存储时,减少内存使用的一种更简单的方法是使用RDD持久化API中的序列化StorageLevels以序列化形式存储他们。spark将每个RDD分区存储为一个大字节数据。由于必须动态地反序列化每个对象,因此以序列化形式存储数据的唯一缺点是访问时间较慢。如果以序列化的形式缓冲数据,建议使用kryo,因为它占用的空间比java序列化小得多。

垃圾收集调整

当程序存储的RDD进行大量的“失效”时,JVM垃圾回收可能会出现问题。(在读取RDD一次然后在其上运行许多操作的程序中通常不会出现问题。)当java需要清理旧对象以便为新对象腾出空间时,它需要遍历所有java对象并查找未使用的。垃圾收集的成本与java对象的数量成正比,因此使用具有较少对象的数据结构(例如,使用int数组而不是LinkedList)大大降低了这种成本。一种更好的方法是如上述以序列化形式持久化对象:现在每个RDD分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果GC是一个问题,首先要使用序列化缓冲。

由于任务的execution内存(运行任务所需的空间量)与节点上缓冲的RDD之间的干扰,GC也可能成为问题。下面讨论如何控制分配给RDD缓存的空间来缓解这种情况。

测量GC的影响

GC调整的第一步是收集关于垃圾收集发生频率和GC花费的时间的统计信息。这可以通过添加 -Verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStampsjava选项来完成。下次运行spark作业时,每当发生垃圾回收时,都会看到工作日志中打印的消息。这些日志将在集群的工作节点(stdout在其工作目录的文件中),而不是驱动程序。

高级GC调整

为了进一步调整垃圾收集,首先介绍一下JVM中有关内存管理的一些基本信息:

  • java堆空间分为YoungOld两个区域。Young意味这持有新生的对象,而Old则持有生命周期较长的对象。
  • Young进一步分为三个区域[Eden, Survivor1, Survivor2]。
  • 垃圾收集过程的简化描述:当Eden已满时,在Eden上运行小级别GC,并将从EdenSurvivor1中存活的对象复制到Survivor2Survivor区被交换。如果一个对象足够大或Survivor2已满,则将其移到Old区。最后,当Old区接近满时,将调用大级别的GC

SparkGC调整的目标是确保只有长期存在的RDD存储在Old代中,并且Young代的大小足以存储短期对象。这将有助于避免大级别的GC收集在任务执行期间创建的临时对象。一些可能有用的步骤:

  • 通过收集GC统计数据来检查是否有太多的垃圾收集。如果在任务完成之前多次调用完整的GC,则意味这没有足够的内存可用于执行任务。
    • 如果有太多次要GC而不是主要GC,那么Eden分配更多的内存会有所帮助。可以将Eden的大小设置为偏大每个任务所需的内存量。如果Eden的大小E,则可以使用该选项设置Young代的大小-Xmn=4/3*E。(按比例增加4/3是为了考虑Survivor使用的空间)。
  • 在打印的GC统计信息中,如果Old接近满时,则通过降低来减少用于缓冲的内存量spark.memory.fraction;缓存更少的对象比减慢任务执行速度更好。考虑减小Young的大小。-Xmn如果没有,尝试更改JVM NewRatio参数的值。许多JVM将次默认为2,这意味着Old占堆的2/3。它应该足够大,使得该值超过spark.memory.fraction
  • 尝试使用G1GC垃圾收集器-XX:+UseG1GC。在垃圾收集成为瓶颈的某些情况下,它可以提高性能。可能重要的时增加G1region大小-XX:G1HeapRegionSize
  • 例如,如果任务时从hdfs读取数据,则可以使用从hdfs读取的数据块的大小来估计任务使用的内存量。解压块的大小通常时块大小的2或3倍。因此,如果有3或4个任务的工作空间,并且hdfs块大小为128M,可以估计Eden的带下4*3*128
  • 监控垃圾收集所用频率和时间如何随新设置而变化。

可以通过设置spark.executor.extraJavaOptions作业的配置来指定执行程序的GC调整标志。

其他考虑因素

并行程度

为每个操作设置足够高的并行度,否则将无法充分利用群集。Spark会根据其大小自动设置要在每个文件上运行的"map"任务的数量(尽管可以通过可选参数来控制它SparkContext.textFile等),并且对于分布式"reduce"操作,例如groupByKeyreduceByKey,它使用最大的父级RDD的分区数量。可以将并行级别作为第二个参数传递,或者设置config属性spark.default.parallelism以更改默认值。通常,建议群集中每个CPU核心有2-3个任务。

减少任务的内存使用情况

有时候,你会得到一个OutOfMemoryError,不是因为RDD不适合内存,而是因为一个任务的数据集太大了,例如SparkShuffle操作一个reduce任务groupByKeyreduceByKeyjoin等。建立每个任务中的哈希表来进行分组,而这往往是大的。最简单的解决方法是增加并行度,以便每个任务的输入集更小。Spark可以有效地支持短至200毫秒的任务,因为它在许多任务中重用了一个执行程序JVM,并且它具有较低的任务启动成本,因此可以安全地将并行度提高到超过群集中的核心数。

广播大变量

使用可用的广播变量可以大大减少每个序列化任务的大小,以及在群集上启动作业的成本。如果任务使用一个小的数据集进行类似与join操作,可以转化为mapjoin,建议转换为广播变量。Spark会在主服务器上打印每个任务的序列化大小,因此可以查看它以确定任务是否过大; 一般来说,大于约20 KB的任务可能值得优化。

数据位置

数据位置可能会对Spark作业的性能产生重大影响。如果数据和在其上运行的代码在一起,那么计算往往很快。但是如果代码和数据是分开的,那么必须进行移动。通常,将序列化代码从一个地方移动到另一个地方比数据移动更快,因为代码大小比数据小得多。Spark围绕数据局部性的一般原则构建其调度。

数据位置是数据与处理它的代码的接近程度。根据数据的当前位置,有多个级别的位置。从最近到最远的顺序:

  • PROCESS_LOCAL数据与正在运行的代码位于同一JVM中。这是最好的地方
  • NODE_LOCAL数据在同一节点上。可能位于同一节点上的HDFS中,也可能位于同一节点上的另一个执行程序中。这比PROCESS_LOCAL因为数据必须在进程之间传输要慢一些
  • NO_PREF 可以从任何地方快速访问数据,并且没有位置偏好
  • RACK_LOCAL数据位于同一机架服务器上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机
  • ANY 数据在网络上的其他位置,而不在同一个机架中

Spark更喜欢在最佳位置级别安排所有任务,但这并不总是可行的。在任何空闲执行程序上没有未处理数据的情况下,Spark会切换到较低的位置级别。有两个选项:a)等待忙碌的CPU释放以启动同一服务器上的数据任务,或b)立即在需要移动数据的更远的地方启动新任务。

Spark通常会做的是等待忙碌的CPU释放。一旦超时到期,它就开始将数据从远处移动到空闲CPU。每个级别之间的回退等待超时可以单独配置,也可以在一个参数中一起配置(spark.locality上的 参数)。默认情况下通常效果很好。

相关文章

网友评论

      本文标题:Spark调优

      本文链接:https://www.haomeiwen.com/subject/rccqyhtx.html