Spark Core Source Code Reading Plan #20: The Implementation of RDD Checkpoints


Author: LittleMagic | Published 2019-06-20 21:57


    Preface

    RDD checkpointing is the fault-tolerance mechanism in Spark Core's computation process. By persisting an RDD's data and state, a failed computation can be recovered directly from the previously saved state instead of being recomputed from scratch, which greatly improves efficiency and reliability. Starting from the RDD class we have already studied, this article explores how checkpointing is actually implemented.

    Checkpoint Methods in the RDD Class

    The RDD class exposes two public methods for checkpointing an RDD: checkpoint() and localCheckpoint(). There is also an internal doCheckpoint() method, which is invoked when the scheduling module submits a Job and can recursively checkpoint parent RDDs; we will leave it aside for now.

    Code #20.1 - the o.a.s.rdd.RDD.checkpoint()/localCheckpoint() methods

      def checkpoint(): Unit = RDDCheckpointData.synchronized {
        if (context.checkpointDir.isEmpty) {
          throw new SparkException("Checkpoint directory has not been set in the SparkContext")
        } else if (checkpointData.isEmpty) {
          checkpointData = Some(new ReliableRDDCheckpointData(this))
        }
      }
    
      def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
        if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
            conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
        logWarning(/* local checkpointing does not work well with dynamic Executor allocation... */)
        }
    
        if (storageLevel == StorageLevel.NONE) {
          persist(LocalRDDCheckpointData.DEFAULT_STORAGE_LEVEL)
        } else {
          persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
        }
    
        if (isCheckpointedAndMaterialized) {
          logWarning("Not marking RDD for local checkpoint because it was already " +
            "checkpointed and materialized")
        } else {
          checkpointData match {
            case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
              "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
            case _ =>
          }
          checkpointData = Some(new LocalRDDCheckpointData(this))
        }
        this
      }
    

    Both methods ultimately assign the RDD's checkpointData property, using one of the two concrete implementations of the checkpoint-data abstract class RDDCheckpointData: ReliableRDDCheckpointData and LocalRDDCheckpointData.

    The difference between the two is exactly what their names suggest. ReliableRDDCheckpointData saves the checkpoint data in files on reliable external storage (HDFS) and reads those files back when recomputation is needed. LocalRDDCheckpointData keeps the data locally on Executor nodes, with the default storage level DEFAULT_STORAGE_LEVEL being StorageLevel.MEMORY_AND_DISK, i.e. held in memory and on disk. Obviously LocalRDDCheckpointData is less reliable than ReliableRDDCheckpointData: once an Executor fails, its checkpoint data is lost. It trades reliability for speed, which pays off in scenarios where the RDD lineage is very long; a minimal usage sketch follows.
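    Below is a minimal, purely illustrative sketch of localCheckpoint() (object name, master and data are my own, not from the Spark source): no checkpoint directory is required, but the lineage is only truncated once an action materializes the RDD.

    import org.apache.spark.{SparkConf, SparkContext}

    object LocalCheckpointDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("local-checkpoint-demo").setMaster("local[2]"))

        // Build an artificially long lineage, then cut it with a local checkpoint.
        val base = sc.parallelize(1 to 100000)
        val longLineage = (1 to 50).foldLeft(base)((rdd, _) => rdd.map(_ + 1))

        longLineage.localCheckpoint()   // marks the RDD; persists with MEMORY_AND_DISK
        longLineage.count()             // the first action materializes the checkpoint

        println(longLineage.isCheckpointed)   // expected to print true once materialized
        sc.stop()
      }
    }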

    In this article the main subject of study is ReliableRDDCheckpointData. Note that a checkpoint directory must be set first (by calling SparkContext.setCheckpointDir()) before reliable checkpointing can be used.
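    For comparison, here is a minimal sketch of the reliable variant (the directory path and names are illustrative): setCheckpointDir() must be called before checkpoint(), and the data is written only when an action triggers the extra checkpoint Job.

    import org.apache.spark.{SparkConf, SparkContext}

    object ReliableCheckpointDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("reliable-checkpoint-demo").setMaster("local[2]"))
        sc.setCheckpointDir("/tmp/spark-checkpoints")   // usually an HDFS path in production

        val rdd = sc.parallelize(1 to 100000).map(_ * 2)
        rdd.cache()        // recommended: avoids recomputing the RDD for the checkpoint Job
        rdd.checkpoint()   // only marks the RDD; nothing is written yet
        rdd.count()        // the action triggers the Job that writes the checkpoint files

        sc.stop()
      }
    }

    Caching before checkpointing is the usual recommendation, because otherwise the checkpoint Job recomputes the RDD from its lineage a second time.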

    The Checkpoint Data Wrapper

    Before looking at ReliableRDDCheckpointData, let us first look at its parent class, RDDCheckpointData.

    RDDCheckpointData

    Code #20.2 - the o.a.s.rdd.RDDCheckpointData abstract class

    private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
      extends Serializable {
      import CheckpointState._
    
      protected var cpState = Initialized
      private var cpRDD: Option[CheckpointRDD[T]] = None
    
      def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
        cpState == Checkpointed
      }
    
      final def checkpoint(): Unit = {
        RDDCheckpointData.synchronized {
          if (cpState == Initialized) {
            cpState = CheckpointingInProgress
          } else {
            return
          }
        }
    
        val newRDD = doCheckpoint()
    
        RDDCheckpointData.synchronized {
          cpRDD = Some(newRDD)
          cpState = Checkpointed
          rdd.markCheckpointed()
        }
      }
    
      protected def doCheckpoint(): CheckpointRDD[T]
    
      def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData.synchronized { cpRDD }
    
      def getPartitions: Array[Partition] = RDDCheckpointData.synchronized {
        cpRDD.map(_.partitions).getOrElse { Array.empty }
      }
    }
    

    The constructor parameter rdd of RDDCheckpointData identifies the RDD this checkpoint data belongs to. cpRDD optionally holds a CheckpointRDD instance, a special RDD implementation used to save the checkpoint and to restore state from the checkpoint data. cpState is the current state of the checkpointing process, defined by the CheckpointState object, which is in fact an enumeration with three stages: initialized, checkpointing in progress, and checkpointed.

    Code #20.3 - the o.a.s.rdd.CheckpointState object

    private[spark] object CheckpointState extends Enumeration {
      type CheckpointState = Value
      val Initialized, CheckpointingInProgress, Checkpointed = Value
    }
    

    The checkpoint() method contains the logic for saving a checkpoint. Note that it is marked final, so subclasses cannot override it. Its flow is: if the checkpoint state is Initialized, set it to CheckpointingInProgress, then call doCheckpoint() to produce a CheckpointRDD. doCheckpoint() is an abstract method, implemented separately by ReliableRDDCheckpointData and LocalRDDCheckpointData. Finally, the resulting CheckpointRDD is assigned to cpRDD, the state is set to Checkpointed, and RDD.markCheckpointed() is called to mark that the checkpoint has been saved.

    The source of markCheckpointed() is as follows.

    Code #20.4 - the o.a.s.rdd.RDD.markCheckpointed() method

      private[spark] def markCheckpointed(): Unit = {
        clearDependencies()
        partitions_ = null
        deps = null
      }
    
      protected def clearDependencies(): Unit = {
        dependencies_ = null
      }
    

    As we can see, it clears the partition and dependency information the RDD previously held. This information is already captured by the checkpoint, so there is no need to keep another copy; the sketch below shows why dropping it is safe.
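    The following is a lightly abridged sketch, based on the same RDD class (exact details vary slightly across Spark versions), of how partitions and dependencies are served from the CheckpointRDD once it exists:

      final def dependencies: Seq[Dependency[_]] = {
        // Once a checkpoint RDD exists, the only dependency is a one-to-one dependency on it.
        checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
          if (dependencies_ == null) {
            dependencies_ = getDependencies
          }
          dependencies_
        }
      }

      final def partitions: Array[Partition] = {
        // Likewise, partitions are taken from the checkpoint RDD when one exists.
        checkpointRDD.map(_.partitions).getOrElse {
          if (partitions_ == null) {
            partitions_ = getPartitions
          }
          partitions_
        }
      }

    With that in mind, let us read how ReliableRDDCheckpointData is implemented.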

    ReliableRDDCheckpointData

    The ReliableRDDCheckpointData class contains little special logic. Below is its implementation of the doCheckpoint() method.

    Code #20.5 - the o.a.s.rdd.ReliableRDDCheckpointData.doCheckpoint() method

      protected override def doCheckpoint(): CheckpointRDD[T] = {
        val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    
        if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
          rdd.context.cleaner.foreach { cleaner =>
            cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
          }
        }
    
        logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
        newRDD
      }
    

    As we can see, the CheckpointRDD is produced by calling ReliableCheckpointRDD.writeRDDToCheckpointDirectory(). In addition, the companion object provides two methods, one returning the path of an RDD's checkpoint and the other deleting the checkpoint data.

    Code #20.6 - the o.a.s.rdd.ReliableRDDCheckpointData.checkpointPath()/cleanCheckpoint() methods

      def checkpointPath(sc: SparkContext, rddId: Int): Option[Path] = {
        sc.checkpointDir.map { dir => new Path(dir, s"rdd-$rddId") }
      }
    
      def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
        checkpointPath(sc, rddId).foreach { path =>
          path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
        }
      }
    

    Next let us look at the details of CheckpointRDD; it is the class through which checkpoints are actually created and through which state is restored from checkpoint data.

    Checkpoint RDDs

    CheckpointRDD

    CheckpointRDD is itself an abstract class that extends RDD.

    Code #20.7 - the o.a.s.rdd.CheckpointRDD abstract class

    private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
      extends RDD[T](sc, Nil) {
      override def doCheckpoint(): Unit = { }
      override def checkpoint(): Unit = { }
      override def localCheckpoint(): this.type = this
    
      // scalastyle:off
      protected override def getPartitions: Array[Partition] = ???
      override def compute(p: Partition, tc: TaskContext): Iterator[T] = ???
      // scalastyle:on
    }
    

    As we can see, it overrides the three RDD methods doCheckpoint(), checkpoint() and localCheckpoint() with no-op implementations, because a CheckpointRDD does not need to be checkpointed again. It also overrides the getPartitions() and compute() methods discussed in article #18. Readers may be curious about the three question marks; ??? is actually defined in scala.Predef:
    def ??? : Nothing = throw new NotImplementedError

    In other words, there is no implementation here, and the concrete work is delegated to the subclasses. To use ???, the scalastyle check also has to be switched off with scalastyle:off, as in the code above.

    A normal RDD's compute() method computes the data of a partition; in a CheckpointRDD, its job becomes restoring data from the checkpoint (a sketch below shows how a checkpointed RDD routes computation to it). Just like RDDCheckpointData, CheckpointRDD has two subclasses, ReliableCheckpointRDD and LocalCheckpointRDD.
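    How does a checkpointed RDD end up calling this compute()? A lightly abridged sketch of RDD.computeOrReadCheckpoint() (from the same RDD class, not quoted in this article) gives the idea: once the checkpoint has been materialized, the CheckpointRDD is the new first parent, and partition computation is delegated to it.

      private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
        if (isCheckpointedAndMaterialized) {
          // The checkpoint RDD is now the (only) parent; read the partition from it.
          firstParent[T].iterator(split, context)
        } else {
          compute(split, context)
        }
      }

    Next, let us look at ReliableCheckpointRDD.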

    ReliableCheckpointRDD

    ReliableCheckpointRDD is a comparatively complex implementation, and most of its methods live in its companion object. Rather than reading the code from top to bottom, we will start directly from the writeRDDToCheckpointDirectory() method called in code #20.5 and see how checkpoint data is written.

    Code #20.8 - the o.a.s.rdd.ReliableCheckpointRDD.writeRDDToCheckpointDirectory() method

      def writeRDDToCheckpointDirectory[T: ClassTag](
          originalRDD: RDD[T],
          checkpointDir: String,
          blockSize: Int = -1): ReliableCheckpointRDD[T] = {
        val checkpointStartTimeNs = System.nanoTime()
    
        val sc = originalRDD.sparkContext
    
        val checkpointDirPath = new Path(checkpointDir)
        val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
        if (!fs.mkdirs(checkpointDirPath)) {
          throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
        }
    
        val broadcastedConf = sc.broadcast(
          new SerializableConfiguration(sc.hadoopConfiguration))
        sc.runJob(originalRDD,
          writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    
        if (originalRDD.partitioner.nonEmpty) {
          writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
        }
    
        val checkpointDurationMs =
          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
        logInfo(s"Checkpointing took $checkpointDurationMs ms.")
    
        val newRDD = new ReliableCheckpointRDD[T](
          sc, checkpointDirPath.toString, originalRDD.partitioner)
        if (newRDD.partitions.length != originalRDD.partitions.length) {
          throw new SparkException(
            "Checkpoint RDD has a different number of partitions from original RDD. Original " +
              s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
              s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
              s"${newRDD.partitions.length}].")
        }
        newRDD
      }
    

    The flow of this method is: use the HDFS-related APIs to create the checkpoint directory, then call SparkContext.runJob() to launch a Job that executes the logic of writePartitionToCheckpointFile(), writing the RDD's partition data into the checkpoint directory. Next, check whether the original RDD has a partitioner defined; if so, call writePartitionerToCheckpointDir() to write the partitioner into the checkpoint directory as well. Finally, create a ReliableCheckpointRDD instance and verify that its number of partitions equals that of the original RDD; if they match, return it.

    The two write methods involved above contain quite a lot of code but are straightforward to understand, so they are not reproduced here; a simplified sketch of the per-partition write is given below instead.
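    The following is an illustrative reconstruction of what writePartitionToCheckpointFile() does, not the original method (the real code receives the Hadoop configuration via a broadcast variable, writes to a temporary file first and renames it, handles conflicts, and supports an optional block size; writing the partitioner is analogous). Each task serializes its partition's iterator into one part file under the checkpoint directory.

    import scala.reflect.ClassTag

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.{SparkEnv, TaskContext}

    object CheckpointWriteSketch {
      // Hypothetical helper: write one partition's data into "part-NNNNN" under checkpointDir.
      def writePartitionSketch[T: ClassTag](
          checkpointDir: String,
          hadoopConf: Configuration)(
          ctx: TaskContext,
          iterator: Iterator[T]): Unit = {
        val env = SparkEnv.get
        val outputPath = new Path(checkpointDir, f"part-${ctx.partitionId()}%05d")
        val fs = outputPath.getFileSystem(hadoopConf)
        val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

        val fileStream = fs.create(outputPath, true, bufferSize)
        val serializeStream = env.serializer.newInstance().serializeStream(fileStream)
        try {
          serializeStream.writeAll(iterator)   // serialize every element of the partition
        } finally {
          serializeStream.close()
        }
      }
    }

    So how is the checkpoint data read back? Let us look at the implementation of compute().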

    Code #20.9 - the o.a.s.rdd.ReliableCheckpointRDD.compute() method

      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
        ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
      }
    

    As we can see, it calls readCheckpointFile(), whose code is shown below.

    Code #20.10 - the o.a.s.rdd.ReliableCheckpointRDD.readCheckpointFile() method

      def readCheckpointFile[T](
          path: Path,
          broadcastedConf: Broadcast[SerializableConfiguration],
          context: TaskContext): Iterator[T] = {
        val env = SparkEnv.get
        val fs = path.getFileSystem(broadcastedConf.value.value)
        val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
        val fileInputStream = {
          val fileStream = fs.open(path, bufferSize)
          if (env.conf.get(CHECKPOINT_COMPRESS)) {
            CompressionCodec.createCodec(env.conf).compressedInputStream(fileStream)
          } else {
            fileStream
          }
        }
        val serializer = env.serializer.newInstance()
        val deserializeStream = serializer.deserializeStream(fileInputStream)
    
        context.addTaskCompletionListener(context => deserializeStream.close())
        deserializeStream.asIterator.asInstanceOf[Iterator[T]]
      }
    

    This method again uses the HDFS API to open the file under the checkpoint directory, deserializes it with the serializer initialized in SparkEnv (JavaSerializer by default), and finally returns an iterator over the data, thereby restoring the state.

    Summary

    This article has examined the key components involved in Spark RDD checkpointing, RDDCheckpointData and CheckpointRDD, and used the reliable implementations, ReliableRDDCheckpointData and ReliableCheckpointRDD, as examples to walk through the whole flow of checkpoint data from writing to reading.
