美文网首页
SparkCore之RDD缓存与RDD CheckPoint

SparkCore之RDD缓存与RDD CheckPoint

作者: 大数据小同学 | 来源:发表于2020-08-07 08:10 被阅读0次

RDD缓存

RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。
但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。


image.png

通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。


image

在存储级别的末尾加上“_2”来把持久化数据存为两份


image.png

缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

  1. 创建一个RDD
scala> val rdd = sc.makeRDD(Array("liujh"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at makeRDD at <console>:25
  1. 将RDD转换为携带当前时间戳不做缓存
scala> val nocache = rdd.map(_.toString+System.currentTimeMillis)
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27
  1. 多次打印结果
scala> nocache.collect
res0: Array[String] = Array(liujh1538978275359)
scala> nocache.collect
res1: Array[String] = Array(liujh1538978282416)
scala> nocache.collect
res2: Array[String] = Array(liujh1538978283199)
  1. 将RDD转换为携带当前时间戳并做缓存
scala> val cache =  rdd.map(_.toString+System.currentTimeMillis).cache
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27
  1. 多次打印做了缓存的结果
scala> cache.collect
res3: Array[String] = Array(liujh1538978435705)                                   
scala> cache.collect
res4: Array[String] = Array(liujh1538978435705)
scala> cache.collect
res5: Array[String] = Array(liujh1538978435705)

RDD CheckPoint

Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。
为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
案例实操:

  1. 设置检查点
scala> sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")
  1. 创建一个RDD
scala> val rdd = sc.parallelize(Array("liujh"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24
  1. 将RDD转换为携带当前时间戳并做checkpoint
scala> val ch = rdd.map(_+System.currentTimeMillis)
ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26
scala> ch.checkpoint
  1. 多次打印结果
scala> ch.collect
res55: Array[String] = Array(liujh1538981860336)
scala> ch.collect
res56: Array[String] = Array(liujh1538981860504)
scala> ch.collect
res57: Array[String] = Array(liujh1538981860504)
scala> ch.collect
res58: Array[String] = Array(liujh1538981860504)
关注微信公众号
简书:https://www.jianshu.com/u/0278602aea1d
CSDN:https://blog.csdn.net/u012387141

相关文章

  • SparkCore之RDD缓存与RDD CheckPoint

    RDD缓存 RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() ...

  • spark rdd 爬坑集

    对于在调用rdd.checkpoint()之前就做过action的rdd ,checkpoint是无效的,不会产生...

  • SparkCore之RDD

    RDD 五大特性 A list of partitions一组分区:多个分区,在RDD中用分区的概念。 A fun...

  • SparkCore之RDD概述

    什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中...

  • SparkCore之RDD编程模型与RDD的创建

    编程模型 在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transfor...

  • SparkCore之RDD编程进阶

    累加器 累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter(...

  • SparkCore之RDD依赖关系

    Lineage RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记...

  • spark checkpoint 原理

    1. 在 rdd 上调用 checkpoint() 方法,并没有立刻执行 只是在 rdd 上创建了一个 Relia...

  • spark checkpoint 和 RDD

    spark checkpoint 可以切断血缘关系,持久化再hdfs上,在checkpoint 之后的RDD后就无...

  • Spark RDD持久化级别

    RDD持久化用于RDD重用和节省重新计算,方便构建迭代算法,缓存粒度为整个RDD 持久化级别 如何选择存储级别? ...

网友评论

      本文标题:SparkCore之RDD缓存与RDD CheckPoint

      本文链接:https://www.haomeiwen.com/subject/fcyjyhtx.html