Spark的编程核心RDD的实现详解

作者: 叫我不矜持 | 来源:发表于2019-06-11 14:41 被阅读0次

一.什么是RDD

RDD是弹性分布式数据集(Resilient Distributed Dataset),RDD是只读的、 分区记录的集合。 RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。 这些确定性操作称为转换, 如map、 filter、 groupBy、 join。RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage) , 因此在RDD部分分区数据丢失的时候可以从物理存储的数据计算出相应的RDD分区。

每个RDD有5个主要的属性:
1.分区列表(A list of partitions)
对于RDD来说, 每个分区都会被一个计算任务处理, 并决定并行计算的粒度。 用户可以在创建RDD时指定RDD的分区个数, 如果没有指定, 那么就会采用默认值。 默认值就是程序所分配到的CPU Core的数目。在分区存储的计算模型中,每个分配的存储是由BlockManager实现的。 每个分区都会被逻辑映射成BlockManager的一个Block, 而这个Block会被一个Task负责计算。

2.计算每个分片的函数
Spark中RDD的计算是以分片为单位的, 每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合, 不需要保存每次计算的结果。

3.RDD之间的依赖关系
RDD的每次转换都会生成一个新的RDD, 所以RDD之间就会形成类似于流水线一样的前后依赖关系。 在部分分区数据丢失时, Spark可以通过这个依赖关系重新计算丢失的分区数据, 而不是对RDD的所有分区进行重新计算。

4.分区函数Partitioner
当前Spark中实现了两种类型的分片函数, 一个是基于哈希的HashPartitioner, 另外一个是基于范围的RangePartitioner。 只有对于key-value的RDD, 才会有Partitioner, 非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量, 也决定了parent RDD Shuffle输出时的分片数量。

5.每个Partition的优先位置(preferred location)
一个列表, 存储存取每个Partition的优先位置(preferred location) 。 对于一个HDFS文件来说, 这个列表保存的就是每个Partition所在的块的位置。 按照“计算向数据移动”的理念, Spark在进行任务调度的时候, 会尽可能地将计算任务分配到
其所要处理数据块的存储位置。

二.如何创建RDD

可通过以下几种方式创建RDD:

  • 通过读取外部数据集 (本地文件系统/HDFS/Cassandra/HBase/...)
  • 通过一个已经存在的Scala集合创建(List/Set/...)
  • 通过已有的RDD生成新的RDD

三.Spark对RDD操作方式

Spark对RDD的操作分两种,即转换操作(Transformation)和动作操作(Action)。

1.转换操作
不触发实际计算,从现有的数据集创建一个新的数据集,返回一个新的RDD,例如对数据的匹配操作map和过滤操作filter,惰性求值。

2.动作操作
会触发实际计算,即在数据集上进行计算后,会向Driver程序驱动器返回结果或将结果写到外部系统。

如何区别两种操作?
看返回值类型,返回RDD类型的为转换操作,返回其他数据类型的是行动操作。

惰性求值?
RDD中的所有转换都是惰性的, 也就是说, 它们并不会直接计算结果。 相反的, 它们只是记住这些应用到最原始数据集上的转换操作。 只有当发生动作操作(Action)返回结果给Driver的动作时, 这些转换才会真正运行。 这个设计让Spark更加有效率地运行。

为何会有惰性求值?
如果每经过一次转换操作都触发计算,将会有系统负担,而惰性求值会将多个转换操作合并到一起,抵消不必要的步骤后,在最后必要的时才进行运算,获得性能的提升同时又减轻系统运算负担。

Transformation操作

函数名 目的 示例 结果
map(f) 将函数应用于每一个元素中,返回值构成新的RDD rdd.map(x=>x+1) {2,3,4,4}
flatMap(f) 将函数应用于每一个元素中,并把元素中迭代器内所有内容一并生成新的RDD,常用于切分单词 rdd.flatMap(x=>x.to(3)) {1,2,3,,2,3,3,3}
filter(f) 过滤元素 rdd.filter(x=>x!=1) {2,3,3}
distinct() 元素去重 rdd.distinct() {1,2,3}
sample( withReplacement, fraction , [seed] ) 元素采样,以及是否需要替换 rdd.sample(false,0.5) 不确定值,不确定数目
union(rdd) 合并两个RDD所有元素(不去重) rdd1.union(rdd2) {1,2,3,3,4,5}
intersection(rdd) 求两个RDD的交集 rdd1.intersection(rdd2) {3}
substract(rdd) 移除在RDD2中存在的RDD1元素 rdd1.substract(rdd2) {1,2}
cartesian(rdd) 求两个RDD的笛卡尔积 rdd1.cartesian(rdd2) {(1,3),(1,4),(1,5)...(3,5)}

