Spark Control Operators: Source Code Analysis
RDD
- persist() operator
Marks the RDD for storage at the specified StorageLevel.
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  // If this RDD already has a storage level and overriding is not allowed,
  // assigning a new level throws an exception.
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}
As you can see, what gets registered lives in simple concurrent collections: the cleaner's referenceBuffer and the SparkContext's persistentRdds.
private val referenceBuffer =
  Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

private[spark] val persistentRdds = {
  val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
  map.asScala
}
From the source we can see that persist only sets the StorageLevel and registers the RDD's id in the SparkContext's map, marking the RDD with its storage level. So persist is not an action operator: the data is actually stored only when the RDD is really computed.
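As a quick illustration of this laziness, here is a minimal sketch (the app name, data, and chosen level are made up for the example):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-demo").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 100).map(_ * 2)

    // Only marks the RDD: sets its storageLevel and registers it with the SparkContext
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    println(rdd.getStorageLevel)   // the level is already assigned here...

    rdd.count()                    // ...but the blocks are cached only when an action computes the RDD
    sc.stop()
  }
}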
Now let's look at how this is picked up during execution in the source:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  // One effect of clean here is that the closure object ends up being serialized
  // and written out, which is where the storage level is eventually written.
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
// Verify that the closure is still serializable after cleaning
if (checkSerializable) {
  ensureSerializable(func)
}
// Plain Java serialization: the func object is written out
override def serialize[T: ClassTag](t: T): ByteBuffer = {
  val bos = new ByteBufferOutputStream()
  val out = serializeStream(bos)
  out.writeObject(t)
  out.close()
  bos.toByteBuffer
}
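What serializeStream/writeObject boils down to is plain Java serialization. Here is a minimal standalone sketch (not Spark's JavaSerializer; the object and method names are made up) that round-trips a value through an ObjectOutputStream into a ByteBuffer, the same mechanism used above:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object JavaSerializationSketch {
  def serialize[T](t: T): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(t)          // dispatches into ObjectOutputStream.writeObject0, shown next
    out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  def deserialize[T](buf: ByteBuffer): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.array()))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    println(deserialize[List[Int]](serialize(List(1, 2, 3))))   // List(1, 2, 3)
  }
}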
// writeObject dispatches to ObjectOutputStream.writeObject0
private void writeObject0(Object obj, boolean unshared) {
  ...
  } else if (obj instanceof Serializable) {
    writeOrdinaryObject(obj, desc, unshared);
  } else {
    ...
  }
// Inside writeOrdinaryObject: if the object is Externalizable, its own writeExternal is used
if (desc.isExternalizable() && !desc.isProxy()) {
  writeExternalData((Externalizable) obj);
} else {
  ...
}

// StorageLevel is Externalizable, so its writeExternal is eventually called. toInt converts the
// user's choices (memory, disk, off-heap, serialized or not) into a single number.
class StorageLevel private(...) {
  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    out.writeByte(toInt)
    out.writeByte(_replication)
  }
}
// The implementation of toInt: each bit of the integer records one storage flag.
def toInt: Int = {
  var ret = 0
  if (_useDisk) {
    ret |= 8
  }
  if (_useMemory) {
    ret |= 4
  }
  if (_useOffHeap) {
    ret |= 2
  }
  if (_deserialized) {
    ret |= 1
  }
  ret
}
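A standalone sketch of the same bitmask encoding (the object name is made up; the flag combinations shown are those of the common built-in levels), to make the mapping concrete:
object StorageFlagSketch {
  def encode(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean): Int = {
    var ret = 0
    if (useDisk) ret |= 8
    if (useMemory) ret |= 4
    if (useOffHeap) ret |= 2
    if (deserialized) ret |= 1
    ret
  }

  def main(args: Array[String]): Unit = {
    // MEMORY_ONLY: memory + deserialized            => 4 | 1 = 5
    println(encode(useDisk = false, useMemory = true, useOffHeap = false, deserialized = true))
    // MEMORY_AND_DISK: disk + memory + deserialized => 8 | 4 | 1 = 13
    println(encode(useDisk = true, useMemory = true, useOffHeap = false, deserialized = true))
    // DISK_ONLY: disk only                          => 8
    println(encode(useDisk = true, useMemory = false, useOffHeap = false, deserialized = false))
  }
}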
The write-out of these storage flags is thus encapsulated in the StorageLevel object, which records the flag bits in the serialized output. When a ShuffleMapTask later deserializes its task binary with val (rdd, dep) = ser.deserialize(...), the RDD is read back together with its StorageLevel, and the storage setting takes effect when the data is written on the Worker node.
- cache operator
cache uses in-memory storage as the storage level; under the hood it simply calls persist().
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
As above, cache is not an action operator either; the write-out only happens through the clean/serialize path inside an action's runJob. Both cache and persist return the RDD itself, and it is the returned reference that should be used, e.g. val rdd1 = rdd.cache().
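A small usage sketch (assuming an active SparkContext named sc, as in the earlier persist example):
val rdd1 = sc.parallelize(1 to 100).cache()   // equivalent to persist(StorageLevel.MEMORY_ONLY)
println(rdd1.getStorageLevel.useMemory)       // true
println(rdd1.getStorageLevel.useDisk)         // false
rdd1.count()                                  // the blocks are actually cached during this first action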
ReliableCheckpointRDD
- checkpoint operator
Marks this RDD for checkpointing; it will be saved to files inside the checkpoint directory.
The directory is set with SparkContext#setCheckpointDir, and after checkpointing all references to the parent RDDs are removed. It is strongly recommended to persist this RDD in memory beforehand, otherwise it has to be recomputed.
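A minimal usage sketch following that recommendation (the checkpoint directory and app name are made up):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[*]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")   // must be set before calling checkpoint()

    val rdd = sc.parallelize(1 to 1000).map(_ + 1)
    rdd.persist(StorageLevel.MEMORY_ONLY)           // avoids recomputation by the checkpoint job
    rdd.checkpoint()                                // only creates the ReliableRDDCheckpointData here

    rdd.count()                                     // the action triggers doCheckpoint() at the end of runJob
    println(rdd.isCheckpointed)                     // true
    println(rdd.getCheckpointFile)                  // Some(<checkpoint dir>/rdd-<id>)
    sc.stop()
  }
}
Now to the checkpoint() source itself: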
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    // create the checkpointData
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
At this point only a ReliableRDDCheckpointData is created; none of its methods run yet. ReliableRDDCheckpointData implements a doCheckpoint() method, and we will see next how it gets called, so checkpoint() itself only creates and marks.
The call is made at the very end of SparkContext's runJob method: the final RDD's doCheckpoint() is invoked there, and it walks back through the lineage, checkpointing every RDD whose checkpointData has been created.
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  ...
  rdd.doCheckpoint()
}
private var doCheckpointCalled = false
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    // The final RDD always enters this method first, then recursively calls
    // doCheckpoint on each parent RDD.
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // TODO: We can collect all the RDDs that needs to be checkpointed, and then checkpoint
          // them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
Now let's look at how checkpointData's implementation classes, ReliableCheckpointRDD and ReliableRDDCheckpointData, work.
- Executing rdd.doCheckpoint()
From the source above, the checkpoint is performed only when checkpointData.isDefined, and checkpointData was defined when we called checkpoint(), whose final step was to create a new ReliableRDDCheckpointData(this). So ultimately ReliableRDDCheckpointData's doCheckpoint() method is executed.
protected override def doCheckpoint(): CheckpointRDD[T] = {
  // Write the RDD out to the checkpoint directory
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }
  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val checkpointStartTimeNs = System.nanoTime()
  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  // A new job is submitted here, so the RDD is computed again from scratch;
  // this is why persisting before checkpointing is recommended.
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

  // If the RDD has a partitioner, write it out to the checkpoint directory as well
  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }

  val checkpointDurationMs =
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
  logInfo(s"Checkpointing took $checkpointDurationMs ms.")

  // Finally, create the ReliableCheckpointRDD that reads back from the checkpoint files
  val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      "Checkpoint RDD has a different number of partitions from original RDD. Original " +
      s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
      s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
      s"${newRDD.partitions.length}].")
  }
  newRDD
}
So how is the data read back from the checkpoint directory? That is where ReliableCheckpointRDD comes in.
- getPartitions
protected override def getPartitions: Array[Partition] = {
  // listStatus can throw exception if path does not exist.
  val inputFiles = fs.listStatus(cpath)
    .map(_.getPath)
    .filter(_.getName.startsWith("part-"))
    .sortBy(_.getName.stripPrefix("part-").toInt)
  // Fail fast if input files are invalid
  inputFiles.zipWithIndex.foreach { case (path, i) =>
    if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
      throw new SparkException(s"Invalid checkpoint file: $path")
    }
  }
  Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
}
It lists the part files under the checkpoint directory, sorts them by partition index, fails fast on any unexpected file name, and creates one CheckpointRDDPartition per file.
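A hypothetical sketch of that naming and fail-fast check, assuming checkpoint files follow the part-%05d pattern implied by the stripPrefix("part-") logic above:
object CheckpointFileNames {
  def checkpointFileName(partitionIndex: Int): String = "part-%05d".format(partitionIndex)

  def main(args: Array[String]): Unit = {
    val inputFiles = Seq("part-00002", "part-00000", "part-00001")
      .sortBy(_.stripPrefix("part-").toInt)          // same ordering as getPartitions
    inputFiles.zipWithIndex.foreach { case (name, i) =>
      require(name == checkpointFileName(i), s"Invalid checkpoint file: $name")
    }
    println(inputFiles)                              // List(part-00000, part-00001, part-00002)
  }
}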
- compute
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
compute calls readCheckpointFile, which opens the partition's file and reads it back with the configured buffer size.
Finally, checkpointing submits a new job that recomputes the RDD; that job has no dependency relationship with the original one, which is also why calling checkpoint does not return a new RDD.