美文网首页
Spark RDD的基本特征以及源码解析

Spark RDD的基本特征以及源码解析

作者: 幸福的小棉袄 | 来源:发表于2018-02-27 13:45 被阅读0次

    RDD是什么

    RDD(Resilient Distributed Datasets)可扩展的弹性分布式数据集,rdd是spark最基本的数据抽象,是整个spark生态的基石。rdd表示一个只读、分区且不变的数据集合。一个rdd可以有多个分区,所谓的分区就是表示可以把一个rdd上的数据划分成不同的几个相互独立的子集。显而易见的好处就是可以在rdd上并行化。

    RDD的属性

    因为RDD是一个抽象类,具体位置在org.apache.spark.rdd包下,同时我们可以发现该包下存在多种rdd。比如从hdfs上生成HadoopRDD;从Mysql上生成的JdbcRDD;以及从一个集合生成的ParallelCollectionRDD。总之,rdd具有多样性。下面将着重介绍rdd的六个重要属性:

    分区性

    所谓分区性,指得是rdd上的数据可以可以划分为几个独立的子集。显而易见带来的好处是并行化,使得基于rdd模型的执行时间明显缩短。比如存在两个rdd,分别是rdd1,rdd2,第一个rdd有10个分区,第二个rdd只有一个分区,假设两个rdd上的数据相同且计算资源充足。理论上第一个rdd执行的时间明显小于第二个rdd,因为第一个rdd可以10个分区并行执行。
    接下来,我们通过源码级别的看一下分区性在类RDD中的体现。在org.apache.spark.rdd.RDD类中,protected def getPartitions: Array[Partition] 表示获取rdd的分区,通过这个函数就可以把一个rdd上的数据进行分区并返回一个数组。由于Partition是一个trait,下面是Partition的源码

    package org.apache.spark
    
    /**
     * An identifier for a partition in an RDD.
     */
    trait Partition extends Serializable {
      /**
       * Get the partition's index within its parent RDD
       */
      def index: Int
    
      // A better default implementation of HashCode
      override def hashCode(): Int = index
    
      override def equals(other: Any): Boolean = super.equals(other)
    }
    
    

    为了阐述问题,我们使用HadoopRDD作为demon,HadoopRDD中包含了三个类,分别是
    HadoopPartition
    HadoopRDD
    HadoopRDD(object)
    其中HadoopPartition继承了Partition,源码如下

    private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
      extends Partition {
    
      val inputSplit = new SerializableWritable[InputSplit](s)
    
      override def hashCode(): Int = 31 * (31 + rddId) + index
    
      override def equals(other: Any): Boolean = super.equals(other)
    
      /**
       * Get any environment variables that should be added to the users environment when running pipes
       * @return a Map with the environment variables and corresponding values, it could be empty
       */
      def getPipeEnvVars(): Map[String, String] = {
        val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) {
          val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit]
          // map_input_file is deprecated in favor of mapreduce_map_input_file but set both
          // since it's not removed yet
          Map("map_input_file" -> is.getPath().toString(),
            "mapreduce_map_input_file" -> is.getPath().toString())
        } else {
          Map()
        }
        envVars
      }
    }
    
    

    HadoopRDD在构建分区时,主要是借助hadoop的InputSplit类构建的,也就是说一个InputSplit是一个分区;HadoopRDD的本地性也是借助了InputSplit这个类的实现的,这个类会记录相应数据与ip之间的映射
    接下来,我们看一下HadoopRDD中的getPartition方法的实现。

    override def getPartitions: Array[Partition] = {
        val jobConf = getJobConf()
        // add the credentials here as this can be called before SparkContext initialized
        SparkHadoopUtil.get.addCredentials(jobConf)
        val inputFormat = getInputFormat(jobConf)
        val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
        val array = new Array[Partition](inputSplits.size)
        for (i <- 0 until inputSplits.size) {
          array(i) = new HadoopPartition(id, i, inputSplits(i))
        }
        array
      }
    

    可知,HadoopRDD在处理hdfs相关的数据时,主要是借助了Hadoop中已有的InputFormat、InputSplit类。数据在hdfs上存储的时候,每个split的大小已经确定了,所以HadoopRDD在划分分区的时候直接使用每个split上的数据作为一个独立的partition

    函数性

    该属性我们可以理解为每个都会有一个函数。换句话我们的逻辑就是由这些函数组成的;在这里我们强调一下,由于RDD是天生不可变性,父RDD正是通过该属性生成子RDD。接下来展示的将是该属性在源码级别的体现(仍然使用HadoopRdd作为说明),函数性主要是指RDD类中的
    def compute(split: Partition, context: TaskContext): Iterator[T]
    该函数输入是Partition,输出是一个迭代器。下面是HadoopRdd类中对该函数的具体实现(只展示了部分核心代码)

    override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
        val iter = new NextIterator[(K, V)] {
           Partition转化为HadoopPartition
          private val split = theSplit.asInstanceOf[HadoopPartition]
          logInfo("Input split: " + split.inputSplit)
          private val jobConf = getJobConf()
    
          private val inputMetrics = context.taskMetrics().inputMetrics
          private val existingBytesRead = inputMetrics.bytesRead
    
          // Sets the thread local variable for the file's name
          split.inputSplit.value match {
            case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
            case _ => InputFileNameHolder.unsetInputFileName()
          }
    
          // Find a function that will return the FileSystem bytes read by this thread. Do this before
          // creating RecordReader, because RecordReader's constructor might read some bytes
          private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
            case _: FileSplit | _: CombineFileSplit =>
              SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
            case _ => None
          }
    
          // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
          // If we do a coalesce, however, we are likely to compute multiple partitions in the same
          // task and in the same thread, in which case we need to avoid override values written by
          // previous partitions (SPARK-13071).
          private def updateBytesRead(): Unit = {
            getBytesReadCallback.foreach { getBytesRead =>
              inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
            }
          }
    
         通过InputSplit获取一个Reader
          private var reader: RecordReader[K, V] = null
          private val inputFormat = getInputFormat(jobConf)
          HadoopRDD.addLocalConfiguration(
            new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
            context.stageId, theSplit.index, context.attemptNumber, jobConf)
    
          reader =
            try {
              inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
            } catch {
              case e: IOException if ignoreCorruptFiles =>
                logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
                finished = true
                null
            }
          // Register an on-task-completion callback to close the input stream.
          context.addTaskCompletionListener{ context => closeIfNeeded() }
          private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
          private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
          
           重写Iterator中的getNext方法
          override def getNext(): (K, V) = {
            try {
              finished = !reader.next(key, value)
            } catch {
              case e: IOException if ignoreCorruptFiles =>
                logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
                finished = true
            }
            if (!finished) {
              inputMetrics.incRecordsRead(1)
            }
            if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
              updateBytesRead()
            }
            (key, value)
          }
        }
        new InterruptibleIterator[(K, V)](context, iter)
      }
    

    通过上面的源码,可以得出两个点:1.Partition转化为HadoopPartition,并且通过InputSplit创建RecordReader;2.重写Iterator中的两个方法,getNext和close,通过创建的reader读取分片的数据。

    依懒性

    首先介绍一下rdd依赖的概念:rdds通过transformation算子之间的操作,转化得到新的rdd,新的rdd包含了从其它rdd衍生的必需信息,我们称之为依赖。众所周知,rdd天生具有不变性,每次Transformation都会产生一个新的rdd,那么在这里会有一个问题。即为什么RDD设计为具有依赖性以及他的好处?我认为主要体现为以下两点:
    1.生具有容错性。很多书籍上都会介绍该特性,而spark正是利用了这个特性用做数据容灾。当一个rdd数据丢失时,由于spark保存了rdd之间的依赖关系,那么就会利用该性能重新生成丢失的rdd。
    2.调度性能更优。窄依赖可以以pipeline形式执行多条命令,同时多个之间可以并行化执行。
    rdd之间的依赖性主要分为以下两大类,分别是
    1.窄依赖:每一个parent rdd的partition最多被子rdd的一个partition使用。常见的窄依赖操作算子有map,filter,union等
    2.宽依赖:多个子rdd的partition会使用同一个parent的partition。常见的宽依赖算子有groupByKey,reduceByKey,join,distinct,repartition等等
    在这里很多人会考虑一个问题,“为什么会存在宽依赖和窄依赖?前面已经介绍了窄依赖可以使用pipeline,为什么不全部使用窄依赖?”。为了解释这个问题,我们以mr作为例子。很多人都认为spark性能优于hadoop的原因是内存计算,不需要落盘。其实不然,spark优于hadoop,不仅仅是内存计算,他的作业调度也要优于hadoop,同时spark也是需要落盘的。说到这里,我们也明白了,归根结底是因为spark计算的过程中也需要落盘,这也就是rdd宽依赖存在的根本了。
    下面将通过源码来介绍rdd的依赖关系。宽依赖和窄依赖都继承了Dependency类。

    /**
     * :: DeveloperApi ::
     * Base class for dependencies.
     */
    @DeveloperApi
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }
    

    其中rdd就是依赖parent rdd。并且窄依赖又分为两种,分别是一对一(OneToOneDependency)和多对一(RangeDependency)两种依赖。这两种依赖的基类是NarrowDependency

    @DeveloperApi
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      /**
       * Get the parent partitions for a child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
      override def rdd: RDD[T] = _rdd
    }
    

    其中一对一依赖的类是OneToOneDependency,实现源码如下

    /**
     * :: DeveloperApi ::
     * Represents a one-to-one dependency between partitions of the parent and child RDDs.
     */
    @DeveloperApi
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
    

    这个类主要重写了getParents方法,输入的参数是子partition的id,我们知道该id是唯一的,返回的是父partitionid
    而多对一的依赖的实现类是RangeDependency,源码如下

    /**
     * :: DeveloperApi ::
     * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
     * @param rdd the parent RDD
     * @param inStart the start of the range in the parent RDD
     * @param outStart the start of the range in the child RDD
     * @param length the length of the range
     */
    @DeveloperApi
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
      extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = {
        if (partitionId >= outStart && partitionId < outStart + length) {
          List(partitionId - outStart + inStart)
        } else {
          Nil
        }
      }
    }
    

    这种依赖主要出现在rdd的union过程中,多个rdd的partition经过union,变成一个partition。
    不同于窄依赖,宽依赖的实现只有一种形式,即ShuffleDependency。子rdd的partition依赖于父parent rdd的所有partition,所以该过程使用到了shuffle。

    /**
     * :: DeveloperApi ::
     * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
     * the RDD is transient since we don't need it on the executor side.
     *
     * @param _rdd the parent RDD
     * @param partitioner partitioner used to partition the shuffle output
     * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
     *                   explicitly then the default serializer, as specified by `spark.serializer`
     *                   config option, will be used.
     * @param keyOrdering key ordering for RDD's shuffles
     * @param aggregator map/reduce-side aggregator for RDD's shuffle
     * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
     */
    @DeveloperApi
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
    
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
      private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
      private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
      // Note: It's possible that the combiner class tag is null, if the combineByKey
      // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
      private[spark] val combinerClassName: Option[String] =
        Option(reflect.classTag[C]).map(_.runtimeClass.getName)
    
      val shuffleId: Int = _rdd.context.newShuffleId()
    
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, _rdd.partitions.length, this)
    
      _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }
    

    在这里我们就不详细介绍shuffle了,后续我将会使用一个专题去介绍spark的shuffle过程。
    下面我们看一下rdd依赖在抽象类RDD中的实现,主要是RDD类中定义了一个

    protected def getDependencies: Seq[Dependency[_]] = deps
    

    其中在抽象类RDD中,默认的是一对一窄依赖

    /** Construct an RDD with just a one-to-one dependency on one parent */
      def this(@transient oneParent: RDD[_]) =
        this(oneParent.context, List(new OneToOneDependency(oneParent)))
    

    前面使用的HadoopRDD中默认的也是一对一窄依赖,在这里我们看一下ShuffledRDD,该rdd重写了getDependencies方法

      override def getDependencies: Seq[Dependency[_]] = {
        val serializer = userSpecifiedSerializer.getOrElse {
          val serializerManager = SparkEnv.get.serializerManager
          if (mapSideCombine) {
            serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
          } else {
            serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
          }
        }
        List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
      }
    

    在这里强调一下,所有的rdd创建都是在SparkContext中实现的,SparkContext中使用不同的算子创建不同的rdd,在不同的rdd中会存在不同的依赖,这也进一步体现了rdd的多样性

    分区器(Optional)

    该特性是可选项,主要针对key-value格式的rdd,需要提供一个partitioner。在抽象类RDD中,默认返回为None。

    /** Optionally overridden by subclasses to specify how they are partitioned. */
      @transient val partitioner: Option[Partitioner] = None
    

    而在ShuffleRdd中,对该变量进行了实现。

    本地性(Optional)

    本地性是一个可选项,有些子rdd中并没有实现,比如JdbcRdd,但是绝大部分的rdd都实现了。该特性在spark中使用十分普遍,极大地提高了spark执行的性能。比如我们知道spark的Data Locality分为以下五大类。
    1.PROCESS_LOCAL:执行的task和数据在同一个jvm中,即同一个executor中。
    2.NODE_LOCAL:task和数据在同一个节点的不同jvm中,即不同的executor中。
    3.NO_PREF:数据从哪儿访问都一样快。
    4.RACK_LOCAL:数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢。
    5.ANY:数据在非同一机架的网络上,速度最慢。
    在这里我们思考一个问题,即spark的data locality是如何实现的?在这里我们直接给出答案,即通过spark的本地性实现的,也就是rdd记录了数据存放的最近位置。在抽象类RDD中的源码如下

    /**
       * Optionally overridden by subclasses to specify placement preferences.
       */
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    

    由于RDD没有实现该方法,在这里我们看一下RDD的子类HadoopRdd对其的实现

    override def getPreferredLocations(split: Partition): Seq[String] = {
        val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
        val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
          case Some(c) =>
            try {
              val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
              val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
              HadoopRDD.convertSplitLocationInfo(infos)
            } catch {
              case e: Exception =>
                logDebug("Failed to use InputSplitWithLocations.", e)
                None
            }
          case None => None
        }
        locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
      }
    

    主要是借助了hadoop中的inputSplit类,该类记录了每个split所在的host信息

    缓存

    在这里我把缓存也看做rdd的一个属性。为什么我们总说spark在迭代计算明显优于hadoop,其中rdd的缓存特性就是其中的决定性因素。在spark中,我们可以把需要重复使用的数据进行缓存,这样可以极大地提高执行性能。主要是利用了RDD的persist函数,源码如下:

      /**
       * Mark this RDD for persisting using the specified level.
       *
       * @param newLevel the target storage level
       * @param allowOverride whether to override any existing level with the new one
       */
      private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
        // TODO: Handle changes of StorageLevel
        if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
          throw new UnsupportedOperationException(
            "Cannot change storage level of an RDD after it was already assigned a level")
        }
        // If this is the first time this RDD is marked for persisting, register it
        // with the SparkContext for cleanups and accounting. Do this only once.
        if (storageLevel == StorageLevel.NONE) {
          sc.cleaner.foreach(_.registerRDDForCleanup(this))
          sc.persistRDD(this)
        }
        storageLevel = newLevel
        this
      }
    

    上面的程序首先判断该rdd是否已经缓存过了,如果没有缓存,直接调用了SparkContext类中的persistRDD方法,该方法使用了一个Map结构,其中key用rdd的id表示,value表示rdd的数据。

    private[spark] def persistRDD(rdd: RDD[_]) {
        persistentRdds(rdd.id) = rdd
      }
    
    // Keeps track of all persisted RDDs
      private[spark] val persistentRdds = {
        val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
        map.asScala
      }
    

    下一章我将重点介绍shuffle在spark中的实现。

    相关文章

      网友评论

          本文标题:Spark RDD的基本特征以及源码解析

          本文链接:https://www.haomeiwen.com/subject/ifgqxftx.html