How SparkSQL RDD Dependencies Are Established

Author: 嘻嘻是小猪 | Published 2021-04-23 15:12

    An RDD's dependencies are instances of Spark's org.apache.spark.Dependency. Dependency has two subclasses, NarrowDependency and ShuffleDependency, which correspond to what we usually call narrow and wide dependencies.

    In a narrow dependency, each partition of the parent RDD is used by at most one partition of the child RDD.
    In a wide dependency, a partition of the parent RDD is used by multiple partitions of the child RDD.
    (Figure: Dependency class hierarchy; a trimmed sketch of the relevant classes follows below.)
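    Since the original hierarchy diagram is not available in this copy, here is a trimmed sketch of the relevant classes from the Spark 2.x source (bodies and unrelated members elided):

    // org.apache.spark.Dependency (trimmed): the base of the hierarchy
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }

    // Narrow dependency: each child partition depends on a small, fixed set of parent partitions
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      def getParents(partitionId: Int): Seq[Int]
      override def rdd: RDD[T] = _rdd
    }

    // The one-to-one case, used for example by MapPartitionsRDD
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }

    // ShuffleDependency (the wide dependency) extends Dependency directly; see the sketch later in the article.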

    Let's walk through a simple SparkSQL query to see how narrow and wide dependencies are built.

    select
        i_item_sk
        ,count(*) as cnt
    from
        tpcds_text_10.item
    where
        i_item_sk>10
    group by
        i_item_sk
    

    Its physical plan looks like this:

    == Physical Plan ==
    *(2) HashAggregate(keys=[i_item_sk#61L], functions=[count(1)], output=[i_item_sk#61L, cnt#33L])
    +- Exchange(coordinator id: 991724420) hashpartitioning(i_item_sk#61L, 200), coordinator[target post-shuffle partition size: 67108864]
       +- *(1) HashAggregate(keys=[i_item_sk#61L], functions=[partial_count(1)], output=[i_item_sk#61L, count#84L])
          +- *(1) Filter (isnotnull(i_item_sk#61L) && (i_item_sk#61L > 10))
             +- HiveTableScan [i_item_sk#61L], HiveTableRelation `tpcds_text_10`.`item`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [i_item_sk#61L, i_item_id#62, i_rec_start_date#63, i_rec_end_date#64, i_item_desc#65, i_current_price#66, i_wholesale_cost#67]
    
    • Narrow dependency
      The Filter node printed in the plan above corresponds to org.apache.spark.sql.execution.FilterExec.
      Let's look at its doExecute() method:
    protected override def doExecute(): RDD[InternalRow] = {
        val numOutputRows = longMetric("numOutputRows")
        child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
          val predicate = newPredicate(condition, child.output)
          predicate.initialize(0)
          iter.filter { row =>
            val r = predicate.eval(row)
            if (r) numOutputRows += 1
            r
          }
        }
      }
    

    Nothing complicated here; the interesting part is mapPartitionsWithIndexInternal.
    mapPartitionsWithIndexInternal is a method of org.apache.spark.rdd.RDD:

    private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
          f: (Int, Iterator[T]) => Iterator[U],
          preservesPartitioning: Boolean = false): RDD[U] = withScope {
        new MapPartitionsRDD(
          this,
          (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
          preservesPartitioning)
      }
    

    Notice the new MapPartitionsRDD call in the code.
    Let's first look at how MapPartitionsRDD is constructed:

    (Figure: MapPartitionsRDD constructor; a trimmed sketch follows below.)
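    The original screenshot is not available, so here is a trimmed sketch of the MapPartitionsRDD class as it appears in the Spark 2.x source (some parameters and members are elided); the (1) and (2) markers are referenced in the next paragraph:

    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        var prev: RDD[T],                                      // (1) the parent RDD
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false)
      extends RDD[U](prev) {                                   // (2) calls RDD's single-parent constructor
      ......
    }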

    The prev at (1) is the RDD produced by child.execute() in FilterExec#doExecute(), i.e. the parent RDD (the plan tree is executed bottom-up, so the RDD produced by the child node is the parent RDD).
    The call at (2) deserves a closer look; it is RDD's constructor, org.apache.spark.rdd.RDD#this:

     /** Construct an RDD with just a one-to-one dependency on one parent */
      def this(@transient oneParent: RDD[_]) =
        this(oneParent.context, List(new OneToOneDependency(oneParent)))
    

    The constructor of org.apache.spark.rdd.RDD is also shown below:

    (Figure: RDD primary constructor; a trimmed sketch follows below.)
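    This screenshot is missing here as well; a trimmed sketch of RDD's primary constructor from the Spark 2.x source (other members elided) looks roughly like this:

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]    // the dependency list is supplied at construction time
      ) extends Serializable with Logging {
      ......
    }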
    Comparing the two constructors makes it obvious: a OneToOneDependency is passed in as a constructor argument.
    To sum up, the newly created MapPartitionsRDD establishes a OneToOneDependency on its parent RDD.
    • Wide dependency
      Back in the physical plan at the beginning of the article, the Exchange node corresponds to org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.
      Again, let's step into doExecute():
    protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
        // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
        if (cachedShuffleRDD == null) {
          cachedShuffleRDD = coordinator match {
            case Some(exchangeCoordinator) =>
              val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
              assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
              shuffleRDD
            case _ =>
          val shuffleDependency = prepareShuffleDependency()  // note this
          preparePostShuffleRDD(shuffleDependency)            // note this
          }
        }
        cachedShuffleRDD
      }
    

    Let's not worry about the coordinator case for now and look at prepareShuffleDependency:

    private[exchange] def prepareShuffleDependency()
        : ShuffleDependency[Int, InternalRow, InternalRow] = {
        ShuffleExchangeExec.prepareShuffleDependency(
      child.execute(), // the parent RDD is passed in here
          child.output, newPartitioning, serializer)
      }
    

    Simple enough; this calls into another overload, ShuffleExchangeExec.prepareShuffleDependency:

     def prepareShuffleDependency(
          rdd: RDD[InternalRow],
          outputAttributes: Seq[Attribute],
          newPartitioning: Partitioning,
          serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = {

        // 1. Build the Partitioner that matches newPartitioning
        val part: Partitioner = newPartitioning match {
          case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
          ......
          case _ => sys.error(s"Exchange not implemented for $newPartitioning")
        }

        // 2. Keep transforming the incoming rdd, producing a new one: rddWithPartitionIds
        val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {

          val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) {
            rdd.mapPartitionsInternal { iter =>
              ......
              sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
            }
          } else {
            rdd
          }

          val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
          if (needToCopyObjectsBeforeShuffle(part)) {
            newRdd.mapPartitionsWithIndexInternal((_, iter) => {
              ......
            }, isOrderSensitive = isOrderSensitive)
          } else {
            newRdd.mapPartitionsWithIndexInternal((_, iter) => {
              ......
            }, isOrderSensitive = isOrderSensitive)
          }
        }

        // 3. Create the ShuffleDependency
        val dependency =
          new ShuffleDependency[Int, InternalRow, InternalRow](
            rddWithPartitionIds,
            new PartitionIdPassthrough(part.numPartitions),
            serializer)

        dependency
      }
    

    As you can see, a ShuffleDependency is constructed at the end.
    Note that the rdd used to construct the ShuffleDependency is no longer the parent rdd that was originally passed in, but rddWithPartitionIds. Does that mean the dependency on the original rdd is broken? Not at all. As the source shows, the parent rdd is transformed with operators such as mapPartitionsInternal and mapPartitionsWithIndexInternal; just as in the narrow-dependency analysis, each of these steps creates a new rdd with a narrow dependency on its parent. So the dependency chain down to rddWithPartitionIds is never broken; compared with the original rdd, there are simply a few extra rdds and narrow dependencies in between.
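    For reference, here is a trimmed sketch of ShuffleDependency's declaration from the Spark 2.x source (remaining parameters elided): it takes the already transformed parent rdd plus a Partitioner, and extends Dependency directly rather than NarrowDependency:

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],   // here: rddWithPartitionIds
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        ......)
      extends Dependency[Product2[K, V]] {
      ......
    }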

    Now that the ShuffleDependency is ready, we return to ShuffleExchangeExec#doExecute() and step into ShuffleExchangeExec#preparePostShuffleRDD:

    private[exchange] def preparePostShuffleRDD(
          shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
          specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
        // If an array of partition start indices is provided, we need to use this array
        // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
        // update the number of post-shuffle partitions.
        specifiedPartitionStartIndices.foreach { indices =>
          assert(newPartitioning.isInstanceOf[HashPartitioning])
          newPartitioning = UnknownPartitioning(indices.length)
        }
        new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
      }
    

    As expected, a new ShuffledRowRDD is created here, and the shuffleDependency obtained earlier is passed into its constructor. The wide dependency is now in place.
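    To connect this with the side note below, here is a trimmed sketch of ShuffledRowRDD from the Spark 2.x source (other members elided): it hands an empty dependency list to RDD's constructor and instead exposes the shuffle dependency through getDependencies:

    class ShuffledRowRDD(
        var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
        specifiedPartitionStartIndices: Option[Array[Int]] = None)
      extends RDD[InternalRow](dependency.rdd.context, Nil) {
      ......
      override def getDependencies: Seq[Dependency[_]] = List(dependency)
      ......
    }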

    Side note: for any RDD, the dependencies_ field and the getDependencies method are where its dependencies are recorded and retrieved.
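    As a quick sanity check (a minimal sketch, assuming a running SparkSession named spark and the tpcds_text_10.item table from the example), you can pull the physical plan's RDD and walk its dependency chain yourself:

    val df = spark.sql(
      """select i_item_sk, count(*) as cnt
        |from tpcds_text_10.item
        |where i_item_sk > 10
        |group by i_item_sk""".stripMargin)

    // The RDD[InternalRow] produced by the physical plan
    var rdd: org.apache.spark.rdd.RDD[_] = df.queryExecution.toRdd

    // Walk up the dependency chain, printing each dependency's type
    while (rdd.dependencies.nonEmpty) {
      val dep = rdd.dependencies.head
      println(s"${rdd.getClass.getSimpleName} <- ${dep.getClass.getSimpleName}")
      rdd = dep.rdd
    }
    // Expect OneToOneDependency links plus one ShuffleDependency along the way.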

    And we're done!
