一、出现的异常错误
ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 22.1 GB of 21 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
在执行spark任务的时候,我们会遇到以上的异常。
result.cache
使用cache的时候会报这个异常信息,会有任务失败,进行重试,有时可以重试成功,有时可以重试失败。
二、不增加内存得方法解决以上问题
改为result.persist(StorageLevel.MEMORY_AND_DISK)
这样的话当内存不足时会保存到硬盘上,就不会出现任务失败的情况。
三、从代码来看persist和cache的区别
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache():this.type = persist()
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist():this.type = persist(StorageLevel.MEMORY_ONLY)
以上我们可以看出cache默认就是persist(StorageLevel.MEMORY_ONLY)
而persist方法可以指定不同的StorageLevel,其值可以查看object StorageLevel
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet. Local checkpointing is an exception.
*/
def persist(newLevel: StorageLevel):this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride =true)
}else {
persist(newLevel, allowOverride =false)
}
}
object StorageLevel {
val NONE =new StorageLevel(false,false,false,false)
val DISK_ONLY =new StorageLevel(true,false,false,false)
val DISK_ONLY_2 =new StorageLevel(true,false,false,false,2)
val MEMORY_ONLY =new StorageLevel(false,true,false,true)
val MEMORY_ONLY_2 =new StorageLevel(false,true,false,true,2)
val MEMORY_ONLY_SER =new StorageLevel(false,true,false,false)
val MEMORY_ONLY_SER_2 =new StorageLevel(false,true,false,false,2)
val MEMORY_AND_DISK =new StorageLevel(true,true,false,true)
val MEMORY_AND_DISK_2 =new StorageLevel(true,true,false,true,2)
val MEMORY_AND_DISK_SER =new StorageLevel(true,true,false,false)
val MEMORY_AND_DISK_SER_2 =new StorageLevel(true,true,false,false,2)
val OFF_HEAP =new StorageLevel(false,false,true,false)
}
四、总结
选择一种最合适的持久化策略
默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。
如果上述策略会报内存溢出异常的情况,这个时候就需要使用MEMORY_AND_DISK的持久化策略。
网友评论