Spark Control Operators: Source Code Analysis

Author: Tim在路上 | Published 2022-02-08 14:06


    RDD

    • persist() operator

    Mark this RDD for storage using the specified StorageLevel.

    private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
      // TODO: Handle changes of StorageLevel
      // If this RDD already has a storage level, and overriding is not allowed,
      // assigning a new storage level throws an exception.
      if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
        throw new UnsupportedOperationException(
          "Cannot change storage level of an RDD after it was already assigned a level")
      }
      // If this is the first time this RDD is marked for persisting, register it
      // with the SparkContext for cleanups and accounting. Do this only once.
      if (storageLevel == StorageLevel.NONE) {
        sc.cleaner.foreach(_.registerRDDForCleanup(this))
        sc.persistRDD(this)
      }
      storageLevel = newLevel
      this
    }
    

    As we can see, what gets registered is simply a ConcurrentMap:

    private val referenceBuffer =
      Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

    private[spark] val persistentRdds = {
      val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
      map.asScala
    }
    

    From the source we can see that persist only sets the StorageLevel and registers the RDD's id in the SparkContext's map to record its storage level. So persist is not an action operator: the data is only actually stored when the RDD is computed.
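
    As a minimal usage sketch (the numbers and the SparkSession setup here are made up for illustration, not part of the source being discussed), nothing is materialized until an action runs:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    object PersistDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("persist-demo").master("local[*]").getOrCreate()
        val sc = spark.sparkContext

        val rdd = sc.parallelize(1 to 1000000).map(_ * 2)

        // Only marks the RDD: sets storageLevel and registers its id in persistentRdds.
        rdd.persist(StorageLevel.MEMORY_AND_DISK)

        // The first action both computes and materializes the cached blocks ...
        println(rdd.count())
        // ... later actions read the cached blocks instead of recomputing the lineage.
        println(rdd.sum())

        spark.stop()
      }
    }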

    Let's look at how this is used in the source code:

    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: Iterator[T] => U,
        partitions: Seq[Int]): Array[U] = {
      // One effect of clean() is that the closure is later serialized and written out,
      // carrying the storage level along with it.
      val cleanedFunc = clean(func)
      runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
    }

    // Verify that the closure is serializable after cleaning
    if (checkSerializable) {
      ensureSerializable(func)
    }

    // Perform Java serialization and write out the func object
    override def serialize[T: ClassTag](t: T): ByteBuffer = {
      val bos = new ByteBufferOutputStream()
      val out = serializeStream(bos)
      out.writeObject(t)
      out.close()
      bos.toByteBuffer
    }
    // writeObject eventually calls writeObject0
    private void writeObject0(Object obj, boolean unshared){
    ...
    } else if (obj instanceof Serializable) {
        writeOrdinaryObject(obj, desc, unshared);
    } else {
    ...
    }

    // If the object's class is Externalizable, its own writeExternal is used
    if (desc.isExternalizable() && !desc.isProxy()) {
        writeExternalData((Externalizable) obj);
    } else {
    
    // Eventually StorageLevel.writeExternal is called. toInt converts the user's choice
    // of memory, disk, etc. into a single number.
    class StorageLevel private(...{
    override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
        out.writeByte(toInt)
        out.writeByte(_replication)
      }
    }

    // The implementation of toInt: each bit of the number represents one storage flag.
    def toInt: Int = {
      var ret = 0
      if (_useDisk) {
        ret |= 8
      }
      if (_useMemory) {
        ret |= 4
      }
      if (_useOffHeap) {
        ret |= 2
      }
      if (_deserialized) {
        ret |= 1
      }
      ret
    }
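
    For intuition, here is a standalone sketch that mirrors the same bit encoding (not Spark's own class, just a copy of the toInt logic) and the numbers the common levels map to:

    object StorageLevelBits {
      // Mirrors StorageLevel.toInt: disk = 8, memory = 4, off-heap = 2, deserialized = 1.
      def toInt(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean): Int = {
        var ret = 0
        if (useDisk) ret |= 8
        if (useMemory) ret |= 4
        if (useOffHeap) ret |= 2
        if (deserialized) ret |= 1
        ret
      }

      def main(args: Array[String]): Unit = {
        println(toInt(useDisk = false, useMemory = true,  useOffHeap = false, deserialized = true))  // MEMORY_ONLY     -> 5
        println(toInt(useDisk = true,  useMemory = true,  useOffHeap = false, deserialized = true))  // MEMORY_AND_DISK -> 13
        println(toInt(useDisk = true,  useMemory = false, useOffHeap = false, deserialized = false)) // DISK_ONLY       -> 8
      }
    }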
    

    The actual write of the storage level is encapsulated in the StorageLevel object, which records its flags to the external output. When ShuffleMapTask is later deserialized, the RDD is read back together with it (val (rdd, dep) = ser.deserialize), and the worker node applies the storage setting when it writes the data.

    • cache operator

    Uses memory-only storage as the storage level; under the hood it simply calls persist().

    def cache(): this.type = persist()
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
    

    As above, cache is not an action operator either; the write-out only happens in the clean step of an action's runJob. Both cache and persist return the RDD itself (this.type), so the result can be captured and reused, e.g. val rdd1 = rdd.cache().
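
    A small usage sketch (assuming an existing SparkContext sc; the input path is hypothetical), showing the chained style and an explicit release:

    // Chained style: cache() returns the same RDD, now marked MEMORY_ONLY.
    val words = sc.textFile("hdfs:///tmp/input.txt")
      .flatMap(_.split("\\s+"))
      .cache()

    val total = words.count()                    // first action materializes the cache
    val distinctWords = words.distinct().count() // reuses the cached partitions

    words.unpersist()                            // drop the cached blocks when done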

    ReliableCheckpointRDD

    • checkpoint operator

    Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory.

    The checkpoint directory is set with SparkContext#setCheckpointDir, and all references to the RDD's parent RDDs will be removed. It is strongly recommended to persist the RDD in memory beforehand, otherwise it will be recomputed.
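
    A minimal usage sketch (assuming an existing SparkContext sc; the directory and input paths are hypothetical), following the recommendation to persist before checkpointing:

    // The checkpoint directory should live on a fault-tolerant file system such as HDFS.
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

    val cleaned = sc.textFile("hdfs:///tmp/input.txt")
      .filter(_.nonEmpty)
      .cache()           // avoid recomputing the lineage when the checkpoint job runs

    cleaned.checkpoint() // only marks the RDD: creates a ReliableRDDCheckpointData

    cleaned.count()      // the action triggers doCheckpoint() and writes the files

    With that usage in mind, here is the source of checkpoint():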

    def checkpoint(): Unit = RDDCheckpointData.synchronized {
      // NOTE: we use a global lock here due to complexities downstream with ensuring
      // children RDD partitions point to the correct parent partitions. In the future
      // we should revisit this consideration.
      if (context.checkpointDir.isEmpty) {
        throw new SparkException("Checkpoint directory has not been set in the SparkContext")
      } else if (checkpointData.isEmpty) {
        // Create the checkpointData
        checkpointData = Some(new ReliableRDDCheckpointData(this))
      }
    }
    

    Here, only the ReliableRDDCheckpointData is created; none of its methods are executed yet. ReliableRDDCheckpointData implements a doCheckpoint() method, and how it gets invoked is described next, so at this point checkpoint() only creates and marks.

    The invocation happens at the very end of SparkContext's runJob method: rdd.doCheckpoint() is called on the job's final RDD and then propagates up the lineage, so every RDD checks whether a checkpointData has been created for it.

    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
      ...
      rdd.doCheckpoint()
    }

    private var doCheckpointCalled = false

    private[spark] def doCheckpoint(): Unit = {
      RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
        // The final RDD always enters this branch; from there the call walks
        // backwards through every parent RDD.
        if (!doCheckpointCalled) {
          doCheckpointCalled = true
          if (checkpointData.isDefined) {
            if (checkpointAllMarkedAncestors) {
              // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
              // them in parallel.
              // Checkpoint parents first because our lineage will be truncated after we
              // checkpoint ourselves
              dependencies.foreach(_.rdd.doCheckpoint())
            }
            checkpointData.get.checkpoint()
          } else {
            dependencies.foreach(_.rdd.doCheckpoint())
          }
        }
      }
    }
    

    Now let's look at how the implementation classes behind checkpointData, ReliableCheckpointRDD and ReliableRDDCheckpointData, work.

    1. Executing rdd.doCheckpoint()

    From the source above, doCheckpoint only proceeds when checkpointData.isDefined, i.e. checkpointData has been defined. When we called checkpoint(), the last thing it did was create a new ReliableRDDCheckpointData(this), so what ultimately runs is ReliableRDDCheckpointData's doCheckpoint() method.

    protected override def doCheckpoint(): CheckpointRDD[T] = {
      // Write the RDD to the checkpoint directory
      val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

      // Optionally clean our checkpoint files if the reference is out of scope
      if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
        rdd.context.cleaner.foreach { cleaner =>
          cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
        }
      }

      logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
      newRDD
    }

    def writeRDDToCheckpointDirectory[T: ClassTag](
        originalRDD: RDD[T],
        checkpointDir: String,
        blockSize: Int = -1): ReliableCheckpointRDD[T] = {
      val checkpointStartTimeNs = System.nanoTime()

      val sc = originalRDD.sparkContext

      // Create the output path for the checkpoint
      val checkpointDirPath = new Path(checkpointDir)
      val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
      if (!fs.mkdirs(checkpointDirPath)) {
        throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
      }

      // Save to file, and reload it as an RDD
      val broadcastedConf = sc.broadcast(
        new SerializableConfiguration(sc.hadoopConfiguration))
      // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
      // Submit a new job. This effectively re-runs the whole computation, which is
      // why persisting before checkpointing is recommended.
      sc.runJob(originalRDD,
        writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
      // Write the partitioner to the checkpoint directory as well
      if (originalRDD.partitioner.nonEmpty) {
        writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
      }

      val checkpointDurationMs =
        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
      logInfo(s"Checkpointing took $checkpointDurationMs ms.")
      // Finally, create the ReliableCheckpointRDD
      val newRDD = new ReliableCheckpointRDD[T](
        sc, checkpointDirPath.toString, originalRDD.partitioner)
      if (newRDD.partitions.length != originalRDD.partitions.length) {
        throw new SparkException(
          "Checkpoint RDD has a different number of partitions from original RDD. Original " +
            s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
            s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
            s"${newRDD.partitions.length}].")
      }
      newRDD
    }
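
    As the getBoolean call above shows, the checkpoint files are only cleaned up when spark.cleaner.referenceTracking.cleanCheckpoints is enabled (it defaults to false). A sketch of turning it on via SparkConf:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("checkpoint-cleanup-demo")
      // Let the ContextCleaner remove checkpoint files once the RDD goes out of scope.
      .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")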
    

    So how is the data read back from the checkpoint directory? That is where ReliableCheckpointRDD comes in.

    1. Getting the partitions (getPartitions)
    protected override def getPartitions: Array[Partition] = {
      // listStatus can throw exception if path does not exist.
      val inputFiles = fs.listStatus(cpath)
        .map(_.getPath)
        .filter(_.getName.startsWith("part-"))
        .sortBy(_.getName.stripPrefix("part-").toInt)
      // Fail fast if input files are invalid
      inputFiles.zipWithIndex.foreach { case (path, i) =>
        if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
          throw new SparkException(s"Invalid checkpoint file: $path")
        }
      }
      Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
    }
    

    It lists the partition files in the checkpoint directory and sorts them by the partition index encoded in the file name.
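
    For illustration, the file names follow a zero-padded part-NNNNN convention; a sketch mirroring ReliableCheckpointRDD.checkpointFileName (the exact format string is an assumption here):

    // Mirrors the checkpoint file naming that getPartitions validates above.
    def checkpointFileName(partitionIndex: Int): String =
      "part-%05d".format(partitionIndex)

    // checkpointFileName(0)  -> "part-00000"
    // checkpointFileName(12) -> "part-00012"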

    2. Computing a partition (compute)
    override def compute(split: Partition, context: TaskContext): Iterator[T] = {
      val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
      ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
    }
    

    readCheckpointFile opens the file for the given partition and reads it back using the configured buffer size.

    Finally, note that checkpointing submits a new job that recomputes the RDD; that job has no dependency on the original one, which is why calling checkpoint does not need to return a new RDD.
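
    Continuing the checkpoint usage sketch above, after the count() the lineage is truncated and the RDD now reads from the checkpoint files (the exact path layout under the checkpoint directory is an assumption):

    println(cleaned.isCheckpointed)     // true once the checkpoint files are written
    println(cleaned.getCheckpointFile)  // e.g. Some(hdfs:///tmp/spark-checkpoints/<uuid>/rdd-<id>)
    println(cleaned.toDebugString)      // lineage now starts from a ReliableCheckpointRDD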
