An RDD dependency is simply an instance of org.apache.spark.Dependency in Spark. Dependency has two kinds of subclasses, NarrowDependency and ShuffleDependency, which are what we usually call narrow and wide dependencies.
A narrow dependency means each partition of the parent RDD is used by at most one partition of the child RDD.
A wide dependency means a single partition of the parent RDD is used by multiple partitions of the child RDD.
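To make the inheritance relationship concrete, here is a trimmed sketch of these classes based on the Spark source (class bodies abbreviated; details vary slightly between versions):

// Trimmed from org.apache.spark.Dependency (Spark 2.x).
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

// Narrow dependency: each child partition depends on a small, known set of parent partitions.
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

// The most common narrow dependency: child partition i depends on parent partition i.
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

// Wide dependency: the parent's output is repartitioned by a Partitioner, i.e. a shuffle.
class ShuffleDependency[K, V, C](
    _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer /* ...more optional parameters... */)
  extends Dependency[Product2[K, V]]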
Below, let's walk through a simple SparkSQL query to see how narrow and wide dependencies are actually built.
select
    i_item_sk
    ,count(*) as cnt
from
    tpcds_text_10.item
where
    i_item_sk > 10
group by
    i_item_sk
Its physical plan is as follows:
== Physical Plan ==
*(2) HashAggregate(keys=[i_item_sk#61L], functions=[count(1)], output=[i_item_sk#61L, cnt#33L])
+- Exchange(coordinator id: 991724420) hashpartitioning(i_item_sk#61L, 200), coordinator[target post-shuffle partition size: 67108864]
   +- *(1) HashAggregate(keys=[i_item_sk#61L], functions=[partial_count(1)], output=[i_item_sk#61L, count#84L])
      +- *(1) Filter (isnotnull(i_item_sk#61L) && (i_item_sk#61L > 10))
         +- HiveTableScan [i_item_sk#61L], HiveTableRelation `tpcds_text_10`.`item`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [i_item_sk#61L, i_item_id#62, i_rec_start_date#63, i_rec_end_date#64, i_item_desc#65, i_current_price#66, i_wholesale_cost#67]
Narrow dependency
The Filter node printed in the plan above corresponds to org.apache.spark.sql.execution.FilterExec. Look at its doExecute() method:
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    val predicate = newPredicate(condition, child.output)
    predicate.initialize(0)
    iter.filter { row =>
      val r = predicate.eval(row)
      if (r) numOutputRows += 1
      r
    }
  }
}
Nothing complicated here; the key part is mapPartitionsWithIndexInternal, which is a method of org.apache.spark.rdd.RDD:
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
    preservesPartitioning)
}
Note the new MapPartitionsRDD inside. Let's first look at the constructor of MapPartitionsRDD:
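A trimmed sketch based on the Spark 2.x source (newer versions add a couple more boolean parameters such as isOrderSensitive); the markers 1 and 2 below are referenced in the following paragraph:

// Trimmed from org.apache.spark.rdd.MapPartitionsRDD.
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],                                    // <- 1: the parent RDD
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],   // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {                                 // <- 2: prev is handed to RDD's constructor
  ...
}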
The prev at marker 1 is the RDD produced by child.execute() in FilterExec#doExecute(), i.e. the parent RDD (the plan tree is executed bottom-up, so the RDD produced by the child node is the parent RDD).
Marker 2 takes us one step further, into a constructor of RDD itself, org.apache.spark.rdd.RDD#this:
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
For comparison, here is the primary constructor of org.apache.spark.rdd.RDD as well:
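A trimmed sketch based on the Spark source:

// Trimmed from org.apache.spark.rdd.RDD.
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]   // the dependencies of this RDD
  ) extends Serializable with Logging {
  ...
}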
Putting the two side by side makes it obvious: a OneToOneDependency is passed in as the constructor argument. In summary, the newly created MapPartitionsRDD establishes a OneToOneDependency on its parent RDD.
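The same thing can be observed directly from the RDD API; a minimal, hypothetical spark-shell session (map() also goes through MapPartitionsRDD):

val parent = sc.parallelize(1 to 10, 2)
val child  = parent.map(_ * 2)        // map() creates a MapPartitionsRDD over parent
println(child.dependencies)           // List(org.apache.spark.OneToOneDependency@...)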
Wide dependency
Back to the physical plan at the beginning of the article: the Exchange node corresponds to org.apache.spark.sql.execution.exchange.ShuffleExchangeExec. Likewise, go into its doExecute():
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
  // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = coordinator match {
      case Some(exchangeCoordinator) =>
        val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
        assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
        shuffleRDD
      case _ =>
        val shuffleDependency = prepareShuffleDependency()   // <- note this
        preparePostShuffleRDD(shuffleDependency)             // <- note this
    }
  }
  cachedShuffleRDD
}
Ignoring the coordinator case for now, look at prepareShuffleDependency in the code above:
private[exchange] def prepareShuffleDependency()
  : ShuffleDependency[Int, InternalRow, InternalRow] = {
  ShuffleExchangeExec.prepareShuffleDependency(
    child.execute(),   // <- the parent RDD is passed in here
    child.output, newPartitioning, serializer)
}
Simple enough: it delegates to an overload of ShuffleExchangeExec.prepareShuffleDependency:
def prepareShuffleDependency(
    rdd: RDD[InternalRow],
    outputAttributes: Seq[Attribute],
    newPartitioning: Partitioning,
    serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = {
  // 1. Build the corresponding Partitioner
  val part: Partitioner = newPartitioning match {
    case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
    ......
    case _ => sys.error(s"Exchange not implemented for $newPartitioning")
  }
  // 2. This section keeps transforming the incoming rdd and produces a new one: rddWithPartitionIds
  val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
    val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) {
      rdd.mapPartitionsInternal { iter =>
        ......
        sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
      }
    } else {
      rdd
    }
    val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
    if (needToCopyObjectsBeforeShuffle(part)) {
      newRdd.mapPartitionsWithIndexInternal((_, iter) => {
        ......
      }, isOrderSensitive = isOrderSensitive)
    } else {
      newRdd.mapPartitionsWithIndexInternal((_, iter) => {
        ......
      }, isOrderSensitive = isOrderSensitive)
    }
  }
  // 3. Create the ShuffleDependency
  val dependency =
    new ShuffleDependency[Int, InternalRow, InternalRow](
      rddWithPartitionIds,
      new PartitionIdPassthrough(part.numPartitions),
      serializer)
  dependency
}
As you can see, a ShuffleDependency is constructed at the end. Note, however, that the rdd used to build the ShuffleDependency is no longer the parent rdd that was originally passed in, but rddWithPartitionIds. Does that mean the dependency on the incoming rdd is broken? Not at all. As the source shows, the parent rdd is transformed with operators like mapPartitionsInternal and mapPartitionsWithIndexInternal, and just as in the narrow-dependency analysis, each of these steps creates a new rdd with a narrow dependency on its parent. So the dependency chain up to rddWithPartitionIds is never broken; compared with the original rdd, there are simply a few more intermediate rdds and narrow dependencies in between.
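For reference, the ShuffleDependency constructor roughly looks like this in the Spark 2.x source (trimmed sketch), which matches the three arguments passed above:

// Trimmed from org.apache.spark.ShuffleDependency.
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],    // here: rddWithPartitionIds
    val partitioner: Partitioner,                              // here: PartitionIdPassthrough
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {
  ...
}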
Now that the ShuffleDependency is ready, go back to ShuffleExchangeExec#doExecute() and step into ShuffleExchangeExec#preparePostShuffleRDD:
private[exchange] def preparePostShuffleRDD(
    shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
    specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
  // If an array of partition start indices is provided, we need to use this array
  // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
  // update the number of post-shuffle partitions.
  specifiedPartitionStartIndices.foreach { indices =>
    assert(newPartitioning.isInstanceOf[HashPartitioning])
    newPartitioning = UnknownPartitioning(indices.length)
  }
  new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
}
Here, as expected, a new ShuffledRowRDD is created, and the shuffleDependency obtained earlier is passed into its constructor. The wide dependency is now established.
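The same effect can be observed with the plain RDD API; a minimal, hypothetical spark-shell session (reduceByKey triggers a shuffle):

val pairs  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val counts = pairs.reduceByKey(_ + _)   // builds a ShuffledRDD over pairs
println(counts.dependencies.head)       // org.apache.spark.ShuffleDependency@...
println(counts.toDebugString)           // the lineage string shows the shuffle boundary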
A side note: for an RDD, the dependencies_ field and the getDependencies method are where the dependency relationships are stored and retrieved.
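Roughly, these look like this in the Spark source (trimmed sketch):

// Trimmed from org.apache.spark.rdd.RDD.
protected def getDependencies: Seq[Dependency[_]] = deps   // deps comes from the primary constructor

private var dependencies_ : Seq[Dependency[_]] = _

final def dependencies: Seq[Dependency[_]] = {
  // A checkpointed RDD replaces its original lineage with a one-to-one dependency on the checkpoint data.
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}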
That's a wrap!!!