之前的文章说一个spark任务其实就是一系列rdd构成的有向无环图(dag),今天我们来看看,spark是如何表示rdd之间的依赖关系建立这个dag的。
一、rdd如何构成dag
上篇文章讲到了Partition和Partitioner知道了rdd是由一系列分区(partition)组成的,rdd之间的关系主要的其实就是分区之间的关系,也就是子rdd的某个分区数据需要依赖哪些rdd的哪些分区计算得到。
spark将rdd之间的关系抽象成了Dependency这个类,用于连接父子rdd,子rdd持有Dependency对象,Dependency对象里包含了父rdd。也就是dag的构成就像下面这样rdd1是rdd2和rdd3的子rdd
dag
二、Dependency的定义
依赖类图abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
只包含了一个rdd,就是父rdd的对象了 。Dependency有两个子类就是大家熟悉的款依赖和窄依赖了。
三、窄依赖 NarrowDependency
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
这是窄依赖的定义,一看就知道它想干嘛了是吧,就一个getParents函数,给一个子rdd的partitionId输出所依赖的父rdd的partitionId。我们还能知道代表partition的其实就是一个int值的partitionId。我们还能知道只有通过子rdd的partition才能知道依赖的父rdd的partition,而不能通过父rdd得到子rdd,这就说明rdd得计算方式只能是从子rdd向上遍历进行计算。
窄依赖有两个子类一个是OneToOneDependency
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
很明显就是一一对应,子rdd的一个partitionid就依赖一个父rdd同样的partitionid
另一个窄依赖的子类RangeDependency只用于union的时候,子rdd会有多个依赖每一个依赖都指向一个父rdd,大家可以先想想如果是你,你会怎么去实现多个rdd的union。
四、宽依赖 ShuffleDependency
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
extends Dependency[Product2[K, V]] {
……
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
}
上面就是ShuffleDependency的定义了,Dependency对象里面包含父rdd的对象,DAGScheduler在进行stage划分和task分配的时候就可以通过Dependency获取shuffleWriter写数据了。
子rdd持有这个Dependency对象,子rdd就可以通过它获得shuffle信息拉取上个stage的数据。
五、总结
Dependency是rdd之间的连接,表达了子rdd在计算某个partition的时候应该去哪个rdd的哪个partitions取数据。Dependency又分宽窄依赖,而宽依赖包含了shuffle信息,父rdd通过它写数据,子rdd通过它获取数据。
相关文章:
https://www.jianshu.com/p/67fff2e477fa
https://www.jianshu.com/p/982f1013360a
https://www.jianshu.com/p/be80dcfcb85a
关注众公号(曾二爷) 和优秀的人一起学习。
网友评论