Action操作

函数名 目的 示例 结果
collect() 收集并返回RDD中所有元素 rdd.collect() {1,2,3,3}
count() RDD中元素的个数 rdd.count() 4
countByValue() 各元素出现的个数 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 从RDD中返回num个元素 rdd.take(2) {1,2}
top(num) 返回最前面的num个元素 rdd.take(2) {3,3}
takeOrdered(num,[ordering]) 按提供的顺序返回前num个元素 rdd.takeOrdered(2,[myOrdering]) {3,3}
takeSample(withReplacement, num ,[seed]) 返回任意元素 takeSample(false,1) 不确定值
reduce(f) 并行整合RDD中所有元素,返回一个同一类型元素 rdd.reduce((x,y) => x+y ) 9
fold(zeroValue)(f) 与reduce一样,不过需要提供初始值 rdd.fold(0)((x,y) => x+y ) 9
aggregate(zeroValue)(seqOp , combOp) 与reduce相似,不过返回不同类型的元素 rdd. aggregate(( 0, 0)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) {9,4}
foreach(f) 给每个元素使用给定的函数,结果不需发回本地 rdd.foreach(f)

四.RDD的持久化(缓存)

Spark速度非常快的原因之一, 就是在不同操作中在内存中持久化(或缓存) 一个数据集。 当持久化一个RDD后, 每一个节点
都将把计算的分片结果保存在内存中, 并在对此数据集(或者衍生出的数据集) 进行的其他动作(action) 中重用。 这使得后
续的动作变得更加迅速(通常快10倍) 。 RDD相关的持久化和缓存, 是Spark最重要的特征之一。 可以说, 缓存是Spark构建迭代式算法和快速交互式查询的关键。

出于不同目的和场景需求,我们可选择的持久化级别有:

级别 使用空间 CPU时间 是否在内存中 是否在磁盘上
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 部分 部分
MEMORY_AND_DISK_SER 部分 部分
DISK_ONLY

我们可以通过persist() 或cache() 方法可以标记一个要被持久化的RDD, 一旦首次被触发, 该RDD将会被保留在计算节点的内存中并重用。

persist的源码实现如下

 /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet. Local checkpointing is an exception.
   */
  def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

代码示例

scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val rdd1 = rdd.map(_+5)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:26

scala> rdd1 = rdd1.persist(StorageLevel.MEMORY_ONLY)
res2: rdd1.type = MapPartitionsRDD[6] at map at <console>:26

scala> rdd1.reduce(_+_)
res3: Int = 40

scala> rdd1.count()
res4: Long = 5

scala> rdd1.first()
res5: Int = 6

scala> rdd1.unpersist()
res7: rdd1.type = MapPartitionsRDD[6] at map at <console>:26

如果要缓存的数据太多,内存放不下,Spark会自动使用LRU(最近最小使用)的缓存策略把最老的分区从内存中移除。同时缓存有可能丢失,RDD的缓存的容错机制保证了即使缓存丢失也能保证计算的正确执行。 通过基于RDD的一系列的转换, 丢失的数据会被重算。 RDD的各个Partition是相对独立的, 因此只需要计算丢失的部分即可, 并不需要重算全部Partition。

最后,可调用rdd.unpersist()方法手动移除RDD缓存。

五.RDD之间的依赖关系

Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系, RDD之间的关系可以从两个维度来理解:

  • RDD是从哪些RDD转换而来, 也就是RDD的parent RDD是什么
  • RDD中的Partition,依赖于parent RDD中 的哪些Partition

