美文网首页
Spark-checkpoint检查点

Spark-checkpoint检查点

作者: 布莱安托 | 来源:发表于2020-07-06 19:57 被阅读0次

    Spark中对于数据的保存除了缓存操作外,还提供了一种检查点的机制,检查点是为了通过血缘关系实现容错辅助,血缘关系过长会造成容错成本过高,如果在中间阶段设立检查点进行容错,当后续节点出现问题是,从检查点开始重新建立血缘会减少开销。

    对一个RDD设置检查点,回将RDD序列化为二进制文件,并存储在设定的路径下,该路径通过SparkContext对象的setCheckPointDir()方法进行设置。

    在设置checkpoint的过程中,该RDD所有的父RDD依赖关系将会被移除。此外,对RDD进行设置checkpoint操作不会立刻执行,只有通过执行action算子才会触发。

    例子如下:

    object CheckpointDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[4]").setAppName("CheckpointDemo")
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("checkpoint")
    
        val rdd = sc.parallelize(Array(1, 2, 3, 4))
        val mapRdd = rdd.map(_.toString + " - " + System.currentTimeMillis())
    
        mapRdd.checkpoint()
    
        mapRdd.foreach(println)
        println(mapRdd.toDebugString)
    
      }
    }
    

    输出结果:

    1 - 1573566211111
    2 - 1573566211111
    3 - 1573566211111
    4 - 1573566211111

    (4) MapPartitionsRDD[1] at map at CheckpointDemo.scala:12 []
    | ReliableCheckpointRDD[2] at foreach at CheckpointDemo.scala:16 []

    可以看到依赖关系中,最早的一个RDD已经变为ReliableCheckpointRDD,表明在这之前的依赖关系已经被移除,后续计算均可以从检查点恢复。

    相关文章

      网友评论

          本文标题:Spark-checkpoint检查点

          本文链接:https://www.haomeiwen.com/subject/zvbgqktx.html