美文网首页Spark源码精读分析计划
Spark Core源码精读计划#19:RDD的依赖与分区逻辑

Spark Core源码精读计划#19:RDD的依赖与分区逻辑

作者: LittleMagic | 来源:发表于2019-06-13 22:39 被阅读19次

    目录

    前言

    按照计划,本文来讲解RDD的依赖与分区器。这两者不仅与之后调度系统的细节(DAG、Shuffle等)息息相关,而且也是面试Spark系大数据研发工程师时经常被问到的基础问题(反正我是会问的),因此看官也可以将本文当做一篇面试知识点解析来看。

    RDD依赖

    Dependency抽象类及子类

    在Spark Core中,RDD依赖关系的基类就是Dependency抽象类。它的定义只有一句话。

    代码#19.1 - o.a.s.Dependency抽象类

    @DeveloperApi
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }
    

    该类中只定义了一个方法rdd(),它用于取得当前RDD依赖的父RDD。Dependency及其子类的类图如下所示。

    图#19.1 - Dependency继承体系

    我们已经知道,RDD依赖分为窄依赖和宽依赖(Shuffle依赖)两种,下面分别来看。

    窄依赖

    所谓窄依赖,是指父RDD的每个分区都仅被子RDD的一个分区所依赖,也就是说子RDD的一个分区固定对应一个父RDD的单个分区。窄依赖在代码中的基类是NarrowDependency抽象类。

    代码#19.2 - o.a.s.NarrowDependency抽象类

    @DeveloperApi
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      def getParents(partitionId: Int): Seq[Int]
    
      override def rdd: RDD[T] = _rdd
    }
    

    可见,NarrowDependency类带有一个构造方法参数_rdd,并重写rdd()方法让其返回之,它就是当前RDD依赖的父RDD。另外,它还定义了一个抽象方法getParents(),用来返回partitionId对应分区依赖的所有父RDD的分区ID。该方法由NarrowDependency的子类实现,分别为OneToOneDependency(一对一依赖)和RangeDependency(范围依赖),它们的代码都比较简单。

    代码#19.3 - o.a.s.OneToOneDependency与RangeDependency类

    @DeveloperApi
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
    
    @DeveloperApi
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
      extends NarrowDependency[T](rdd) {
    
      override def getParents(partitionId: Int): List[Int] = {
        if (partitionId >= outStart && partitionId < outStart + length) {
          List(partitionId - outStart + inStart)
        } else {
          Nil
        }
      }
    }
    

    可见,它们返回的都是只有一个元素的List,且OneToOneDependency的父子RDD分区ID严格相同,常见的map()或filter()等算子都会产生OneToOneDependency。而在RangeDependency中,子RDD中ID为partitionId的分区与父RDD中ID为(partitionId - outStart + inStart)的分区一一对应,其中inStart为父RDD分区ID的起始值,outStart为子RDD分区ID的起始值,例如union()算子就会产生这种依赖。

    上面讲的两种情况都是一一对应关系。当子RDD分区对应多个父RDD的分区(如join()算子)时,也可以形成窄依赖。其前提是父子RDD的分区规则完全相同,即子RDD的某个分区p对应父RDD 1的分区p,也对应父RDD 2的分区p。如果分区规则不同,就会变成宽依赖。

    下面的图来自网络,原始出处已不可考,比较形象地说明了窄依赖。

    图#19.2 - 窄依赖的三种情况

    宽依赖

    严格来讲,它的名字应该叫“Shuffle依赖”,因为在Spark代码中,它的类名是ShuffleDependency。不过在中文圈子里,“宽依赖”这个名字也同样通用。它就是指子RDD的一个分区会对应一个父RDD的多个分区,并且往往是全部分区。ShuffleDependency类的代码如下。

    代码#19.4 - o.a.s.ShuffleDependency类

    @DeveloperApi
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
      private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
      private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
      private[spark] val combinerClassName: Option[String] =
        Option(reflect.classTag[C]).map(_.runtimeClass.getName)
    
      val shuffleId: Int = _rdd.context.newShuffleId()
    
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, _rdd.partitions.length, this)
    
      _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }
    

    由于Shuffle是调度系统的重要组成部分,因此我们这里只限于读ShuffleDependency的源码,做一些基本介绍,大头留到后面去说。

    ShuffleDependency类有3个泛型参数,K代表键类型,V代表值类型,而C则代表Combiner的类型。如果看官有Hadoop MapReduce的基础,对Combiner这个词自然不会陌生。由于Shuffle过程对键值型数据才有意义,因此ShuffleDependency对父RDD的泛型类型有限制,必须是Product2[K,V]或者其子类,Product2在Scala中代表两个元素的笛卡尔积。

    其他构造方法参数说明如下:

    • partitioner:分区器,下面马上就会讲到。
    • serializer:闭包序列化器,SparkEnv中已经创建,为JavaSerializer。
    • keyOrdering:可选的对键类型K排序的排序规则。
    • aggregator:可选的Map端数据聚合逻辑。
    • mapSideCombine:指定是否启用Map数据预聚合。

    随着宽依赖的创建,还会调用SparkContext.newShuffleId()方法分配一个新的Shuffle ID,以及调用ShuffleManager.registerShuffle方法注册该Shuffle,返回Shuffle句柄(ShuffleHandle)。虽然在很久之前的几篇文章中讲过Shuffle相关的话题,但是在真正讲到调度系统时,还是会继续深挖的。

    下面的图形象地说明了宽依赖。

    图#19.3 - 宽依赖的两种情况

    RDD分区器

    Partitioner抽象类与伴生对象

    在上一篇文章讲RDD时,Partitioner就已经出现了,并且它在上一节的ShuffleDependency代码中也是作为构造参数出现。在Shuffle过程中,必须得有确定的计算逻辑来决定父RDD的分区数据如何分配并对应到子RDD的分区中,这就是分区器Partitioner的职责。

    Partitioner抽象类的定义也很简单。

    代码#19.5 - o.a.s.Partitioner抽象类

    abstract class Partitioner extends Serializable {
      def numPartitions: Int
      def getPartition(key: Any): Int
    }
    

    numPartitions()方法返回分区总数,而getPartitions()方法根据键返回其将被映射到的分区ID。

    Partitioner还带有一个伴生对象,其中定义了defaultPartitioner()方法,顾名思义,它(根据上游的一个或一些RDD)返回默认的分区逻辑,其代码如下。

    代码#19.6 - o.a.s.Partitioner.defaultPartitioner()/isEligiblePartitioner()方法

      def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
        val rdds = (Seq(rdd) ++ others)
        val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
        val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
          Some(hasPartitioner.maxBy(_.partitions.length))
        } else {
          None
        }
    
        val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
          rdd.context.defaultParallelism
        } else {
          rdds.map(_.partitions.length).max
        }
    
        if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
            defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
          hasMaxPartitioner.get.partitioner.get
        } else {
          new HashPartitioner(defaultNumPartitions)
        }
      }
    
      private def isEligiblePartitioner(
         hasMaxPartitioner: RDD[_],
         rdds: Seq[RDD[_]]): Boolean = {
        val maxPartitions = rdds.map(_.partitions.length).max
        log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
      }
    

    该方法会从输入的所有RDD中取出那些定义了分区逻辑的RDD,然后找到其中分区数最大的那个Partitioner。如果SparkConf中定义了缺省并行度配置项,即spark.default.parallelism,那么默认分区器的分区数就会采用该参数的值,否则就直接用所有RDD中最大的分区数(这就是为什么几乎总是推荐在提交Spark作业时设定spark.default.parallelism)。

    然后,调用isElegiblePartitioner()方法,判断分区数最大的那个Partitioner是否“合格”,判断逻辑是其分区数与所有上游RDD中最大分区数之差小于一个数量级。如果通过检查,并且默认分区数比它小,就采用分区数最大的那个Partitioner作为分区逻辑,否则用默认分区数构造一个新的HashPartitioner并返回。

    Partitioner在Spark Core中的实现类主要有两个:基于散列的HashPartitioner和基于采样范围的RangePartitioner,前者是默认实现。下面我们就以HashPartitioner为例来看看其具体实现(RangePartitioner太麻烦了)。

    HashPartitioner

    代码#19.7 - o.a.s.HashPartitioner类

    class HashPartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    
      def numPartitions: Int = partitions
    
      def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
      }
    
      override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner =>
          h.numPartitions == numPartitions
        case _ =>
          false
      }
    
      override def hashCode: Int = numPartitions
    }
    

    可见,在重写的getPartition()方法中,会取得键的hashCode值,对分区数numPartitions取模,返回其绝对值,这样就确保映射到的分区落在[0,numPartitions - 1]的区间内。为了判断两个HashPartitioner是否相等,也必须同时重写其equals()和hashCode()方法,判断标准自然就只有分区数了。

    我们也可以很容易地想到,用户通过自己继承Partitioner类,能够自定义分区逻辑。下面就是一个简单的示例,它通过Key的长度来分区。由于它不属于Spark源码,就不编号了。

    class KeyLengthPartitioner(partitions: Int) extends Partitioner {
      def numPartitions: Int = partitions
    
      def getPartition(key: Any): Int = {
        return key.asInstanceOf[String].length() & (partitions - 1)
      }
    }
    

    总结

    本文分别以Spark Core中的Dependency与Partitioner两个抽象类为起点,比较详细地讲解了Spark中RDD依赖关系与分区逻辑的具体设计。依赖与分区是RDD五要素中最重要的两个点,在今后的源码阅读过程中,会经常用到它们。

    相关文章

      网友评论

        本文标题:Spark Core源码精读计划#19:RDD的依赖与分区逻辑

        本文链接:https://www.haomeiwen.com/subject/ijyefctx.html