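Since the topic is CacheManager, let's start from how an RDD obtains its data. Take MapPartitionsRDD as an example: its compute method pulls the parent partition's data through firstParent[T].iterator(split, context).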
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}
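The way compute reaches back into its parent can be sketched with a toy lineage. This is only an illustration under stated assumptions: ToyNode, ToySource and ToyMapPartitions are hypothetical names, a partition is reduced to an Int index, and there is no caching or TaskContext.

```scala
// Hypothetical toy lineage, not Spark's classes: a partition is just an Int index.
trait ToyNode[T] {
  def compute(split: Int): Iterator[T]

  // No caching in this sketch: iterator simply delegates to compute.
  final def iterator(split: Int): Iterator[T] = compute(split)
}

// Leaf of the lineage: serves data that is already in memory.
class ToySource(data: Vector[Vector[Int]]) extends ToyNode[Int] {
  def compute(split: Int): Iterator[Int] = data(split).iterator
}

// Same shape as MapPartitionsRDD.compute: apply f to the parent's iterator.
class ToyMapPartitions[T, U](prev: ToyNode[T], f: (Int, Iterator[T]) => Iterator[U])
    extends ToyNode[U] {
  def compute(split: Int): Iterator[U] = f(split, prev.iterator(split))
}
```

Chaining two ToyMapPartitions on top of a ToySource and calling iterator on the last one walks the whole chain lazily, one element at a time, which is the behavior the real compute relies on.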
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  //todo check whether a storage level has been set for this RDD's data
  if (storageLevel != StorageLevel.NONE) {
    //todo if so, fetch the data through the cacheManager
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    //todo otherwise read the data from the checkpoint or recompute the RDD's data
    computeOrReadCheckpoint(split, context)
  }
}
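The branch in iterator can be illustrated with a small stand-alone model. It is a minimal sketch, not Spark's implementation: ToyRdd, cacheEnabled and computeCalls are hypothetical names, and a plain mutable Map stands in for the block store behind CacheManager.

```scala
import scala.collection.mutable

// Hypothetical toy model, not Spark's classes: a partition is just an Int index.
class ToyRdd(cacheEnabled: Boolean) {
  // Counts how many times a partition was actually computed.
  var computeCalls: Int = 0

  // Stand-in for the block store that CacheManager consults.
  private val cache = mutable.Map.empty[Int, Vector[Int]]

  // Stand-in for RDD.compute: produce the partition's values from scratch.
  private def compute(split: Int): Iterator[Int] = {
    computeCalls += 1
    Iterator(split, split * 10, split * 100)
  }

  // Same decision as RDD.iterator: go through the cache only when caching is on.
  def iterator(split: Int): Iterator[Int] =
    if (cacheEnabled) cache.getOrElseUpdate(split, compute(split).toVector).iterator
    else compute(split)
}
```

With cacheEnabled = true, asking for the same partition twice runs compute once; with cacheEnabled = false, every call recomputes.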
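If blockManager's get method returns nothing, the RDD's data has to be recomputed. As the source below shows, getOrCompute first locks the given partition and then calls rdd.computeOrReadCheckpoint(partition, context).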
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
//todo fetch the RDD's data from the cache; if the cache has no data, recompute it
def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {

  val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")
  //todo cacheManager uses blockManager to fetch the data
  blockManager.get(key) match {
    case Some(blockResult) =>
      // Partition is already materialized, so just return its values
      //todo if the data is there, return it directly
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockResult.readMethod)
      existingMetrics.incBytesRead(blockResult.bytes)

      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }
    case None =>
      //todo if there is no data, recompute it and store the result in the cache
      // Acquire a lock for loading this partition
      // If another thread already holds the lock, wait for it to finish return its results
      //todo lock the given partition
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }

      // Otherwise, we have to load the partition ourselves
      try {
        logInfo(s"Partition $key not found, computing it")
        //todo recompute the RDD's data
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)

        // If the task is running locally, do not persist the result
        //todo if the task runs locally, return the result directly without persisting it
        if (context.isRunningLocally) {
          return computedValues
        }

        // Otherwise, cache the values and keep track of any updates in block statuses
        //todo put the result into the cache
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)

      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}
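The locking around a partition that is being loaded can be sketched in isolation. This is a simplified model under stated assumptions, not CacheManager's code: LoadingGuard, acquire and the in-memory store map are hypothetical, and unlike Spark the store lookup happens inside the lock to keep the example short.

```scala
import scala.collection.mutable

// Hypothetical toy model of a "loading" set; the names are not Spark's API.
class LoadingGuard[K, V] {
  private val loading = mutable.HashSet.empty[K]   // keys being computed right now
  private val store = mutable.HashMap.empty[K, V]  // stand-in for the block store

  // Waits while another thread is loading the key. Returns Some(value) if the
  // value is stored, or None after claiming the key for the calling thread.
  private def acquire(key: K): Option[V] = loading.synchronized {
    while (loading.contains(key)) loading.wait()
    val existing = store.get(key)
    if (existing.isEmpty) loading.add(key)
    existing
  }

  def getOrCompute(key: K)(compute: => V): V =
    acquire(key) match {
      case Some(v) => v
      case None =>
        try {
          val v = compute
          loading.synchronized { store.put(key, v) }
          v
        } finally {
          // Always release the key and wake up the waiters, even if compute failed.
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
}
```

When several threads ask for the same key at once, one of them computes the value and the others block in acquire until notifyAll fires, then read the stored result. The finally block plays the same role as the one in getOrCompute: a failed computation must not leave the key locked forever.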
As seen above, CacheManager's getOrCompute method calls rdd.computeOrReadCheckpoint(partition, context):
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  //todo check whether the data has been checkpointed
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    //todo if it has not been checkpointed, run the RDD's compute again
    compute(split, context)
  }
}
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
  partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
    firstParent[T].iterator(parentPartition, context)
  }
}
private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

  //todo determine the storage level to use
  val putLevel = effectiveStorageLevel.getOrElse(level)
  if (!putLevel.useMemory) {
    /*
     * This RDD is not to be cached in memory, so we can just pass the computed values as an
     * iterator directly to the BlockManager rather than first fully unrolling it in memory.
     */
    //todo the data is not stored in memory
    updatedBlocks ++=
      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        logInfo(s"Failure to store $key")
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {
    /*
     * This RDD is to be cached in memory. In this case we cannot pass the computed values
     * to the BlockManager as an iterator and expect to read it back later. This is because
     * we may end up dropping a partition from memory store before getting it back.
     *
     * In addition, we must be careful to not unroll the entire partition in memory at once.
     * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
     * single partition. Instead, we unroll the values cautiously, potentially aborting and
     * dropping the partition to disk if applicable.
     */
    //todo the data is stored in memory
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // We have successfully unrolled the entire partition, so cache it in memory
        updatedBlocks ++=
          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // There is not enough space to cache this partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        if (putLevel.useDisk) {
          logWarning(s"Persisting partition $key to disk instead.")
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {