[spark] RDD解析

作者: BIGUFO | 来源:发表于2017-10-03 13:00 被阅读127次

    RDD(Resilient Distributed Dataset):弹性分布式数据集。

    特性

    • A list of partitions (可分片)
    • A function for computing each split (compute func)
    • A list of dependencies on other RDDs (依赖)
    • A Partitioner for key-value RDDs (分片器,决定一条数据属于某分片)
    • A list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (位置优先)

    Partition

    1. RDD能并行计算的原因就是Partition,一个RDD可有多个partition,每个partition一个task任务,每个partition代表了该RDD一部分数据,分区内部并不会存储具体的数据,访问数据时是通过partition的迭代器,iterator 可遍历到所有数据。
    2. partition的个数需要视情况而定,RDD 可以通过创建操作或者转换操作得到,转换操作中,分区的个数会根据转换操作对应多个 RDD 之间的依赖关系确定,窄依赖子 RDD 由父 RDD 分区个数决定,Shuffle 依赖由子 RDD 分区器决定,从集合中创建RDD时默认个数为defaultParallelism,当该值没有设定时:
      • 本地模式: conf.getInt("spark.default.parallelism", totalCores) // CPU cores
      • Mesos: conf.getInt("spark.default.parallelism", 8) // 8
      • Standalone&Yarn: conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
    3. 特质Partition只有一个返回index的方法,很多具体的 RDD 也会有自己实现的 partition。
      trait Partition extends Serializable { 
        def index: Int
        override def hashCode(): Int = index
        override def equals(other: Any): Boolean = super.equals(other)
      }
    

    compute func

    每个具体的RDD都得实现compute 方法,该方法接受的参数之一是一个Partition 对象,目的是计算该分区中的数据。
    我们通过map方法来看具体的实现:

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
      }
    

    调用map时都会new一个MapPartitionsRDD实例,并且接收一个方法作为参数,该方法接收一个迭代器(后面会细讲),对该RDD的map操作函数f将作用于这个迭代器的每一条数据。在MapPartitionsRDD中是通过compute方法来计算对应分区的数据:

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))
    

    这里将调用该RDD(MapPartitionsRDD)内的第一个父 RDD 的 iterator 方法,该方的目的是拉取父 RDD 对应分区内的数据。iterator方法会返回一个迭代器,对应的是父RDD计算完成的数据,该迭代器将作为 f 方法的一个参数,该f 方法就是上面提到的创建MapPartitionsRDD实例时传入的方法。

    其实RDD的compute方法也类似。接下来我们看看iterator方法究竟都做了什么事:

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        if (storageLevel != StorageLevel.NONE) {
          getOrCompute(split, context)
        } else {
          computeOrReadCheckpoint(split, context)
        }
      }
    

    RDD的iterator方法即遍历对应分区的数据,先判断改RDD的存储级别若不为NONE,则说明该数据已经存在于缓存中,RDD 经过持久化操作并经历了一次计算过程 ,可直接将数据返回。

    private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
        val blockId = RDDBlockId(id, partition.index)
        var readCachedBlock = true
        // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
        SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
          readCachedBlock = false
          computeOrReadCheckpoint(partition, context)
        }) match {
          case Left(blockResult) =>
            if (readCachedBlock) {
              val existingMetrics = context.taskMetrics().inputMetrics
              existingMetrics.incBytesRead(blockResult.bytes)
              new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
                override def next(): T = {
                  existingMetrics.incRecordsRead(1)
                  delegate.next()
                }
              }
            } else {
              new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
            }
           ...
        }
      }
    

    通过RDD_id和partition_index唯一表示一个block,先从缓存中取数据,也有可能取不到数据

    • 数据丢失
    • RDD 经过持久化操作,但是是当前分区数据是第一次被计算,因此会出现拉取得到数据为 None

    取不到的时候则调用computeOrReadCheckpoint来获取并加入缓存。
    当RDD的存储级别若为NONE,则需要直接通过computeOrReadCheckpoint方法来计算。

    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
      {
        if (isCheckpointedAndMaterialized) {
          firstParent[T].iterator(split, context)
        } else {
          compute(split, context)
        }
      }
    

    该方法会先检查当前RDD是否被checkpoint,若是则直接从依赖的checkpointRDD中获取迭代对象,若不是则需要通过compute方法计算。

    dependency

    RDD的容错机制就是通过dependency实现的,在外部成为血统(Lineage)关系,在源码里面实为dependency,抽象类Dependency只有一个返回对应RDD的方法。

    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }
    

    每个RDD都有一个返回其所依赖的dependences:Seq[Dependency[_]] 的dependencies方法,Dependency里面存的就是父RDD,递归RDD+遍历这个dependences将可得到整个DAG。

    依赖分为两种,分别是窄依赖(Narrow Dependency)和 Shuffle 依赖(Shuffle Dependency,也称即宽依赖)。在窄依赖中,父RDD的一个分区至多被一个子RDD的一个分区所依赖,分区数据不可被拆分:

    在宽依赖中,父RDD的一个分区被子RDD的多个分区所依赖,分区数据被拆分:

    一次转换操作可同时包含窄依赖和宽依赖:


    窄依赖的抽象类为NarrowDependency,对应实现分别是 OneToOneDependency (一对一依赖)类和RangeDependency (范围依赖)类。

    一对一依赖表示子 RDD 分区的编号与父 RDD 分区的编号完全一致的情况,若两个 RDD 之间存在着一对一依赖,则子 RDD 的分区个数、分区内记录的个数都将继承自父 RDD。

    范围依赖是依赖关系中的一个特例,只被用于表示 UnionRDD 与父 RDD 之间的依赖关系。

    宽依赖的对应实现为 ShuffleDependency 类,宽依赖支持两种 Shuffle Manager,即 HashShuffleManager 和 SortShuffleManager

    Partitioner

    partitioner就是决定一条数据应该数据哪个分区的分区器,但只有 k, v 类型的 RDD 才能有 partitioner,因为都是由其 k 来决定的。
    特质 Partitioner提供了一个返回分区index的方法,通过传入k及指定的分区个数:

    trait Partitioner { 
      def partition(key: Any, numPartitions: Int): Int
    }
    

    Spark 内置了两种分区器,分别是哈希分区器(Hash Partitioner)和范围分区器(Range Partitioner)。

    Hash Partitioner

    我们来看HashPartitioner的定义,主要是getPartition方法,当key为null时直接返回null

    class HashPartitioner(partitions: Int) extends Partitioner {
      def numPartitions: Int = partitions
      def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
      }
      override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner =>
          h.numPartitions == numPartitions
        case _ =>
          false
      }
      override def hashCode: Int = numPartitions
    }
    

    不为null时将调用Utils的nonNegativeMod方法,即将key的hashcode和mod取余,若结果为正,则返回该结果;若结果为负,返回结果加上 mod。

     def nonNegativeMod(x: Int, mod: Int): Int = {
        val rawMod = x % mod
        rawMod + (if (rawMod < 0) mod else 0)
      }
    

    RangePartitioner

    在key的hashcode分布不均的情况下会到导致通过HashPartitioner分出来的分区数据倾斜不均匀,这是就需要用到RangePartitioner分区器,该分区器运行速度相对HashPartitioner较慢,原理复杂。

    HashPartitioner会将一个范围的key直接映射到一个partition,也就是一个partition的key一定比另一个partition的key都大或者都小,而怎么具体划分这个范围的边界成为关键,既要保证分布均匀又要减少遍历次数。具体实现可见 Spark分区器HashPartitioner和RangePartitioner代码详解

    preferred locations

    每个具体的RDD实例都需要实现自己的getPreferredLocations方法,RDD位置优先即返回partition的存储位置,该位置和spark的任务调度有关,尽量将计算移到该partition对应的地方。
    以从Hadoop中读取数据生成RDD为例,preferredLocations返回每一个数据块所在的机器名或者IP地址,如果每一个数据块是多份存储的(HDFS副本数),那么就会返回多个机器地址。

    相关文章

      网友评论

        本文标题:[spark] RDD解析

        本文链接:https://www.haomeiwen.com/subject/wkityxtx.html