Spark中对于数据的保存除了缓存操作外,还提供了一种检查点的机制,检查点是为了通过血缘关系实现容错辅助,血缘关系过长会造成容错成本过高,如果在中间阶段设立检查点进行容错,当后续节点出现问题是,从检查点开始重新建立血缘会减少开销。
对一个RDD设置检查点,回将RDD序列化为二进制文件,并存储在设定的路径下,该路径通过SparkContext对象的setCheckPointDir()
方法进行设置。
在设置checkpoint的过程中,该RDD所有的父RDD依赖关系将会被移除。此外,对RDD进行设置checkpoint操作不会立刻执行,只有通过执行action算子才会触发。
例子如下:
object CheckpointDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("CheckpointDemo")
val sc = new SparkContext(conf)
sc.setCheckpointDir("checkpoint")
val rdd = sc.parallelize(Array(1, 2, 3, 4))
val mapRdd = rdd.map(_.toString + " - " + System.currentTimeMillis())
mapRdd.checkpoint()
mapRdd.foreach(println)
println(mapRdd.toDebugString)
}
}
输出结果:
1 - 1573566211111
2 - 1573566211111
3 - 1573566211111
4 - 1573566211111(4) MapPartitionsRDD[1] at map at CheckpointDemo.scala:12 []
| ReliableCheckpointRDD[2] at foreach at CheckpointDemo.scala:16 []
可以看到依赖关系中,最早的一个RDD已经变为ReliableCheckpointRDD
,表明在这之前的依赖关系已经被移除,后续计算均可以从检查点恢复。
网友评论