美文网首页
2persist和cache方法

2persist和cache方法

作者: 舞阳人的北京路 | 来源:发表于2018-01-07 21:06 被阅读0次

    一、出现的异常错误

    ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 22.1 GB of 21 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

    在执行spark任务的时候,我们会遇到以上的异常。

    result.cache

    使用cache的时候会报这个异常信息,会有任务失败,进行重试,有时可以重试成功,有时可以重试失败。

    二、不增加内存得方法解决以上问题

    改为result.persist(StorageLevel.MEMORY_AND_DISK)

    这样的话当内存不足时会保存到硬盘上,就不会出现任务失败的情况。

    三、从代码来看persist和cache的区别

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

    def cache():this.type = persist()

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

    def persist():this.type = persist(StorageLevel.MEMORY_ONLY)

    以上我们可以看出cache默认就是persist(StorageLevel.MEMORY_ONLY)

    而persist方法可以指定不同的StorageLevel,其值可以查看object StorageLevel

    /**

    * Set this RDD's storage level to persist its values across operations after the first time

    * it is computed. This can only be used to assign a new storage level if the RDD does not

    * have a storage level set yet. Local checkpointing is an exception.

    */

    def persist(newLevel: StorageLevel):this.type = {

    if (isLocallyCheckpointed) {

    // This means the user previously called localCheckpoint(), which should have already

    // marked this RDD for persisting. Here we should override the old storage level with

    // one that is explicitly requested by the user (after adapting it to use disk).

        persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride =true)

    }else {

    persist(newLevel, allowOverride =false)

    }

    }

    object StorageLevel {

    val NONE =new StorageLevel(false,false,false,false)

    val DISK_ONLY =new StorageLevel(true,false,false,false)

    val DISK_ONLY_2 =new StorageLevel(true,false,false,false,2)

    val MEMORY_ONLY =new StorageLevel(false,true,false,true)

    val MEMORY_ONLY_2 =new StorageLevel(false,true,false,true,2)

    val MEMORY_ONLY_SER =new StorageLevel(false,true,false,false)

    val MEMORY_ONLY_SER_2 =new StorageLevel(false,true,false,false,2)

    val MEMORY_AND_DISK =new StorageLevel(true,true,false,true)

    val MEMORY_AND_DISK_2 =new StorageLevel(true,true,false,true,2)

    val MEMORY_AND_DISK_SER =new StorageLevel(true,true,false,false)

    val MEMORY_AND_DISK_SER_2 =new StorageLevel(true,true,false,false,2)

    val OFF_HEAP =new StorageLevel(false,false,true,false)

    }

    四、总结

    选择一种最合适的持久化策略

            默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

            如果上述策略会报内存溢出异常的情况,这个时候就需要使用MEMORY_AND_DISK的持久化策略。

    相关文章

      网友评论

          本文标题:2persist和cache方法

          本文链接:https://www.haomeiwen.com/subject/gnumnxtx.html