美文网首页Spark的那些坑
Spark RDD依赖关系简述

Spark RDD依赖关系简述

作者: 润土1030 | 来源:发表于2019-05-01 11:49 被阅读0次

    RDD,即Resilient Distributed Dataset,是Spark的核心概念。这篇文章就是讲讲spark的rdd依赖关系的,不理解Spark的rdd依赖关系,很多东西你都无法理解。

    Spark的RDD依赖主要分为两大类,一类是窄依赖(Narrow Dependencies),一类是宽依赖(Wide Dependencies)。

    他们的关系如下图所示
    855959-20161009115627506-271998705.png

    可以看到,所谓的窄依赖指的是父RDD的每个分区都至多被子RDD的分区使用,而宽依赖指的是多个子RDD的分区依赖一个父RDD的分区,这就涉及到shuffle操作了。

    接下让我们通过一段代码看看什么操作是宽依赖,什么操作是窄依赖。

    object DependencyDemo {
    
      def printDependencyInfo(dep: Dependency[_]) {
        println("Dependency type : " + dep.getClass)
        println("Dependency RDD : " + dep.rdd)
        println("Dependency partitions : " + dep.rdd.dependencies)
        println("Dependency partitions size : " + dep.rdd.dependencies.length)
    
      }
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val rdd = sc.textFile("/user/cloudera/barcelona/births/births.csv")
    
        val wordPair = rdd.flatMap(_.split(",")).map(x => (x, 1))
        wordPair.dependencies.foreach(printDependencyInfo(_))
        val wordCount = wordPair.reduceByKey(_ + _)
        wordCount.dependencies.foreach(printDependencyInfo(_))
      }
    }
    

    把这段代码用spark-shell运行或者yarn-client模式提交下得到的结果如下

    19/04/30 20:16:52 INFO spark.SparkContext: Created broadcast 0 from textFile at DependencyDemo.scala:23
    Dependency type : class org.apache.spark.OneToOneDependency
    Dependency RDD : MapPartitionsRDD[2] at flatMap at DependencyDemo.scala:24
    Dependency partitions : List(org.apache.spark.OneToOneDependency@4cc2e9fc)
    Dependency partitions size : 1
    19/04/30 20:16:52 INFO mapred.FileInputFormat: Total input paths to process : 1
    Dependency type : class org.apache.spark.ShuffleDependency
    Dependency RDD : MapPartitionsRDD[3] at map at DependencyDemo.scala:24
    Dependency partitions : List(org.apache.spark.OneToOneDependency@38b1b855)
    Dependency partitions size : 1
    
    

    可以看到第一个wordPair是OneToOneDependency,第二个wordCount是ShuffleDependency。

    我们查看下spark的源代码看看spark如何定义Dependency的,我这里的spark版本是1.6.2。


    image.png

    图中的ShuffleDependency就是宽依赖。

    我们看下ShuffleDependency的代码,代码其实很简单,里面关键的就是生成一个shuffleId并向ShuffleManager注册

    @DeveloperApi
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Option[Serializer] = None,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
    
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
      private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
      private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
      // Note: It's possible that the combiner class tag is null, if the combineByKey
      // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
      private[spark] val combinerClassName: Option[String] =
        Option(reflect.classTag[C]).map(_.runtimeClass.getName)
    
      val shuffleId: Int = _rdd.context.newShuffleId()
    
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, _rdd.partitions.size, this)
    
      _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }
    

    OneToOneDependency的代码就更简单了,里面只有一个获取parent的方法。

    @DeveloperApi
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
    

    这些知识对理解spark的核心还是很有用的,如何划分stage,就是根据RDD的依赖关系来的,遇到宽依赖就划分stage,这个后面我们有时间专门写一篇文章讲讲spark的DAGScheduler如何划分stage。

    相关文章

      网友评论

        本文标题:Spark RDD依赖关系简述

        本文链接:https://www.haomeiwen.com/subject/cwminqtx.html