美文网首页Spark的那些坑
Spark RDD依赖关系简述

Spark RDD依赖关系简述

作者: 润土1030 | 来源:发表于2019-05-01 11:49 被阅读0次

RDD,即Resilient Distributed Dataset,是Spark的核心概念。这篇文章就是讲讲spark的rdd依赖关系的,不理解Spark的rdd依赖关系,很多东西你都无法理解。

Spark的RDD依赖主要分为两大类,一类是窄依赖(Narrow Dependencies),一类是宽依赖(Wide Dependencies)。

他们的关系如下图所示
855959-20161009115627506-271998705.png

可以看到,所谓的窄依赖指的是父RDD的每个分区都至多被子RDD的分区使用,而宽依赖指的是多个子RDD的分区依赖一个父RDD的分区,这就涉及到shuffle操作了。

接下让我们通过一段代码看看什么操作是宽依赖,什么操作是窄依赖。

object DependencyDemo {

  def printDependencyInfo(dep: Dependency[_]) {
    println("Dependency type : " + dep.getClass)
    println("Dependency RDD : " + dep.rdd)
    println("Dependency partitions : " + dep.rdd.dependencies)
    println("Dependency partitions size : " + dep.rdd.dependencies.length)

  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("/user/cloudera/barcelona/births/births.csv")

    val wordPair = rdd.flatMap(_.split(",")).map(x => (x, 1))
    wordPair.dependencies.foreach(printDependencyInfo(_))
    val wordCount = wordPair.reduceByKey(_ + _)
    wordCount.dependencies.foreach(printDependencyInfo(_))
  }
}

把这段代码用spark-shell运行或者yarn-client模式提交下得到的结果如下

19/04/30 20:16:52 INFO spark.SparkContext: Created broadcast 0 from textFile at DependencyDemo.scala:23
Dependency type : class org.apache.spark.OneToOneDependency
Dependency RDD : MapPartitionsRDD[2] at flatMap at DependencyDemo.scala:24
Dependency partitions : List(org.apache.spark.OneToOneDependency@4cc2e9fc)
Dependency partitions size : 1
19/04/30 20:16:52 INFO mapred.FileInputFormat: Total input paths to process : 1
Dependency type : class org.apache.spark.ShuffleDependency
Dependency RDD : MapPartitionsRDD[3] at map at DependencyDemo.scala:24
Dependency partitions : List(org.apache.spark.OneToOneDependency@38b1b855)
Dependency partitions size : 1

可以看到第一个wordPair是OneToOneDependency,第二个wordCount是ShuffleDependency。

我们查看下spark的源代码看看spark如何定义Dependency的,我这里的spark版本是1.6.2。


image.png

图中的ShuffleDependency就是宽依赖。

我们看下ShuffleDependency的代码,代码其实很简单,里面关键的就是生成一个shuffleId并向ShuffleManager注册

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

OneToOneDependency的代码就更简单了,里面只有一个获取parent的方法。

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

这些知识对理解spark的核心还是很有用的,如何划分stage,就是根据RDD的依赖关系来的,遇到宽依赖就划分stage,这个后面我们有时间专门写一篇文章讲讲spark的DAGScheduler如何划分stage。

相关文章

网友评论

    本文标题:Spark RDD依赖关系简述

    本文链接:https://www.haomeiwen.com/subject/cwminqtx.html