美文网首页大数据
RDD分区源码追踪

RDD分区源码追踪

作者: Map_Reduce | 来源:发表于2019-07-31 20:37 被阅读0次

RDD的创建分为从集合中创建RDD和从存储系统中创建RDD,两者的分区策略有所不同,下面将通过例子和源码来介绍它们之间的关系和区别:

从集合(内存)中创建RDD

从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark_RDD_list {
    def main(args: Array[String]): Unit = {
        //配置文件 
        //setMaster("local[*]"):指定分区数为电脑总核数
        //setMater("local"):指定分区数为1
        val conf = new SparkConf().setAppName("xxx").setMaster("local[*]")
        //获取上下文
        val sc = new SparkContext(conf)
        //并行
        //sc.parallelize(List(1,2,3,4))
        //从集合(内存)中创建RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
        val rdd2: RDD[Int] = rdd.map(_*2)
        //真正的开始执行
        rdd2.collect().foreach(println)
        //释放资源
        sc.stop()
    }
}

makeRDD方法底层其实就是parallelize(seq, numSlices),两者使用哪个都行

 def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

由以下代码可以看出默认分区数是totalCores(电脑/主机的总核数0),如果不指定makeRDD()的第二个参数时,会有一个默认的分区数,由下面代码注释可以看出,当指定了分区数,就按照自己指定的数字进行分区。

override def defaultParallelism(): Int =
//从配置文件中取分区数  
//totalCores:总核数,和setMaster("local[*]")有关
    scheduler.conf.getInt("spark.default.parallelism", totalCores)
//如果配置文件中没有配置,那么就使用totalCores
 def getInt(key: String, defaultValue: Int): Int = {
    getOption(key).map(_.toInt).getOrElse(defaultValue)
  }

分区后,集合中的值到底存到下面的哪个分区,就是由下面的这块代码决定的

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      //numSlices:分区数
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
      //start:起始位置
      //end:结束位置
        (start, end)
      }
    }

由外部的存储系统中读取数据创建RDD

包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark_RDD_file {
    def main(args: Array[String]): Unit = {
        //配置文件 
        val conf = new SparkConf().setAppName("xxx").setMaster("local[*]")
        //获取上下文
        val sc = new SparkContext(conf)
        //从存储系统中读取数据
        //底层是TextInputFormat,一行一行的读取数据
        val lineRDD: RDD[String] = sc.textFile("input")
        println("分区数:"+lineRDD.getNumPartitions)
        //lineRDD.foreach(println)
        lineRDD.saveAsTextFile("output")
        //释放资源
        sc.stop()
    }
}

textFile方法,第二个参数minPartitions(最小分区数,有默认值)

  def textFile(
      path: String,
      //最小分区数,分区数可能大于此值
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

defaultMinPartitions方法:由下面代码中的 math.min(defaultParallelism, 2)可以看出,defaultParallelism方法其实和makeRDD方法中求分区数的方法是同一个(上面代码已经给出),因此当指定分区数>=2,由下面代码算出来的最小分区其实都是2,当val conf = new SparkConf().setAppName("xxx").setMaster("local")时,分区数才是1。
注意:

  • setMaster("local[*]")代表totalCores=电脑总核数
  • setMaster("local")代表totalCores=电脑总核数=1
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

hadoopFile方法:
通过追踪hadoopFile方法,可以看出,textFile方法底层其实就用到了Hadoop的TextInputFormat类中的getSplits来进行切片,代码逻辑和Hadoop中的逻辑完全一样,具体有多少个分区和文件中的数据到底存到哪个分区就是由下面 getSplits方法中的代码决定的。
其中的getSpilts方法如下:

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      //得到总大小,多个文件大小的和
      totalSize += file.getLen(); 
    }
    //numSplits 分区数
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
   //job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE 不在配置文件中指定的话,默认值就是1
   // minSplitSize = 1 
   //所以 minSize = 1
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize(); //windows本地默认是32M,linux 128M 
          //切片大小
          long splitSize = computeSplitSize(goalSize, minSize, blockSize); 
 
          //每行的数据怎么存放?和读取每行的数据有关。假如切片大小为8,第一行的数据不够8,那么就接着读取第二行中的数据,依次类推
          //够了就不读取下一行的数据了(这几行保存到同一个文件中)
          //因此有可能造成有的文件中没有数据
          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //是否大于1.1
            String[] splitHosts = getSplitHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[] splitHosts = getSplitHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts));
          }
        } else {
          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

相关文章

  • RDD分区源码追踪

    RDD的创建分为从集合中创建RDD和从存储系统中创建RDD,两者的分区策略有所不同,下面将通过例子和源码来介绍它们...

  • Spark中repartition和coalesce的用法

    在Spark的Rdd中,Rdd是分区的。 有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多...

  • Spark_day04

    RDD的 Shuffle 和 分区 RDD的分区操作2.Shuffle 的原理 分区的作用 RDD 使用分区来分布...

  • Spark的优化

    1.RDD重新分区 针对大量小分区的RDD,使用RDD重分区函数coalesce将小分区合并成大分区;同样当分区数...

  • 宽依赖和窄依赖

    窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据...

  • Spark算子:统计RDD分区中的元素及数量

    Spark RDD是被分区的,在生成RDD时候,一般可以指定分区的数量,如果不指定分区数量,当RDD从集合创建时候...

  • SparkCore之RDD

    RDD 五大特性 A list of partitions一组分区:多个分区,在RDD中用分区的概念。 A fun...

  • RDD依赖关系

    前言 RDD的五大特性 A list of partitions一组分区:多个分区,在RDD中用分区的概念。 A ...

  • RDD(二)

    class RDD源码解析 1.1 RDD源码 1.2 RDD类解释 1.3 RDD class中如何体现RDD的...

  • 《Spark快速大数据分析》读书笔记——为啥Spark在join

    分区之前 分区之后 注意观察箭头的方向,且userData这个RDD比events这个RDD大得多,原来没有分区的...

网友评论

    本文标题:RDD分区源码追踪

    本文链接:https://www.haomeiwen.com/subject/xsxorctx.html