目录
前言
按照计划,本文来讲解RDD的依赖与分区器。这两者不仅与之后调度系统的细节(DAG、Shuffle等)息息相关,而且也是面试Spark系大数据研发工程师时经常被问到的基础问题(反正我是会问的),因此看官也可以将本文当做一篇面试知识点解析来看。
RDD依赖
Dependency抽象类及子类
在Spark Core中,RDD依赖关系的基类就是Dependency抽象类。它的定义只有一句话。
代码#19.1 - o.a.s.Dependency抽象类
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
该类中只定义了一个方法rdd(),它用于取得当前RDD依赖的父RDD。Dependency及其子类的类图如下所示。
图#19.1 - Dependency继承体系我们已经知道,RDD依赖分为窄依赖和宽依赖(Shuffle依赖)两种,下面分别来看。
窄依赖
所谓窄依赖,是指父RDD的每个分区都仅被子RDD的一个分区所依赖,也就是说子RDD的一个分区固定对应一个父RDD的单个分区。窄依赖在代码中的基类是NarrowDependency抽象类。
代码#19.2 - o.a.s.NarrowDependency抽象类
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
可见,NarrowDependency类带有一个构造方法参数_rdd,并重写rdd()方法让其返回之,它就是当前RDD依赖的父RDD。另外,它还定义了一个抽象方法getParents(),用来返回partitionId对应分区依赖的所有父RDD的分区ID。该方法由NarrowDependency的子类实现,分别为OneToOneDependency(一对一依赖)和RangeDependency(范围依赖),它们的代码都比较简单。
代码#19.3 - o.a.s.OneToOneDependency与RangeDependency类
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
可见,它们返回的都是只有一个元素的List,且OneToOneDependency的父子RDD分区ID严格相同,常见的map()或filter()等算子都会产生OneToOneDependency。而在RangeDependency中,子RDD中ID为partitionId的分区与父RDD中ID为(partitionId - outStart + inStart)的分区一一对应,其中inStart为父RDD分区ID的起始值,outStart为子RDD分区ID的起始值,例如union()算子就会产生这种依赖。
上面讲的两种情况都是一一对应关系。当子RDD分区对应多个父RDD的分区(如join()算子)时,也可以形成窄依赖。其前提是父子RDD的分区规则完全相同,即子RDD的某个分区p对应父RDD 1的分区p,也对应父RDD 2的分区p。如果分区规则不同,就会变成宽依赖。
下面的图来自网络,原始出处已不可考,比较形象地说明了窄依赖。
图#19.2 - 窄依赖的三种情况宽依赖
严格来讲,它的名字应该叫“Shuffle依赖”,因为在Spark代码中,它的类名是ShuffleDependency。不过在中文圈子里,“宽依赖”这个名字也同样通用。它就是指子RDD的一个分区会对应一个父RDD的多个分区,并且往往是全部分区。ShuffleDependency类的代码如下。
代码#19.4 - o.a.s.ShuffleDependency类
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
由于Shuffle是调度系统的重要组成部分,因此我们这里只限于读ShuffleDependency的源码,做一些基本介绍,大头留到后面去说。
ShuffleDependency类有3个泛型参数,K代表键类型,V代表值类型,而C则代表Combiner的类型。如果看官有Hadoop MapReduce的基础,对Combiner这个词自然不会陌生。由于Shuffle过程对键值型数据才有意义,因此ShuffleDependency对父RDD的泛型类型有限制,必须是Product2[K,V]或者其子类,Product2在Scala中代表两个元素的笛卡尔积。
其他构造方法参数说明如下:
- partitioner:分区器,下面马上就会讲到。
- serializer:闭包序列化器,SparkEnv中已经创建,为JavaSerializer。
- keyOrdering:可选的对键类型K排序的排序规则。
- aggregator:可选的Map端数据聚合逻辑。
- mapSideCombine:指定是否启用Map数据预聚合。
随着宽依赖的创建,还会调用SparkContext.newShuffleId()方法分配一个新的Shuffle ID,以及调用ShuffleManager.registerShuffle方法注册该Shuffle,返回Shuffle句柄(ShuffleHandle)。虽然在很久之前的几篇文章中讲过Shuffle相关的话题,但是在真正讲到调度系统时,还是会继续深挖的。
下面的图形象地说明了宽依赖。
图#19.3 - 宽依赖的两种情况RDD分区器
Partitioner抽象类与伴生对象
在上一篇文章讲RDD时,Partitioner就已经出现了,并且它在上一节的ShuffleDependency代码中也是作为构造参数出现。在Shuffle过程中,必须得有确定的计算逻辑来决定父RDD的分区数据如何分配并对应到子RDD的分区中,这就是分区器Partitioner的职责。
Partitioner抽象类的定义也很简单。
代码#19.5 - o.a.s.Partitioner抽象类
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
numPartitions()方法返回分区总数,而getPartitions()方法根据键返回其将被映射到的分区ID。
Partitioner还带有一个伴生对象,其中定义了defaultPartitioner()方法,顾名思义,它(根据上游的一个或一些RDD)返回默认的分区逻辑,其代码如下。
代码#19.6 - o.a.s.Partitioner.defaultPartitioner()/isEligiblePartitioner()方法
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
Some(hasPartitioner.maxBy(_.partitions.length))
} else {
None
}
val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
}
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
hasMaxPartitioner.get.partitioner.get
} else {
new HashPartitioner(defaultNumPartitions)
}
}
private def isEligiblePartitioner(
hasMaxPartitioner: RDD[_],
rdds: Seq[RDD[_]]): Boolean = {
val maxPartitions = rdds.map(_.partitions.length).max
log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
}
该方法会从输入的所有RDD中取出那些定义了分区逻辑的RDD,然后找到其中分区数最大的那个Partitioner。如果SparkConf中定义了缺省并行度配置项,即spark.default.parallelism,那么默认分区器的分区数就会采用该参数的值,否则就直接用所有RDD中最大的分区数(这就是为什么几乎总是推荐在提交Spark作业时设定spark.default.parallelism)。
然后,调用isElegiblePartitioner()方法,判断分区数最大的那个Partitioner是否“合格”,判断逻辑是其分区数与所有上游RDD中最大分区数之差小于一个数量级。如果通过检查,并且默认分区数比它小,就采用分区数最大的那个Partitioner作为分区逻辑,否则用默认分区数构造一个新的HashPartitioner并返回。
Partitioner在Spark Core中的实现类主要有两个:基于散列的HashPartitioner和基于采样范围的RangePartitioner,前者是默认实现。下面我们就以HashPartitioner为例来看看其具体实现(RangePartitioner太麻烦了)。
HashPartitioner
代码#19.7 - o.a.s.HashPartitioner类
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
可见,在重写的getPartition()方法中,会取得键的hashCode值,对分区数numPartitions取模,返回其绝对值,这样就确保映射到的分区落在[0,numPartitions - 1]的区间内。为了判断两个HashPartitioner是否相等,也必须同时重写其equals()和hashCode()方法,判断标准自然就只有分区数了。
我们也可以很容易地想到,用户通过自己继承Partitioner类,能够自定义分区逻辑。下面就是一个简单的示例,它通过Key的长度来分区。由于它不属于Spark源码,就不编号了。
class KeyLengthPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions
def getPartition(key: Any): Int = {
return key.asInstanceOf[String].length() & (partitions - 1)
}
}
总结
本文分别以Spark Core中的Dependency与Partitioner两个抽象类为起点,比较详细地讲解了Spark中RDD依赖关系与分区逻辑的具体设计。依赖与分区是RDD五要素中最重要的两个点,在今后的源码阅读过程中,会经常用到它们。
网友评论