美文网首页
Dependency in Spark

Dependency in Spark

作者: Lutouch | 来源:发表于2019-01-04 15:18 被阅读0次

    What is Dependency in Spark?

    依赖(Dependency)用于表示子 RDD 的一个分区与父 RDD 中的哪些分区相关。

    Type of Dependencies

    Narrow Dependency

    Each partition of the parent RDD is used by at most one partition of the child RDD.

    窄依赖(Narrow Dependency)指父 RDD 的一个分区至多被一个子 RDD 的分区使用到。

    Shuffle Dependency

    混合依赖(Shuffle Dependency)指父 RDD 中的一个分区被子 RDD 中多个分区使用到。

    Code Snippets

    Dependency

    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }
    

    NarrowDependency

    Each partition of the child RDD depends on a small number of partitions of the parent RDD.

    由于窄依赖中每个子 RDD 的分区只能依赖于父 RDD 中的一部分分区(具体是哪些分区取决于所采用的转换操作-Transformation)。因而在窄依赖的类定义中要求任何具体子类都必须实现 getParents(partitionId) 方法,指示子 RDD 的该分区依赖于父 RDD 中的哪些分区。

    /**
     * :: DeveloperApi ::
     * Base class for dependencies where each partition of the child RDD depends on a small number
     * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
     */
    @DeveloperApi
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      /**
       * Get the parent partitions for a child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
    
      override def rdd: RDD[T] = _rdd
    }
    

    ShuffleDependency

    ShuffleDependency 类用于表征对一个 shuffle stage 输出结果的依赖。

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
    
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
      private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
      private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
      // Note: It's possible that the combiner class tag is null, if the combineByKey
      // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
      private[spark] val combinerClassName: Option[String] =
        Option(reflect.classTag[C]).map(_.runtimeClass.getName)
    
      val shuffleId: Int = _rdd.context.newShuffleId()
    
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, _rdd.partitions.length, this)
    
      _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }
    

    ShuffleDependency 中几个关键参数的说明如下:

    1. _rdd: 父 RDD
    2. partitioner: shuffle write 端采用的分区方法
    3. serializer: 序列化方法
    4. keyOrdering: key ordering for RDD's shuffles ???
    5. aggregator: shuffle read 端的聚合器
    6. mapSideCombine: 是否实行 map 端合并

    相关文章

      网友评论

          本文标题:Dependency in Spark

          本文链接:https://www.haomeiwen.com/subject/qcxkrqtx.html