美文网首页
SparkSQL RDD的依赖是如何建立的

SparkSQL RDD的依赖是如何建立的

作者: 嘻嘻是小猪 | 来源:发表于2021-04-23 15:12 被阅读0次

RDD的依赖指的就是spark中的org.apache.spark.Dependency,Dependency有两个子类,NarrowDependencyShuffleDependency,也就是我们常说的窄依赖和宽依赖。

窄依赖指父RDD的每一个partition最多被子RDD的一个partition使用
宽依赖指父RDD的一个partition会被子RDD的多个partition使用
Dependency继承关系图

下面通过一条sparksql来解释宽窄依赖是如何建立的。

select
    i_item_sk
    ,count(*) as cnt
from
    tpcds_text_10.item
where
    i_item_sk>10
group by
    i_item_sk

它的物理执行计划如下:

== Physical Plan ==
*(2) HashAggregate(keys=[i_item_sk#61L], functions=[count(1)], output=[i_item_sk#61L, cnt#33L])
+- Exchange(coordinator id: 991724420) hashpartitioning(i_item_sk#61L, 200), coordinator[target post-shuffle partition size: 67108864]
   +- *(1) HashAggregate(keys=[i_item_sk#61L], functions=[partial_count(1)], output=[i_item_sk#61L, count#84L])
      +- *(1) Filter (isnotnull(i_item_sk#61L) && (i_item_sk#61L > 10))
         +- HiveTableScan [i_item_sk#61L], HiveTableRelation `tpcds_text_10`.`item`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [i_item_sk#61L, i_item_id#62, i_rec_start_date#63, i_rec_end_date#64, i_item_desc#65, i_current_price#66, i_wholesale_cost#67]
  • 窄依赖
    从上面执行计划中看到打印的Filter节点,它对应的是org.apache.spark.sql.execution.FilterExec
    看到其doExecute()方法:
protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val predicate = newPredicate(condition, child.output)
      predicate.initialize(0)
      iter.filter { row =>
        val r = predicate.eval(row)
        if (r) numOutputRows += 1
        r
      }
    }
  }

不复杂,重点走进mapPartitionsWithIndexInternal
这个mapPartitionsWithIndexInternalorg.apache.spark.rdd.RDD的一个方法:

private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
      preservesPartitioning)
  }

看到代码里有一个new MapPartitionsRDD的操作。
先来了解一下MapPartitionsRDD的构造方法:

MapPartitionsRDD的构造

上图中 1 处的prev, 就是FilterExec#doExecute()方法中child.execute()产生的RDD,也就是父RDD(计划树是从下往上执行的,子节点产生的RDD是父RDD)。
图中 2 处我们再进一步看一下,是RDD的构造方法:org.apache.spark.rdd.RDD#this

 /** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

关于org.apache.spark.rdd.RDD的构造也在下面贴个图:

image.png
上下对照,一目了然,这里传入了一个OneToOneDependency作为构造参数。
综上所述,新new的MapPartitionsRDD与其父RDD构建了一个OneToOneDependency依赖。
  • 宽依赖
    回到文章初的那个物理执行计划,Exchange节点对应的就是org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
    同样, 进入doExecute()
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
    // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
    if (cachedShuffleRDD == null) {
      cachedShuffleRDD = coordinator match {
        case Some(exchangeCoordinator) =>
          val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
          assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
          shuffleRDD
        case _ =>
          val shuffleDependency = prepareShuffleDependency() /关注这里
          preparePostShuffleRDD(shuffleDependency) /关注这里
      }
    }
    cachedShuffleRDD
  }

我们先不关心有coordinator的情况,看到代码中的prepareShuffleDependency:

private[exchange] def prepareShuffleDependency()
    : ShuffleDependency[Int, InternalRow, InternalRow] = {
    ShuffleExchangeExec.prepareShuffleDependency(
      child.execute(), / 父RDD在这里被传入 
      child.output, newPartitioning, serializer)
  }

代码很容易,这里进入了ShuffleExchangeExec.prepareShuffleDependency的另一个重载方法:

 def prepareShuffleDependency(
      rdd: RDD[InternalRow],
      outputAttributes: Seq[Attribute],
      newPartitioning: Partitioning,
      serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = {
      
      /1. 构建对应的 Partitioner 
      val part: Partitioner = newPartitioning match {
        case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
        ......
        case _ => sys.error(s"Exchange not implemented for $newPartitioning")
    }
   
     /2. 这一段持续对传入的原有rdd做转化,得到新的rdd:rddWithPartitionIds
    val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
     
      val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) {
        rdd.mapPartitionsInternal { iter =>
          ......
          sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
        }
      } else {
        rdd
      }

      val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
      if (needToCopyObjectsBeforeShuffle(part)) {
        newRdd.mapPartitionsWithIndexInternal((_, iter) => {
        ......
    }, isOrderSensitive = isOrderSensitive)
      } else {
        newRdd.mapPartitionsWithIndexInternal((_, iter) => {
        ......  
        }, isOrderSensitive = isOrderSensitive)
      }
    }
   }    
    /3.生成ShuffleDependency
     val dependency =
      new ShuffleDependency[Int, InternalRow, InternalRow](
        rddWithPartitionIds,
        new PartitionIdPassthrough(part.numPartitions),
        serializer)

    dependency
}

可以看到,最后构造了一个ShuffleDependency
而且用于构造ShuffleDependency的rdd已经不是最开始传入的父rdd了,而是rddWithPartitionIds。那是不是代表与传入的rdd的依赖就此断开了呢?其实不然,源码中可以看到,加工父rdd使用的是mapPartitionsInternalmapPartitionsWithIndexInternal这样的算子,和分析窄依赖时一样,这些步骤会建立新的rdd,而且与它们的父rdd建立窄依赖关系。这样下来,依赖链条到rddWithPartitionIds其实也是没有断开的,只是较原始传入的rdd,中间多了几个新rdd和一些窄依赖关系。

目前为止ShuffleDependency准备好了,回到ShuffleExchangeExec#doExecute(),进入ShuffleExchangeExec#preparePostShuffleRDD

private[exchange] def preparePostShuffleRDD(
      shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
      specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
    // If an array of partition start indices is provided, we need to use this array
    // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
    // update the number of post-shuffle partitions.
    specifiedPartitionStartIndices.foreach { indices =>
      assert(newPartitioning.isInstanceOf[HashPartitioning])
      newPartitioning = UnknownPartitioning(indices.length)
    }
    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
  }

这里果然产生了新的ShuffledRowRDD,而且将之前得到的shuffleDependency传入了构造。依赖建立了起来。

题外记录:对于一个RDD来说,dependencies_ 属性&getDependencies方法是记录和获得依赖关系的地方。

收工!!!

相关文章

  • SparkSQL RDD的依赖是如何建立的

    RDD的依赖指的就是spark中的org.apache.spark.Dependency,Dependency有两...

  • RDD、DataFrame、Dataset区别

    RDD: 1、RDD一般和spark mlib同时使用 2、RDD不支持sparksql操作 DataFrame:...

  • SparkSQL、RDD和DataFrame基本操作

    1 三者比较 易操作程度 SparkSQL > DataFrame > RDD 2 创建RDD、DataFrame...

  • 宽依赖和窄依赖

    在设计RDD的接口时,一个有意思的问题是如何表现RDD之间的依赖。在RDD中将依赖划分成了两种类型:窄依赖(nar...

  • SparkSql之编程方式

    什么是SparkSql? SparkSql作用主要用于用于处理结构化数据,底层就是将SQL语句转成RDD执行 Sp...

  • RDD的依赖关系:宽依赖和窄依赖

    RDD之间的依赖关系是指RDD之间的上下来源关系,RDD2依赖RDD1,RDD4依赖于RDD2和RDD3,RDD7...

  • Spark之RDD强化学习

    一、RDD依赖关系 1、RDD的依赖关系分为窄依赖和宽依赖;2、窄依赖是说父RDD的每一个分区最多被一个子RDD的...

  • Spark Core2--LineAge

    Lineage RDD Lineage(又称为RDD运算图或RDD依赖关系图)是RDD所有父RDD的graph(图...

  • rdd的依赖

    例子一: 一个简单的例子,看一下mappartitonrdd是怎么形成依赖的 例子二: 接下来我们看一下join算...

  • RDD 的宽依赖和窄依赖

    1. RDD 间的依赖关系 RDD和它依赖的父 RDD(s)的关系有两种不同的类型,即窄依赖(narrow dep...

网友评论

      本文标题:SparkSQL RDD的依赖是如何建立的

      本文链接:https://www.haomeiwen.com/subject/cudfrltx.html