美文网首页
rdd的依赖

rdd的依赖

作者: lemontree_hook | 来源:发表于2018-07-10 10:44 被阅读0次

    例子一:

    一个简单的例子,看一下mappartitonrdd是怎么形成依赖的

    scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[88] at makeRDD at <console>:24
    
    scala> val rdd2 = rdd1.map(x=>(x._1,x._2+1))
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[90] at map at <console>:26
    通过makeRDD,构造了ParallelCollectionRDD,ParallelCollectionRDD通过map算子形成了MapPartitionsRDD
    makeRDD ----  返回一个ParallelCollectionRDD
    private[spark] class ParallelCollectionRDD[T: ClassTag](
        sc: SparkContext,
        @transient private val data: Seq[T],
        numSlices: Int,
        locationPrefs: Map[Int, Seq[String]])
        extends RDD[T](sc, Nil)
    ParallelCollectionRDD本身没有构造函数,它的初始化动作是在RDD这个抽象类中实现的,请注意传入的构造函数sc,Nil,接下来看一下RDD构造函数:
    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging 
    

    例子二:

    接下来我们看一下join算子,join一定会产生shuffle吗??接下来我们看一下:

    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[72] at makeRDD at <console>:24
    
    scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[73] at makeRDD at <console>:24
    scala> rdd1.partitions.length
    res8: Int = 2
    
    scala> rdd2.partitions.length
    res9: Int = 5
    已经按照参数进行分区成功,
    scala> val rdd3 = rdd1.join(rdd2)
    rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[10] at join at <console>:28
    rdd3怎么会是一个MapPartitionsRDD,join不是一本shuffle操作吗,稍后我们会分析一下源码接着下面操作
    scala> rdd3.dependencies
    res13: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@69718b8c)
    rdd3的依赖也是一个窄依赖
    scala> rdd3.dependencies(0).rdd
    res14: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[9] at join at <console>:28
    rdd3的依赖rdd也是一个MapPartitionsRDD,那我们顺着依赖倒退回去看看
    scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
    res17: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[8] at join at <console>:28
    终于出现CoGroupedRDD了
    scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies
    res18: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@7d1a9695, org.apache.spark.ShuffleDependency@7acc1c2b)
    
    scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
    ParallelCollectionRDD[0] at makeRDD at <console>:24
    ParallelCollectionRDD[1] at makeRDD at <console>:24
    
    ParallelCollectionRDD[0]  ----- CoGroupedRDD -- MapPartitionsRDD
    ParallelCollectionRDD[1] 
    
    

    到这里你应该已经明白了,rdd1.join(rdd2)的过程中rdd是怎么样做的算子转化的,其中CoGroupedRDD的依赖有两个都是ShuffleDependency依赖的类型
    但是CoGroupedRDD之后的MapPartitionsRDD是怎么产生的那??看下CoGroupedRDD源代码:

      def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
          other2: RDD[(K, W2)],
          other3: RDD[(K, W3)],
          partitioner: Partitioner)
          : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
        if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
        }
        val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
        cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
           (vs.asInstanceOf[Iterable[V]],
             w1s.asInstanceOf[Iterable[W1]],
             w2s.asInstanceOf[Iterable[W2]],
             w3s.asInstanceOf[Iterable[W3]])
        }
      }
      可以看到CoGroupedRDD之后又做了mapValues的操作,
        def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
        val cleanF = self.context.clean(f)//闭包清理
        new MapPartitionsRDD[(K, U), (K, V)](self,
          (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
          preservesPartitioning = true)
      }
      返回一个MapPartitionsRDD
    

    例子三:

    现在我们对rdd1做一点小变化

      scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at makeRDD at <console>:24
    再rdd1对它进行hash分区,rdd2保持不变
    scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[50] at partitionBy at <console>:24
    再将rdd1和rdd2做join操作
    scala> val rdd3 = rdd1.join(rdd2)
    rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[53] at join at <console>:28
    我们看一下rdd3的依赖关系有什么变化:
    scala> rdd3.dependencies
    res45: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@381c4f8)
    scala> rdd3.dependencies(0)
    res46: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@381c4f8
    scala> rdd3.dependencies(0).rdd
    res47: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[52] at join at <console>:28
    scala> rdd3.dependencies(0).rdd.dependencies
    res48: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@3672c520)
    scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
    res49: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[51] at join at <console>:28
    到这里你会发现和之前的依赖关系都没有变化,继续看下去
    scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
    ShuffledRDD[50] at partitionBy at <console>:24
    ParallelCollectionRDD[37] at makeRDD at <console>:24
    由于我们对rdd1做了一次partitionBy,那么它将变成一个ShuffledRDD,两一个依赖的rdd还是ParallelCollectionRDD,这没有什么问题,看一下依赖的类型
    scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x))
    org.apache.spark.OneToOneDependency@62fe218f
    org.apache.spark.ShuffleDependency@2a914623
    

    你会发现由于rdd1我们采用了hash重新分区,和之前的对比你会发现依赖关系由之前的ShuffleDependency变成了OneToOneDependency。
    发生这个变换是发生在CoGroupedRDD的依赖,我们看一下CoGroupedRDD的依赖是怎么得到的:

      override def getDependencies: Seq[Dependency[_]] = {
        rdds.map { rdd: RDD[_] =>//对于参与cogroup操作的都要进行遍历和判断
          if (rdd.partitioner == Some(part)) {//如果参与cogroup的rdd的分区方法等于part分区方法那么产生窄依赖
            logDebug("Adding one-to-one dependency with " + rdd)
            new OneToOneDependency(rdd)
          } else {
            logDebug("Adding shuffle dependency with " + rdd)
            new ShuffleDependency[K, Any, CoGroupCombiner](
              rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
          }
        }
      }
    

    很明显了,其实在做CoGroupedRDD的时候并不是每个一定会产生ShuffleDependency,可以参考注释但是需要解释一下的就是那么part是什么,我们需要看一下join
    的代码是怎么实现的:
    join实现几个api,我们这里是仅仅传入一个参数

      def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
        join(other, defaultPartitioner(self, other))
      }
    

    可以看到join底层实现是必须要实现分区器的,如果不传入的情况下,使用默认的defaultPartitioner,一般情况下defaultPartitioner就是HashPartitioner
    (defaultPartitioner可以去看一下源码这里不做分析)

      def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
        this.cogroup(other, partitioner).flatMapValues( pair =>
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
        )
      }
     重点看下cogroup传入的两个参数:other--参与join的另一个rdd,partitioner分区器
       def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
          : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
        if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
        }
        val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
        cg.mapValues { case Array(vs, w1s) =>
          (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
        }
      }
    

    CoGroupedRDD:传入第一个参数就是一个序列参与join两个rdd,请注意他们的顺序,第二个参数就是分区器,好了到这里我们知道了part其实就是你调用join的时候指定的分区方法,如果你没有传入那么会给你一个defaultPartitioner一般情况下就是HashPartitioner,他也就是上面说的到part, if (rdd.partitioner == Some(part)) 就是为了判断参与join的分区方法是否与join的时候的分区方法是否相等,如果他们使用同一种分区方法那就会形成窄依赖
    如果采用不同的方法那么就会产生一个宽依赖,其实道理也很好懂,采用相同的分区方法那么在join的时候其实也就知道了分区的对应关系。

    那么我们为什么在例子二中,为什么产生了不同的结果那,看一下例子二中rdd1和rdd2的分区方法是什么

    scala>  var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:24
    
    scala> rdd1.partitioner
    res0: Option[org.apache.spark.Partitioner] = None
    scala> rdd2.partitioner
    res1: Option[org.apache.spark.Partitioner] = None
    partitioner为None,没有分区方法,可以去看看makeRDD是怎么样对数据分区做划分的
    

    例子四:

    最后我们看一下reduceByKey的情况

    scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at partitionBy at <console>:24
    
    scala> var rdd3 = rdd1.reduceByKey(_ + _)
    rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:26
    
    scala> rdd3.dependencies
    res4: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@43d35c2a)
    
    scala> rdd3.dependencies(0).rdd
    res5: org.apache.spark.rdd.RDD[_] = ShuffledRDD[6] at partitionBy at <console>:24
    
    scala> rdd3.dependencies(0).rdd.dependencies
    res6: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@480217c6)
    
    scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
    res7: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[5] at makeRDD at <console>:24
    

    可以看到rdd1尽管执行了reduceByKey,但是没有产生宽依赖,看reduceByKey的源码实现发现原因和之前join原理是一样的,会对分区方法进行判断时候产生宽窄依赖,
    如果rdd1不进行partitionBy操作,就会产生宽依赖,因为仅仅执行makeRDD产生的rdd的partitioner方法为None,有兴趣的可以下自行验证。

    总结: rdd作为弹性分布式数据集,我们都知道他是惰性计算,再遇到action算子之前不会对数据进行真正的计算,仅仅会保存这些rdd之间的依赖关系,在真正要计算的时候通过每个rdd的dependences去找到它的父亲依赖关系和父亲rdd,从而进行回溯,stage的划分也就是通过计算依赖完成的。通过上面的例子分析,不难发现其实rdd的一些shuffle算子join,reduceByKey不一定会产生宽依赖,取决于传入算子中的分区计算和调用这个算子的rdd的分区方法

    相关文章

      网友评论

          本文标题:rdd的依赖

          本文链接:https://www.haomeiwen.com/subject/kkhfpftx.html