spark核心构件之Dependency(依赖)

作者: 曾二爷耶 | 来源:发表于2019-03-23 11:57 被阅读3次

之前的文章说一个spark任务其实就是一系列rdd构成的有向无环图(dag),今天我们来看看,spark是如何表示rdd之间的依赖关系建立这个dag的。

一、rdd如何构成dag

上篇文章讲到了Partition和Partitioner知道了rdd是由一系列分区(partition)组成的,rdd之间的关系主要的其实就是分区之间的关系,也就是子rdd的某个分区数据需要依赖哪些rdd的哪些分区计算得到。

spark将rdd之间的关系抽象成了Dependency这个类,用于连接父子rdd,子rdd持有Dependency对象,Dependency对象里包含了父rdd。也就是dag的构成就像下面这样rdd1是rdd2和rdd3的子rdd


dag

二、Dependency的定义

依赖类图
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

只包含了一个rdd,就是父rdd的对象了 。Dependency有两个子类就是大家熟悉的款依赖和窄依赖了。

三、窄依赖 NarrowDependency

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

这是窄依赖的定义,一看就知道它想干嘛了是吧,就一个getParents函数,给一个子rdd的partitionId输出所依赖的父rdd的partitionId。我们还能知道代表partition的其实就是一个int值的partitionId。我们还能知道只有通过子rdd的partition才能知道依赖的父rdd的partition,而不能通过父rdd得到子rdd,这就说明rdd得计算方式只能是从子rdd向上遍历进行计算。
窄依赖有两个子类一个是OneToOneDependency

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

很明显就是一一对应,子rdd的一个partitionid就依赖一个父rdd同样的partitionid

另一个窄依赖的子类RangeDependency只用于union的时候,子rdd会有多个依赖每一个依赖都指向一个父rdd,大家可以先想想如果是你,你会怎么去实现多个rdd的union。

四、宽依赖 ShuffleDependency

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {
……
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
  val shuffleId: Int = _rdd.context.newShuffleId()
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)
}

上面就是ShuffleDependency的定义了,Dependency对象里面包含父rdd的对象,DAGScheduler在进行stage划分和task分配的时候就可以通过Dependency获取shuffleWriter写数据了。
子rdd持有这个Dependency对象,子rdd就可以通过它获得shuffle信息拉取上个stage的数据。

五、总结

Dependency是rdd之间的连接,表达了子rdd在计算某个partition的时候应该去哪个rdd的哪个partitions取数据。Dependency又分宽窄依赖,而宽依赖包含了shuffle信息,父rdd通过它写数据,子rdd通过它获取数据。

相关文章:
https://www.jianshu.com/p/67fff2e477fa
https://www.jianshu.com/p/982f1013360a
https://www.jianshu.com/p/be80dcfcb85a

关注众公号(曾二爷) 和优秀的人一起学习。

相关文章

网友评论

    本文标题:spark核心构件之Dependency(依赖)

    本文链接:https://www.haomeiwen.com/subject/yzqssqtx.html