RDD Dependencies - Source Code Analysis


Author: 专职掏大粪 | Published 2021-09-20 08:58

    NarrowDependency

    // From Spark's SparkPi example (requires `import scala.math.random`):
    // the map() call wraps the parallelized range in a new MapPartitionsRDD,
    // which depends narrowly on its parent RDD.
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
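As a minimal sketch (assuming a local SparkSession bound to `spark`; the RDD names are illustrative, not from the article), the narrow lineage created by map can be inspected directly on the driver:

    val parent = spark.sparkContext.parallelize(1 until 1000, 4)
    val mapped = parent.map(_ * 2)                  // a MapPartitionsRDD
    println(mapped.dependencies.head)               // a OneToOneDependency instance
    println(mapped.dependencies.head.rdd eq parent) // true: the dependency points back to the parent
    println(mapped.toDebugString)                   // prints the narrow lineage chain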
    

    MapPartitionsRDD

     // RDD.map (from RDD.scala): the current RDD becomes the `prev` parent of a
     // new MapPartitionsRDD; the cleaned user function is applied per partition.
     def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
      }
    
    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        var prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
        preservesPartitioning: Boolean = false,
        isFromBarrier: Boolean = false,
        isOrderSensitive: Boolean = false)
      extends RDD[U](prev) {
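The article quotes only the class header, but the narrow relationship is easiest to see in how the wrapped function is applied. As a sketch (paraphrasing the rest of MapPartitionsRDD, not quoted in the article), compute runs `f` over the iterator of the same-index partition of the first parent:

    // Sketch of MapPartitionsRDD.compute: child partition i is produced from
    // parent partition i - exactly a one-to-one narrow dependency.
    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))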
    

    The `var prev: RDD[T]` parameter is the parent RDD; passing it to `RDD[U](prev)` invokes the auxiliary constructor below, which wraps it in a OneToOneDependency.

    OneToOneDependency

      // Auxiliary RDD constructor used by `extends RDD[U](prev)`: the single
      // parent is wrapped in a OneToOneDependency and passed on as the deps list.
      def this(@transient oneParent: RDD[_]) =
        this(oneParent.context, List(new OneToOneDependency(oneParent)))
    
    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        // deps is the List(new OneToOneDependency(oneParent)) created above
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {
    
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
    
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      /**
       * Get the parent partitions for a child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
      // The dependency's rdd is the prev RDD that was passed in earlier
      override def rdd: RDD[T] = _rdd
    }
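A minimal sketch of what these two classes give the scheduler (the RDD and partition index here are illustrative): for a OneToOneDependency, child partition i reads exactly parent partition i, and `rdd` returns the parent RDD itself:

    val parent = spark.sparkContext.parallelize(1 to 100, 8)
    val dep = new OneToOneDependency(parent)
    println(dep.getParents(3))    // List(3): one-to-one partition mapping
    println(dep.rdd eq parent)    // true: the dependency's rdd is the prev RDD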
    

    ShuffleDependency

    groupByKey()

      // groupByKey delegates to combineByKeyWithClassTag, collecting values into
      // CompactBuffers; mapSideCombine = false, so no map-side combining is done.
      val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
          createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
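As a quick sketch (the pair RDD here is illustrative), the RDD returned by groupByKey carries a ShuffleDependency rather than a narrow one:

    val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    val grouped = pairs.groupByKey(4)        // backed by a ShuffledRDD
    println(grouped.dependencies.head)       // a ShuffleDependency instance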
    

    combineByKeyWithClassTag

    // combineByKeyWithClassTag creates the ShuffledRDD (when a shuffle is required):
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
    
    class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient var prev: RDD[_ <: Product2[K, V]],
        part: Partitioner)
      // Nil is passed as the deps here; the real dependency is built lazily,
      // because ShuffledRDD overrides getDependencies (shown below).
      extends RDD[(K, C)](prev.context, Nil) {
    
    override def getDependencies: Seq[Dependency[_]] = {
      val serializer = userSpecifiedSerializer.getOrElse {
        val serializerManager = SparkEnv.get.serializerManager
        if (mapSideCombine) {
          serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
        } else {
          serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
        }
      }
      // Build the ShuffleDependency, passing the prev RDD (the RDD on which the groupByKey operator was called)
      List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
    }
    

    The ShuffleDependency's rdd is the prev RDD

     override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
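Tying the pieces together (continuing the illustrative groupByKey sketch above), the shuffle dependency of the grouped RDD points back at the original pair RDD:

    println(grouped.dependencies.head.rdd eq pairs)  // true: ShuffleDependency.rdd is the prev (parent) RDD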
    
