美文网首页hadoop 大数据底层理解
Spark RDD 分布式弹性数据集

Spark RDD 分布式弹性数据集

作者: Tim在路上 | 来源:发表于2022-02-13 10:23 被阅读0次

    Spark RDD 分布式弹性数据集

    rdd是一种弹性分布式的数据集,它代表着不可变的数据元素,可以被分区并行处理。

    rdd是一个粗粒度的数据生成方式和流转迭代计算方式的描述。它可以通过稳定的存储器或者从其他RDD生成,它并不需要急着进行转换,只需要在特定的rdd进行一次性的数据的迭代流转。rdd记录着自己的依赖关系,以防在数据丢失时可以通过“血缘”关系再次生成数据。用户也可以自己选择在经常重用的rdd进行数据落地,放置丢失后重做。

    rdd的特性总结:

    1. 显式抽象。将运算中的数据集进行显式抽象,定义了其接口和属性。由于数据集抽象的统一,从而可以将不同的计算过程组合起来进行统一的 DAG 调度。
    2. 基于内存。相较于 MapReduce 中间结果必须落盘,RDD 通过将结果保存在内存中,从而大大降低了单个算子计算延迟以及不同算子之间的加载延迟。
    3. 宽窄依赖。在进行 DAG 调度时,定义了宽窄依赖的概念,并以此进行阶段划分,优化调度计算。
    4. 谱系容错。主要依赖谱系图计算来进行错误恢复,而非进行冗余备份,因为内存实在是有限,只能以计算换存储了。
    5. 交互查询。修改了 Scala 的解释器,使得可以交互式的查询基于多机内存的大型数据集。进而支持类 SQL 等高阶查询语言。

    RDD与共享内存的比较

    分布式的共享内存是一种细粒度的读写,可以对每个存储单元进行读写,其一致性需要程序进行维护,其容错性需要设置检查点和程序回滚。但是RDD由于是不可变的粗粒度的读写,更适合于批量读写的任务,其可以使用“血缘”机制恢复数据,减少了设置检查点的开销。如果出现失败时,也只用重新计算分区中丢失的那一部分。另一方面,RDD的不可变性可以让系统可以像mapreduce一样采用后备任务的方式来代替运行缓慢的任务,不会出现相互影响的情况。

    另外rdd也吸取了分布式共享内存的特性,rdd的批量操作可以根据数据所处的位置进行优化,提高性能。加载数据时,当内存不足时,rdd的性能下降是平稳的,不能载入内存的分区可以存储在磁盘上。

    RDD 源码

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {
    
    // rdd维护这sparkContext上下文环境和rdd的依赖关系
    private def sc: SparkContext = {
        if (_sc == null) {
          throw new SparkException(...)
        }
        _sc
    }
    
    /** Construct an RDD with just a one-to-one dependency on one parent */
      def this(@transient oneParent: RDD[_]) =
        this(oneParent.context, List(new OneToOneDependency(oneParent)))
    
    // 1. 实现compute函数实现rdd每个分区的迭代计算
    // Implemented by subclasses to compute a given partition.
     def compute(split: Partition, context: TaskContext): Iterator[T]
    
    /**
       * Implemented by subclasses to return the set of partitions in this RDD. This method will only
       * be called once, so it is safe to implement a time-consuming computation in it.
       *
       * The partitions in this array must satisfy the following property:
       *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
       */
      // 2. 每个rdd都有一个或多个分区
      protected def getPartitions: Array[Partition]
    
    /**
       * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
       * be called once, so it is safe to implement a time-consuming computation in it.
       */
      // 3. rdd维护这自己的依赖关系
      protected def getDependencies: Seq[Dependency[_]] = deps
    
      /**
       * Optionally overridden by subclasses to specify placement preferences.
       */
      // 4. rdd可以根据数据的位置优化分区的计算位置,提高效率
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    
      /** Optionally overridden by subclasses to specify how they are partitioned. */
      // 5. rdd都有自己的分区器,表明他们分区的方式
      @transient val partitioner: Option[Partitioner] = None
    }
    

    上面的5点是rdd都会实现的接口,这也是rdd都具有的特性。

    1. RDD 分区

    如上源码所示,RDD提供了分区的抽象函数,即protected def getPartitions: Array[Partition],每个继承RDD抽象类的RDD都会有自己的getPartitions的实现。RDD分区的多少代表着计算时的并发粒度。

    用户可以自己指定执行的分区数,如果用户不自己指定,则使用默认的分区数。

    • ParallelCollectionRDD


      Untitled.png

      从图中看出,通过sparkContext的parallelize从集合生成RDD, 生成的是ParallelCollectionRDD,而partitions.length 分区数为8。


      Untitled.png
      在生成RDD传入设置分区大小为3,最后生成的分区数为3。
    def parallelize[T: ClassTag](
        seq: Seq[T],
        numSlices: Int = defaultParallelism): RDD[T] = withScope {
      assertNotStopped()
      new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
    }
    
    TaskSchedulerImpl类:
    override def defaultParallelism(): Int = backend.defaultParallelism()
    CoarseGrainedSchedulerBackend类:
    override def defaultParallelism(): Int = {
        conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
    }
    

    从源码中可以看出,如果不传入分区数,则默认分区数为defaultParallelism,而defaultParallelism=math.max(totalCoreCount.get(), 2)所以最小是2,最大是主机核数。

    • HadoopRDD

    HadoopRDD是读取hdfs文件的rdd。HadoopRDD使用的是MapReduce API。

    spark.sparkContext.textFile("hdfs://user/local/admin.text") 中textFile是读取hdfs文件的方法。其中会调用HadoopRDD。

    
    override def getPartitions: Array[Partition] = {
        val jobConf = getJobConf()
        // add the credentials here as this can be called before SparkContext initialized
        SparkHadoopUtil.get.addCredentials(jobConf)
        try {
          // 从这里可以看出allInputSplits来自getSplits
          val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
          val inputSplits = if (ignoreEmptySplits) {
            allInputSplits.filter(_.getLength > 0)
          } else {
            allInputSplits
          }
          // 分区数为inputSplits.size
          val array = new Array[Partition](inputSplits.size)
          for (i <- 0 until inputSplits.size) {
            array(i) = new HadoopPartition(id, i, inputSplits(i))
          }
          array
        } catch {
           ...
        }
      }
    
    FileInputFormat类:
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
            ...
            long totalSize = 0L;
            FileStatus[] arr$ = files;
            int len$ = files.length;
            // totalSize 为所有文件的总字节数
            for(int i$ = 0; i$ < len$; ++i$) {
                FileStatus file = arr$[i$];
                if (file.isDirectory()) {
                    throw new IOException("Not a file: " + file.getPath());
                }
    
                totalSize += file.getLen();
            }
            // 对totalSize切分后的字节数goalSize,numSplits默认为min(2,totalCores)=2
            long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
            // minSize为配置的mapreduce切分值
            long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
            ....
            for(int i$ = 0; i$ < len$; ++i$) {
            ....
                else {
                        long blockSize = file.getBlockSize();
                        long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
    
                        long bytesRemaining;
                        String[][] splitHosts;
                        // 如果在切片大小1.1倍内的都不会被切分
                        for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                            splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
                            splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
                        }
    
                        if (bytesRemaining != 0L) {
                            splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
                            splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
                        }
                    }
                }
            }
    
            sw.stop();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis());
            }
    
            return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
        }
    
    protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
            return Math.max(minSize, Math.min(goalSize, blockSize));
        }
    

    textFile 是从HDFS分布式文件系统的所有节点上读取数据,返回Strings的RDD。

    总结下HadoopRDD分区规则:

    1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

    2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。

    3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。

    总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。

    2. RDD 优先位置

    rdd优先位置返回的是每一个分区的位置信息,按照移动计算的思路,将计算尽量分配到数据所在的机器上。

    override def getPreferredLocations(split: Partition): Seq[String] = {
      val hsplit: InputSplit = split.asInstanceOf[HadoopPartition].inputSplit.value
      val locs = hsplit match {
        case lsplit: InputSplitWithLocationInfo =>
          HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
        case _ => None
      }
      // 返回ip不为localhost的列表
      locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
    }
    

    3. RDD 依赖关系

    RDD的操作是粗粒度的操作,RDD进行转换会形成新的RDD。形成的RDD和原RDD形成依赖关系,RDD通过这种“血缘”关系来维护数据的容错性。RDD的依赖关系可以分为宽依赖和窄依赖两种。

    • 窄依赖:父RDD的每一个分区都只被一个子RDD的一个分区依赖。即是一对一的过程,当然也可以是多对一的过程(coalesce() 缩减分区数量)。总之,就是只要不发生shuffle过程,就可以归结为窄依赖的关系。窄依赖的RDD直接可以直接归结为一个pipeline, 分区内的计算可以发生在一台机器上,多个分区可以并发的执行,上一个rdd的分区计算完成后,将结果缓存在内存中,子RDD可以直接使用。其次,窄依赖的明确依赖关系,可以在数据发生错误后,只重新计算发生错误的一个分区。
    • 宽依赖:父RDD的分区被子RDD的多个分区依赖。即多对多的关系,其中由于一个父RDD需要将数据分发到子RDD的多个分区中,(不同分区可能在不同的机器上)所以需要发生数据的读写(shuffle过程)。宽依赖反生数据错误后,需要重新计算多个分区。
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val listRDD = spark.sparkContext.parallelize(1 until n, slices)
    val mapRDD = listRDD.map(i => (i, i * i))
    val groupRDD = mapRDD.groupByKey()
    
    Untitled.png

    从中可以看出mapRDD是OneToOneDependency依赖,其父RDD为ParallelCollectionRDD。


    Untitled.png

    从中可以看出groupRDD的依赖是ShuffleDependency依赖,其父依赖是MapPartitionsRDD。而groupbykey是需要进行shuffle的算子,属于宽依赖。

    Spark通过创建的类来表明,RDD间的依赖关系的类型,NarrowDependency属于窄依赖,ShuffleDenpendency属于宽依赖。之后会通过一节来具体介绍其中的细节。


    Untitled.png

    4. RDD 分区计算

    从上面的RDD源码可以发现,每个RDD中都存在一个compute()的函数,这个函数的作用就是为实现RDD具体的分区计算。

    def compute(split: Partition, context: TaskContext): Iterator[T]

    compute的返回值是分区的迭代器,每一个分区都会调用这个函数。只有到action算子才会真正的执行计算。

    5. RDD 分区函数

    partitioner指的是Spark的分区函数,目前最常用的有两种,HashPartitioner和RangePartitioner, 其次还有缩减分区数的分区函数CoalescedPartitioner。分区这个概念,只存在于(K,V)键值对的RDD中,非键值对的RDD中partitioner为None。


    Untitled.png

    分区函数即决定了RDD本身分区的数量,也决定了Shuffle中MapOut输出中每个分区进行切割的依据。


    Untitled.png
    从上图可以看出,非k-v RDD的分区器为None, k-v RDD的分区函数默认为HashPartitioner。
    def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     ...
      // 如果存在配置并发配置,将并发配置作为默认分区数,否则上个rdd的最大值
      val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
        rdd.context.defaultParallelism
      } else {
        rdds.map(_.partitions.length).max
      }
    
      // If the existing max partitioner is an eligible one, or its partitions number is larger
      // than the default number of partitions, use the existing partitioner.
      if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
          defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
        hasMaxPartitioner.get.partitioner.get
      } else {
        // 默认返回HashPartitioner
        new HashPartitioner(defaultNumPartitions)
      }
    }
    

    HashPartitioner会对数据的key进行 key.hascode%numpartitions 计算,得到的数值会放到对应的分区中,这样能较为平衡的分配数据到partition。

    RangePartitioner:它是在排序算子中会用到的分区器,比如sortbykey、sortby、orderby等。该分区器先对输入的数据的key做采样,来估算Key的分布,然后按照指定的排序切分range,尽量让每个partition对应的range里的key分布均匀。

    RDD 基本转换

    rdd中的算子可以分为两种,一个是transformation, 一个是action算子。

    1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。

    2. Action:行动算子,这类算子会触发SparkContext提交Job作业。

    相关文章

      网友评论

        本文标题:Spark RDD 分布式弹性数据集

        本文链接:https://www.haomeiwen.com/subject/dqjflrtx.html