美文网首页Spark
spark中cache()、persist()、checkpoi

spark中cache()、persist()、checkpoi

作者: 南宋临安府 | 来源:发表于2018-05-17 13:54 被阅读0次

“夫唯兵者,不祥之器,物或恶之,故有道者不处。
君子居则贵左,用兵则贵右。
兵者不祥之器,非君子之器,不得已而用之,恬淡为上。
胜而不美,而美之者,是乐杀人。
夫乐杀人者,则不可得志于天下矣。
吉事尚左,凶事尚右。
偏将军居左,上将军居右,言以丧礼处之。
杀人之众,以悲哀泣之,战胜以丧礼处之。”[1]

Spark对RDD的持久化操作(cache()persist()checkpoint())是很重要的,可以将rdd存放在不同的存储介质中,方便后续的操作能重复使用。

cache()

persist()

cache和persist都是用于将一个RDD进行缓存,这样在之后使用的过程中就不需要重新计算,可以大大节省程序运行时间。
cache和persist的区别:cache只有一个默认的缓存级别MEMORY_ONLY,而persist可以根据情况设置其它的缓存级别
RDD的缓存级别:

查看 StorageLevel 类的源码
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}

可以看到这里列出了12种缓存级别,但这些有什么区别呢?可以看到每个缓存级别后面都跟了一个StorageLevel的构造函数,里面包含了4个或5个参数,如下:

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

查看其构造函数
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}

可以看到StorageLevel类的主构造器包含了5个参数:

  • useDisk:使用硬盘(外存)
  • useMemory:使用内存
  • useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
  • deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
  • replication:备份数(在多个节点上备份)

理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)

checkpoint()

sc.sparkContext.setCheckpointDir('...')
......
......
rdd.cache()
rdd.checkpoint()
......

checkpoint接口是将RDD持久化到HDFS中,与persist的区别是checkpoint会切断此RDD之前的依赖关系,而persist会保留依赖关系。checkpoint的两大作用:一是spark程序长期驻留,过长的依赖会占用很多的系统资源,定期checkpoint可以有效的节省资源;二是维护过长的依赖关系可能会出现问题,一旦spark程序运行失败,RDD的容错成本会很高。

注意:checkpoint执行前要先进行cache,避免两次计算。


  1. 老子《道德经》第三十一章,老子故里,中国鹿邑。

相关文章

网友评论

    本文标题:spark中cache()、persist()、checkpoi

    本文链接:https://www.haomeiwen.com/subject/xttbdftx.html