根据依赖于parentRDD的Partitions的不同情况, Spark将这种依赖分为两种, 一种是宽依赖, 一种是窄依赖。

  • 窄依赖指的是子RDD依赖于父RDD中固定的Partitions
  • 宽依赖指的是子RDD对父RDD中的所有partition都有依赖,或者说依赖于父RDD的数量不能明确
窄依赖和宽依赖

从上面的图中我们可以理解下这两种依赖关系之间的区别

对于map和filter形式的转换来说, 它们只是将Partition的数据根据转换的规则进行转化, 并不涉及其他的处理, 可以简单地认为
只是将数据从一个形式转换到另一个形式。

对于union, 只是将多个RDD合并成一个,parent RDD的Partition不会有任何的变化, 可以认为只是把parent RDD的Partition 简单进行复制与合并。

对于join, 如果每个Partition仅仅和已知的、 特定的Partition进行join, 那么这个依赖关系也是窄依赖。 对于这种有规则的数据的join, 并不会引入昂贵的Shuffle。 对于窄依赖, 由于RDD每个Partition依赖固定数量的parent RDD的Partition, 因此可以通过一个计算任务来处理这些Partition, 并且这些Partition相互独立, 这些计算任务也就可以并行执行了。

对于groupByKey, 子RDD的所有Partition会依赖于parent RDD的所有Partition, 子RDD的Partition是parent RDD的所有Partition Shuffle的结果, 因此这两个RDD是不能通过一个计算任务来完成的。 同样, 对于需要parent RDD的所有Partition进行join的转换, 也是需要Shuffle, 这类join的依赖就是宽依赖而不是前面提到的窄依赖了。

五.RDD依赖关系的具体代码实现

RDD的依赖关系继承图

Spark中对应窄依赖的的抽象类为NarrowDependency,具体实现有两种。

一种是一对一的依赖, 即OneToOneDependency:

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
     override def getParents(partitionId: Int) = List(partitionId)
}

通过OneToOneDependency的源码中的getParents的实现不难看出, RDD仅仅依赖于parent RDD相同ID的Partition。

还有一个是范围的依赖, 即RangeDependency:
它仅仅被UnionRDD使用,UnionRDD是把多个RDD合成一个RDD,这些RDD是被拼接而成, 即每个parent RDD的Partition的相对顺序不会变, 只不过每个parent RDD在UnionRDD中的Partition的起始位置不同。 因此它的getPartents如下:

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    // inStart是parent RDD中Partition的起始位置
    // outStart是在UnionRDD中的起始位置
    // length就是parent RDD中Partition的数量。
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

宽依赖的实现只有一种,ShuffleDependency的实现相对前面几种较为复杂,会在后续的文章中详细讲解...

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

六.区分两种依赖的作用

1.划分 Stage
根据RDD之间的依赖关系将DAG图划分为不同的阶段Stage( Stage之间的依赖关系可以认为就是Lineage)。

对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中。

而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算,因此宽依赖要单独划分一个Stage。

Stage 之间做 shuffle,Stage 之内做 pipeline(流水线)。方便stage内优化。

2.解决数据容错的高效性

假如某个节点出故障了,窄依赖只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有父RDD都要进行重新计算。

七.RDD的检查点(checkpoint)机制

RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。但是,如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。

为了避免缓存丢失重新计算带来的开销,Spark又引入了检查点(checkpoint)机制。

缓存是在计算结束后,直接将计算结果写入不同的介质。而检查点不同,它是在计算完成后,为数据创建一个目录,并且将计算结果写入新创建的目录,之后重新建立一个Job来计算。接着创建一个CheckpointRDD,RDD变成CheckPointRDD后,前边的所有RDD依赖都会被移除。这就意味着RDD的转换的计算链(compute chain) 等信息都被清除。

一般推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。

设置检查点:

//设置检查点目录 存储在HDFS上,并使用checkpoint设置检查点,该操作属于懒加载
sc.setCheckpointDir("hdfs://xxxx:9000/checkpoint/")
rdd.checkpoint()

相关文章

网友评论

    本文标题:Spark的编程核心RDD的实现详解

    本文链接:https://www.haomeiwen.com/subject/rpmaxctx.html