美文网首页hadoop 大数据底层理解
Spark RDD 分布式弹性数据集

Spark RDD 分布式弹性数据集

作者: Tim在路上 | 来源:发表于2022-02-13 10:23 被阅读0次

Spark RDD 分布式弹性数据集

rdd是一种弹性分布式的数据集,它代表着不可变的数据元素,可以被分区并行处理。

rdd是一个粗粒度的数据生成方式和流转迭代计算方式的描述。它可以通过稳定的存储器或者从其他RDD生成,它并不需要急着进行转换,只需要在特定的rdd进行一次性的数据的迭代流转。rdd记录着自己的依赖关系,以防在数据丢失时可以通过“血缘”关系再次生成数据。用户也可以自己选择在经常重用的rdd进行数据落地,放置丢失后重做。

rdd的特性总结:

  1. 显式抽象。将运算中的数据集进行显式抽象,定义了其接口和属性。由于数据集抽象的统一,从而可以将不同的计算过程组合起来进行统一的 DAG 调度。
  2. 基于内存。相较于 MapReduce 中间结果必须落盘,RDD 通过将结果保存在内存中,从而大大降低了单个算子计算延迟以及不同算子之间的加载延迟。
  3. 宽窄依赖。在进行 DAG 调度时,定义了宽窄依赖的概念,并以此进行阶段划分,优化调度计算。
  4. 谱系容错。主要依赖谱系图计算来进行错误恢复,而非进行冗余备份,因为内存实在是有限,只能以计算换存储了。
  5. 交互查询。修改了 Scala 的解释器,使得可以交互式的查询基于多机内存的大型数据集。进而支持类 SQL 等高阶查询语言。

RDD与共享内存的比较

分布式的共享内存是一种细粒度的读写,可以对每个存储单元进行读写,其一致性需要程序进行维护,其容错性需要设置检查点和程序回滚。但是RDD由于是不可变的粗粒度的读写,更适合于批量读写的任务,其可以使用“血缘”机制恢复数据,减少了设置检查点的开销。如果出现失败时,也只用重新计算分区中丢失的那一部分。另一方面,RDD的不可变性可以让系统可以像mapreduce一样采用后备任务的方式来代替运行缓慢的任务,不会出现相互影响的情况。

另外rdd也吸取了分布式共享内存的特性,rdd的批量操作可以根据数据所处的位置进行优化,提高性能。加载数据时,当内存不足时,rdd的性能下降是平稳的,不能载入内存的分区可以存储在磁盘上。

RDD 源码

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

// rdd维护这sparkContext上下文环境和rdd的依赖关系
private def sc: SparkContext = {
    if (_sc == null) {
      throw new SparkException(...)
    }
    _sc
}

/** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

// 1. 实现compute函数实现rdd每个分区的迭代计算
// Implemented by subclasses to compute a given partition.
 def compute(split: Partition, context: TaskContext): Iterator[T]

/**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  // 2. 每个rdd都有一个或多个分区
  protected def getPartitions: Array[Partition]

/**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  // 3. rdd维护这自己的依赖关系
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  // 4. rdd可以根据数据的位置优化分区的计算位置,提高效率
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  // 5. rdd都有自己的分区器,表明他们分区的方式
  @transient val partitioner: Option[Partitioner] = None
}

上面的5点是rdd都会实现的接口,这也是rdd都具有的特性。

1. RDD 分区

如上源码所示,RDD提供了分区的抽象函数,即protected def getPartitions: Array[Partition],每个继承RDD抽象类的RDD都会有自己的getPartitions的实现。RDD分区的多少代表着计算时的并发粒度。

用户可以自己指定执行的分区数,如果用户不自己指定,则使用默认的分区数。

  • ParallelCollectionRDD


    Untitled.png

    从图中看出,通过sparkContext的parallelize从集合生成RDD, 生成的是ParallelCollectionRDD,而partitions.length 分区数为8。


    Untitled.png
    在生成RDD传入设置分区大小为3,最后生成的分区数为3。
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

TaskSchedulerImpl类:
override def defaultParallelism(): Int = backend.defaultParallelism()
CoarseGrainedSchedulerBackend类:
override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

从源码中可以看出,如果不传入分区数,则默认分区数为defaultParallelism,而defaultParallelism=math.max(totalCoreCount.get(), 2)所以最小是2,最大是主机核数。

  • HadoopRDD

HadoopRDD是读取hdfs文件的rdd。HadoopRDD使用的是MapReduce API。

spark.sparkContext.textFile("hdfs://user/local/admin.text") 中textFile是读取hdfs文件的方法。其中会调用HadoopRDD。


override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
      // 从这里可以看出allInputSplits来自getSplits
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      // 分区数为inputSplits.size
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
       ...
    }
  }

FileInputFormat类:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        ...
        long totalSize = 0L;
        FileStatus[] arr$ = files;
        int len$ = files.length;
        // totalSize 为所有文件的总字节数
        for(int i$ = 0; i$ < len$; ++i$) {
            FileStatus file = arr$[i$];
            if (file.isDirectory()) {
                throw new IOException("Not a file: " + file.getPath());
            }

            totalSize += file.getLen();
        }
        // 对totalSize切分后的字节数goalSize,numSplits默认为min(2,totalCores)=2
        long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
        // minSize为配置的mapreduce切分值
        long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
        ....
        for(int i$ = 0; i$ < len$; ++i$) {
        ....
            else {
                    long blockSize = file.getBlockSize();
                    long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);

                    long bytesRemaining;
                    String[][] splitHosts;
                    // 如果在切片大小1.1倍内的都不会被切分
                    for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                        splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
                        splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
                    }

                    if (bytesRemaining != 0L) {
                        splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
                        splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
                    }
                }
            }
        }

        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis());
        }

        return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
    }

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

textFile 是从HDFS分布式文件系统的所有节点上读取数据,返回Strings的RDD。

总结下HadoopRDD分区规则:

1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。

3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。

总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。

2. RDD 优先位置

rdd优先位置返回的是每一个分区的位置信息,按照移动计算的思路,将计算尽量分配到数据所在的机器上。

override def getPreferredLocations(split: Partition): Seq[String] = {
  val hsplit: InputSplit = split.asInstanceOf[HadoopPartition].inputSplit.value
  val locs = hsplit match {
    case lsplit: InputSplitWithLocationInfo =>
      HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
    case _ => None
  }
  // 返回ip不为localhost的列表
  locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}

3. RDD 依赖关系

RDD的操作是粗粒度的操作,RDD进行转换会形成新的RDD。形成的RDD和原RDD形成依赖关系,RDD通过这种“血缘”关系来维护数据的容错性。RDD的依赖关系可以分为宽依赖和窄依赖两种。

  • 窄依赖:父RDD的每一个分区都只被一个子RDD的一个分区依赖。即是一对一的过程,当然也可以是多对一的过程(coalesce() 缩减分区数量)。总之,就是只要不发生shuffle过程,就可以归结为窄依赖的关系。窄依赖的RDD直接可以直接归结为一个pipeline, 分区内的计算可以发生在一台机器上,多个分区可以并发的执行,上一个rdd的分区计算完成后,将结果缓存在内存中,子RDD可以直接使用。其次,窄依赖的明确依赖关系,可以在数据发生错误后,只重新计算发生错误的一个分区。
  • 宽依赖:父RDD的分区被子RDD的多个分区依赖。即多对多的关系,其中由于一个父RDD需要将数据分发到子RDD的多个分区中,(不同分区可能在不同的机器上)所以需要发生数据的读写(shuffle过程)。宽依赖反生数据错误后,需要重新计算多个分区。
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val listRDD = spark.sparkContext.parallelize(1 until n, slices)
val mapRDD = listRDD.map(i => (i, i * i))
val groupRDD = mapRDD.groupByKey()
Untitled.png

从中可以看出mapRDD是OneToOneDependency依赖,其父RDD为ParallelCollectionRDD。


Untitled.png

从中可以看出groupRDD的依赖是ShuffleDependency依赖,其父依赖是MapPartitionsRDD。而groupbykey是需要进行shuffle的算子,属于宽依赖。

Spark通过创建的类来表明,RDD间的依赖关系的类型,NarrowDependency属于窄依赖,ShuffleDenpendency属于宽依赖。之后会通过一节来具体介绍其中的细节。


Untitled.png

4. RDD 分区计算

从上面的RDD源码可以发现,每个RDD中都存在一个compute()的函数,这个函数的作用就是为实现RDD具体的分区计算。

def compute(split: Partition, context: TaskContext): Iterator[T]

compute的返回值是分区的迭代器,每一个分区都会调用这个函数。只有到action算子才会真正的执行计算。

5. RDD 分区函数

partitioner指的是Spark的分区函数,目前最常用的有两种,HashPartitioner和RangePartitioner, 其次还有缩减分区数的分区函数CoalescedPartitioner。分区这个概念,只存在于(K,V)键值对的RDD中,非键值对的RDD中partitioner为None。


Untitled.png

分区函数即决定了RDD本身分区的数量,也决定了Shuffle中MapOut输出中每个分区进行切割的依据。


Untitled.png
从上图可以看出,非k-v RDD的分区器为None, k-v RDD的分区函数默认为HashPartitioner。
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
 ...
  // 如果存在配置并发配置,将并发配置作为默认分区数,否则上个rdd的最大值
  val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
    rdd.context.defaultParallelism
  } else {
    rdds.map(_.partitions.length).max
  }

  // If the existing max partitioner is an eligible one, or its partitions number is larger
  // than the default number of partitions, use the existing partitioner.
  if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
      defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
    hasMaxPartitioner.get.partitioner.get
  } else {
    // 默认返回HashPartitioner
    new HashPartitioner(defaultNumPartitions)
  }
}

HashPartitioner会对数据的key进行 key.hascode%numpartitions 计算,得到的数值会放到对应的分区中,这样能较为平衡的分配数据到partition。

RangePartitioner:它是在排序算子中会用到的分区器,比如sortbykey、sortby、orderby等。该分区器先对输入的数据的key做采样,来估算Key的分布,然后按照指定的排序切分range,尽量让每个partition对应的range里的key分布均匀。

RDD 基本转换

rdd中的算子可以分为两种,一个是transformation, 一个是action算子。

1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。

2. Action:行动算子,这类算子会触发SparkContext提交Job作业。

相关文章

网友评论

    本文标题:Spark RDD 分布式弹性数据集

    本文链接:https://www.haomeiwen.com/subject/dqjflrtx.html