spark核心构件之Dependency(依赖)

作者: 曾二爷耶 | 来源:发表于2019-03-23 11:57 被阅读3次

    之前的文章说一个spark任务其实就是一系列rdd构成的有向无环图(dag),今天我们来看看,spark是如何表示rdd之间的依赖关系建立这个dag的。

    一、rdd如何构成dag

    上篇文章讲到了Partition和Partitioner知道了rdd是由一系列分区(partition)组成的,rdd之间的关系主要的其实就是分区之间的关系,也就是子rdd的某个分区数据需要依赖哪些rdd的哪些分区计算得到。

    spark将rdd之间的关系抽象成了Dependency这个类,用于连接父子rdd,子rdd持有Dependency对象,Dependency对象里包含了父rdd。也就是dag的构成就像下面这样rdd1是rdd2和rdd3的子rdd


    dag

    二、Dependency的定义

    依赖类图
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }
    

    只包含了一个rdd,就是父rdd的对象了 。Dependency有两个子类就是大家熟悉的款依赖和窄依赖了。

    三、窄依赖 NarrowDependency

    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      def getParents(partitionId: Int): Seq[Int]
      override def rdd: RDD[T] = _rdd
    }
    

    这是窄依赖的定义,一看就知道它想干嘛了是吧,就一个getParents函数,给一个子rdd的partitionId输出所依赖的父rdd的partitionId。我们还能知道代表partition的其实就是一个int值的partitionId。我们还能知道只有通过子rdd的partition才能知道依赖的父rdd的partition,而不能通过父rdd得到子rdd,这就说明rdd得计算方式只能是从子rdd向上遍历进行计算。
    窄依赖有两个子类一个是OneToOneDependency

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
    

    很明显就是一一对应,子rdd的一个partitionid就依赖一个父rdd同样的partitionid

    另一个窄依赖的子类RangeDependency只用于union的时候,子rdd会有多个依赖每一个依赖都指向一个父rdd,大家可以先想想如果是你,你会怎么去实现多个rdd的union。

    四、宽依赖 ShuffleDependency

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false,
        val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
      extends Dependency[Product2[K, V]] {
    ……
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
      val shuffleId: Int = _rdd.context.newShuffleId()
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, _rdd.partitions.length, this)
    }
    

    上面就是ShuffleDependency的定义了,Dependency对象里面包含父rdd的对象,DAGScheduler在进行stage划分和task分配的时候就可以通过Dependency获取shuffleWriter写数据了。
    子rdd持有这个Dependency对象,子rdd就可以通过它获得shuffle信息拉取上个stage的数据。

    五、总结

    Dependency是rdd之间的连接,表达了子rdd在计算某个partition的时候应该去哪个rdd的哪个partitions取数据。Dependency又分宽窄依赖,而宽依赖包含了shuffle信息,父rdd通过它写数据,子rdd通过它获取数据。

    相关文章:
    https://www.jianshu.com/p/67fff2e477fa
    https://www.jianshu.com/p/982f1013360a
    https://www.jianshu.com/p/be80dcfcb85a

    关注众公号(曾二爷) 和优秀的人一起学习。

    相关文章

      网友评论

        本文标题:spark核心构件之Dependency(依赖)

        本文链接:https://www.haomeiwen.com/subject/yzqssqtx.html