美文网首页Spark源码精读分析计划SPARK
Spark Core源码精读计划#18:与RDD的重逢

Spark Core源码精读计划#18:与RDD的重逢

作者: LittleMagic | 来源:发表于2019-06-12 21:55 被阅读5次

    目录

    前言

    在前面的17篇文章中,我们对以SparkContext和SparkEnv为中心展开的Spark Core底层支撑组件有了比较深入的理解,当然有一些重要的组件,会随着整个系列的进行详细讲解到。按照计划,我们本应开始看Spark的存储系统结构,但是不着急,我们先花2~3篇文章的时间来重新认识一下我们的老朋友——RDD。它不仅与存储息息相关,也是Spark任务调度和计算的主要对象,现在打好基础是非常有益的。

    RDD的正式名称为弹性分布式数据集(Resilient Distributed Dataset),Spark官方文档中对它的定义是:可以并行操作的、容错的元素集合。实际上,除了可并行操作、容错两点之外,RDD还具有一些其他相关的特点,如:

    • 不可变性(只能生成或转换,不能直接修改,容错时可以重算);
    • 分区性(内部数据会划分为Partition,是分布式并行的基础);
    • 名称中的“弹性”(可以灵活利用内存和外存,Spark设计思想的体现)。

    RDD在Spark Core源码中的基础是o.a.s.rdd.RDD这个抽象类,本文就来对它做一些基础的了解。

    RDD抽象类概述

    构造方法与成员属性

    代码#18.1 - o.a.s.rdd.RDD类的构造方法与成员属性

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {
      if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
        logWarning("Spark does not support nested RDDs (see SPARK-5063)")
      }
    
      def this(@transient oneParent: RDD[_]) =
        this(oneParent.context, List(new OneToOneDependency(oneParent)))
    
      @transient val partitioner: Option[Partitioner] = None
    
      val id: Int = sc.newRddId()
      @transient var name: String = _
    
      private var storageLevel: StorageLevel = StorageLevel.NONE
      private var dependencies_ : Seq[Dependency[_]] = _
      @transient private var partitions_ : Array[Partition] = _
    
      @transient private[spark] val creationSite = sc.getCallSite()
    
      @transient private[spark] val scope: Option[RDDOperationScope] = {
        Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
      }
    
      private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
      private val checkpointAllMarkedAncestors =
        Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).exists(_.toBoolean)
      @transient private var doCheckpointCalled = false
    
      // ...
    }
    

    RDD类接收两个主构造方法参数:

    • _sc:即SparkContext实例,它不会被序列化。
    • deps:Dependency的序列,它也不会被序列化。所谓Dependency,就是指当前RDD对其他RDD的依赖关系,后面会讲到Dependency相关的知识。

    在构造方法中会检查RDD是否被嵌套了,Spark不支持RDD嵌套,会打印警告信息。另外,还有一个辅助构造方法,它只接收一个RDD oneParent作为参数,此时会使用oneParent对应的SparkContext和一对一依赖OneToOneDependency来构造RDD。

    RDD类的主要成员属性如下。

    • partitioner:键值型RDD(即RDD[(K,V)])的分区逻辑,是Partitioner的子类,后面也会讲到与Partitioner相关的细节。
    • id:该RDD的ID,可以调用SparkContext.newRddId()方法产生。
    • name:RDD的可读名称。
    • storageLevel:RDD的持久化等级,一共有12个等级。它由StorageLevel类及其伴生对象定义。
    • dependencies_:RDD的依赖,与构造参数deps相同,但是可以序列化,并且会考虑当前RDD是否被Checkpoint。
    • partitions_:包含RDD的所有分区的数组。
    • creationSite:创建这个RDD的调用代码位置,通过SparkContext.getCallSite()方法获得。关于CallSite的简介可以参见文章#3。
    • scope:RDD的操作域,由RDDOperationScope结构来描述。所谓操作域,其实就是一个确定的产生RDD的代码块,该代码块中的所有RDD就是在相同的操作域中。
    • checkpointData:保存的RDD检查点数据,方便出错时重算。
    • checkpointAllMarkedAncestors:布尔值,表示是否要对当前RDD的所有标记需要Checkpoint的父RDD保存检查点。
    • doCheckpointCalled:布尔值,表示是否已经保存过该RDD的检查点,防止重复保存。

    需要RDD子类实现的方法

    RDD类中醒目地标出了4个抽象方法,它们都很重要,RDD的子类必须要提供具体实现,如下所示。

    代码#18.2 - o.a.s.rdd.RDD类中的抽象方法

      @DeveloperApi
      def compute(split: Partition, context: TaskContext): Iterator[T]
    
      protected def getPartitions: Array[Partition]
    
      protected def getDependencies: Seq[Dependency[_]] = deps
    
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    
    • compute():计算RDD的一个分区split内的数据,返回对应数据类型的迭代器。
    • getPartitions():取得RDD所有分区的数组。
    • getDependencies():取得RDD的所有依赖,默认返回的就是deps。
    • getPreferredLocations():取得计算分区split的偏好位置(如HDFS上块的位置)数组,这个是可选的。

    RDD类中对Partition、Dependency和Preferred Location都提供了简单的Getter方法,它们都会先检查当前RDD的检查点,然后调用上面的三个抽象方法,其代码如下所示。

    代码#18.3 - o.a.s.rdd.RDD.partitions()/dependencies()/preferredLocations()方法

      final def dependencies: Seq[Dependency[_]] = {
        checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
          if (dependencies_ == null) {
            dependencies_ = getDependencies
          }
          dependencies_
        }
      }
    
      final def partitions: Array[Partition] = {
        checkpointRDD.map(_.partitions).getOrElse {
          if (partitions_ == null) {
            partitions_ = getPartitions
            partitions_.zipWithIndex.foreach { case (partition, index) =>
              require(partition.index == index,
                s"partitions($index).partition == ${partition.index}, but it should equal $index")
            }
          }
          partitions_
        }
      }
    
      final def preferredLocations(split: Partition): Seq[String] = {
        checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
          getPreferredLocations(split)
        }
      }
    

    RDD的五要素

    通过以上的介绍,我们就可以归纳出RDD的五个组成要素了。这些内容在RDD类的ScalaDoc中其实已经有所体现:

    • 分区列表 [A list of partitions];
    • 计算每个分区的函数 [A function for computing each split];
    • 对其他RDD的依赖的列表 [A list of dependencies on other RDDs];
    • 可选的对键值型RDD的分区逻辑 [Optionally, a Partitioner for key-value RDDs];
    • 可选的计算分区的位置偏好列表 [Optionally, a list of preferred locations to compute each split on]。

    RDD继承体系与算子概述

    RDD的子类

    RDD拥有众多的子类,这些子类都实现了上面的4个方法。大多数对RDD的操作方法(也就是算子)返回的结果都是RDD子类的实例。主要的RDD子类如下图所示,没有箭头,看官将就一下吧。

    图#18.1 - RDD继承体系

    由于我们之后还有很多事情要做,不可能将RDD的所有细节都分析一遍,这里暂时就不展开讲每个RDD子类的实现了。

    我们已经知道,RDD的算子有两类,即转换(Transformation)算子与动作(Action)算子,这是老生常谈了。

    转换算子

    转换算子用于对一个RDD施加一系列逻辑,使之变成另一个RDD。在文章#0的WordCount程序中出现的flatMap()、map()、reduceByKey()都是转换算子。作为示例,我们来看看日常工作中极其常见、并且效率较高的mapPartitions()算子。

    代码#18.4 - o.a.s.rdd.RDD.mapPartitions()方法

      def mapPartitions[U: ClassTag](
          f: Iterator[T] => Iterator[U],
          preservesPartitioning: Boolean = false): RDD[U] = withScope {
        val cleanedF = sc.clean(f)
        new MapPartitionsRDD(
          this,
          (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
          preservesPartitioning)
      }
    

    这个算子对RDD[T]每个分区的迭代器施加函数f的转换逻辑,返回一个MapPartitionsRDD[U],参数preservesPartitioning表示是否保留父RDD的分区。MapPartitionsRDD的具体实现如下。可以发现,getPartitions()和partitioner都直接复用了父RDD的,而compute()方法则是直接应用函数f的逻辑。

    代码#18.5 - o.a.s.rdd.MapPartitionsRDD类

    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        var prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U], 
        preservesPartitioning: Boolean = false,
        isOrderSensitive: Boolean = false)
      extends RDD[U](prev) {
      override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
    
      override def getPartitions: Array[Partition] = firstParent[T].partitions
    
      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))
    
      override def clearDependencies() {
        super.clearDependencies()
        prev = null
      }
    
      // ...
    }
    

    再举个例子,coalesce()算子可以将一个RDD重新分区,也是常用的转换算子之一。它最终会产生CoalescedRDD,如果中途发生Shuffle的话,也有可能会产生ShuffledRDD。关于它的实现,之前在一篇小文《解决Spark Streaming写入HDFS的小文件问题》中已经讲过,不再赘述。

    动作算子

    动作算子用于触发Job的提交,真正执行RDD转换逻辑的计算,并返回其处理结果。以代码#0.1中用到的collect()以及常用的foreach()为例。

    代码#18.6 - o.a.s.rdd.RDD.collect()/foreach()方法

      def collect(): Array[T] = withScope {
        val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
        Array.concat(results: _*)
      }
    
      def foreach(f: T => Unit): Unit = withScope {
        val cleanF = sc.clean(f)
        sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
      }
    

    代码很简单,需要注意,它们都调用了SparkContext.runJob()方法来提交一个Job。这个方法比较重要,待到之后研究Spark Core调度逻辑时,它可以称得上是一切的起点。

    总结

    本文通过阅读与RDD类相关的一些基础源码,复习了RDD的基本知识,另外又对RDD的子类与算子有了大致的了解。下一篇文章会专注于两个要点:Dependency与Partitioner,即RDD的依赖与分区逻辑。

    相关文章

      网友评论

        本文标题:Spark Core源码精读计划#18:与RDD的重逢

        本文链接:https://www.haomeiwen.com/subject/anirfctx.html