RDD Dependencies

Author: 万事万物 | Published 2021-07-13 12:02

Preface

The Five Key Properties of an RDD

  • A list of partitions
    Partitions: an RDD is made up of multiple partitions; the partition is the unit of parallelism.
  • A function for computing each split
    Compute function: the computation logic applied to each split/partition.
  • A list of dependencies on other RDDs
    Dependencies: an RDD can depend on other RDDs, e.g. the result of one RDD is consumed by the next.
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    Partitioner: key-value RDDs are partitioned by hash, much like the Partitioner interface in MapReduce, which decides which reducer a key goes to.
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    Preferred locations: the preferred location for computing each partition, assigned automatically by Spark.

One of these properties is "A list of dependencies on other RDDs", i.e. the dependency relationship.
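
For reference, here is a simplified sketch (assuming spark-core on the classpath; the signatures are abridged from the real abstract class) of how these five properties surface in Spark's RDD API:

  import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

  // Simplified sketch of org.apache.spark.rdd.RDD: each of the five
  // properties maps onto one member of the abstract class.
  abstract class SketchRDD[T] {
    protected def getPartitions: Array[Partition]                    // 1. a list of partitions
    def compute(split: Partition, context: TaskContext): Iterator[T] // 2. a function to compute each split
    protected def getDependencies: Seq[Dependency[_]]                // 3. dependencies on other RDDs
    val partitioner: Option[Partitioner] = None                      // 4. optional partitioner for key-value RDDs
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil // 5. optional preferred locations
  }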

What Dependencies Are For

When an RDD computation fails or its data is lost, Spark can use the dependency chain to recompute and recover the data.


[Figure: an RDD dependency chain]

If rdd4 hits an error during its computation, it can use its dependency chain to rerun the computation from the very beginning.
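
Lineage is what makes this recomputation possible. A toy illustration of the same mechanism (a sketch, assuming an existing SparkContext `sc` as in the examples below): unless an RDD is cached, every action replays the whole lineage from the source.

  // Sketch: without cache(), each action re-runs the full lineage.
  val base   = sc.parallelize(Seq(1, 2, 3, 4), 2)
  val mapped = base.map { x => println(s"computing $x"); x * 2 }

  mapped.collect()  // lineage executed: "computing ..." printed per element
  mapped.collect()  // lineage executed again -- recomputed from the parent RDD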

Viewing Lineage [good to know]

Lineage, simply put, answers: who is your parent, who is your parent's parent, and so on, like a family tree through which you can trace your ancestry. In Spark, toDebugString prints an RDD's lineage.

Example: a walkthrough using the worldCount program.
Original code, for comparison against the modified versions below:

  @Test
  def worldCount(): Unit = {
    // Read the file (sc is the SparkContext set up by the test class)
    val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)

    // Flatten the lines into words
    val worldList: RDD[String] = lines.flatMap(_.split(" "))

    // Group by word
    val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)

    // Count the occurrences of each word
    val result = groupList.map(x => (x._1, x._2.size))

    println(result.collect().toList)
  }

Using toDebugString to print the lineage between RDDs:

  @Test
  def worldCount(): Unit = {
    // Read the file
    val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
    println("*" * 50)
    println(lines.toDebugString)
    println("lines", "-" * 50)

    // Flatten the lines into words
    val worldList: RDD[String] = lines.flatMap(_.split(" "))
    println(worldList.toDebugString)
    println("worldList", "-" * 50)

    // Group by word
    val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)
    println(groupList.toDebugString)
    println("groupList", "-" * 50)

    // Count the occurrences of each word
    val result = groupList.map(x => (x._1, x._2.size))
    println(result.toDebugString)
    println("result", "-" * 50)

    println(result.collect().toList)
  }

Output:

**************************************************
(5) file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(lines,--------------------------------------------------)
(5) MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(worldList,--------------------------------------------------)
(5) ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
 +-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
    |  MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(groupList,--------------------------------------------------)
(5) MapPartitionsRDD[5] at map at MapAndMapPartitions.scala:190 []
 |  ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
 +-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
    |  MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(result,--------------------------------------------------)

The lineage of lines:

(5) file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []

The RDD lines depends on HadoopRDD; MapPartitionsRDD[1] is lines itself.
This step reads the data from the file.

The lineage of worldList:
Its parent RDD is lines, so it depends on MapPartitionsRDD[1], and it also inherits its parent's dependencies.

(5) MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []

The lineage of result:
The intermediate RDDs follow the same pattern, so they are omitted here. By the time we reach result, it carries not just its direct parent but the entire chain of dependencies above that parent.

(5) MapPartitionsRDD[5] at map at MapAndMapPartitions.scala:190 []
 |  ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
 +-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
    |  MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []

Summary: the lineage is the chain of all RDDs in a job.

  1. A child RDD carries all of its ancestors' dependencies; a parent RDD knows nothing about its children's.
  2. Every level of the lineage has an ID (the number in brackets); the smaller the ID, the deeper that RDD sits in the chain, like the earliest ancestors in a family tree.
  3. ID 0 marks the topmost RDD in the chain.
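
Incidentally, the bracketed numbers such as [0] and [1] are RDD ids, which are also exposed directly on each RDD. A small sketch (the concrete values depend on how many RDDs were created beforehand):

  println(lines.id)      // e.g. 1 (the underlying HadoopRDD has id 0)
  println(worldList.id)  // e.g. 2
  println(result.id)     // e.g. 5, the highest id in this lineage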

Dependencies

A dependency describes the relationship between two RDDs.

Spark RDD dependencies come in two kinds:

  1. Wide dependency: involves a shuffle (a single partition of the parent RDD is used by multiple partitions of the child RDD).
  2. Narrow dependency: no shuffle (a single partition of the parent RDD is used by at most one partition of the child RDD).

Again using the example above:

  @Test
  def worldCount(): Unit = {
    // Read the file
    val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
    println("*" * 50)
    println(lines.dependencies)
    println("lines", "-" * 50)

    // Flatten the lines into words
    val worldList: RDD[String] = lines.flatMap(_.split(" "))
    println(worldList.dependencies)
    println("worldList", "-" * 50)

    // Group by word
    val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)
    println(groupList.dependencies)
    println("groupList", "-" * 50)

    // Count the occurrences of each word
    val result = groupList.map(x => (x._1, x._2.size))
    println(result.dependencies)
    println("result", "-" * 50)

    println(result.collect().toList)
  }

Output:

**************************************************
List(org.apache.spark.OneToOneDependency@623ebac7)
(lines,--------------------------------------------------)
List(org.apache.spark.OneToOneDependency@3dd31157)
(worldList,--------------------------------------------------)
List(org.apache.spark.ShuffleDependency@34b9eb03)
(groupList,--------------------------------------------------)
List(org.apache.spark.OneToOneDependency@606f81b5)
(result,--------------------------------------------------)

Compare with the earlier toDebugString output:

**************************************************
(5) file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(lines,--------------------------------------------------)
(5) MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
 |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(worldList,--------------------------------------------------)
(5) ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
 +-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
    |  MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(groupList,--------------------------------------------------)
(5) MapPartitionsRDD[5] at map at MapAndMapPartitions.scala:190 []
 |  ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
 +-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
    |  MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
    |  file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(result,--------------------------------------------------)

Notice that groupList has a wide dependency (ShuffleDependency), so it triggers a shuffle (the ShuffledRDD in its lineage shows the same thing); all the others are narrow dependencies (OneToOneDependency).

Classification of Dependencies

Spark has only two families of dependencies: wide dependencies and narrow dependencies (NarrowDependency).

Wide dependency: only one class

  • ShuffleDependency: parent to children, one-to-many (one parent partition feeds multiple child partitions)

Narrow dependency (NarrowDependency): three classes

  • PruneDependency: not usable from outside Spark, so not covered here
  • OneToOneDependency: a one-to-one relationship, e.g. RDD1 depends on RDD2
  • RangeDependency: child to parents (one child with multiple parents), e.g. RDD1 depends on both RDD2 and RDD3; see the sketch below
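
A quick sketch of where RangeDependency shows up in practice: union produces one, since the child RDD depends on partition ranges of several parents (the hash suffixes in the output will of course differ):

  val rddA = sc.parallelize(Seq(1, 2, 3), 2)
  val rddB = sc.parallelize(Seq(4, 5, 6), 2)
  val unioned = rddA.union(rddB)

  println(unioned.dependencies)
  // Something like:
  // List(org.apache.spark.RangeDependency@1a2b3c4d, org.apache.spark.RangeDependency@5e6f7a8b)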

What wide and narrow dependencies are for

They are mainly used to split the job into stages at shuffle boundaries.
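
In other words, the DAGScheduler cuts a stage boundary wherever it meets a ShuffleDependency, while narrow dependencies are pipelined within a stage. A hypothetical helper (my own sketch, not part of Spark's API) makes the distinction explicit:

  import org.apache.spark.ShuffleDependency
  import org.apache.spark.rdd.RDD

  // Hypothetical helper: an RDD sits on a stage boundary exactly when one
  // of its dependencies is a ShuffleDependency.
  def hasWideDependency(rdd: RDD[_]): Boolean =
    rdd.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

  // With the worldCount RDDs above:
  //   hasWideDependency(groupList)  // true  -- groupBy shuffles
  //   hasWideDependency(worldList)  // false -- flatMap is narrow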

Finally

Lineage: the ordered chain of RDDs within a job.

  • How to view the lineage: rdd.toDebugString

Dependency: the relationship between two RDDs.

  • How to inspect an RDD's dependencies: rdd.dependencies
  • RDD dependencies come in two kinds:
    Wide dependency: involves a shuffle
    Narrow dependency: no shuffle

