RDD的创建分为从集合中创建RDD和从存储系统中创建RDD,两者的分区策略有所不同,下面将通过例子和源码来介绍它们之间的关系和区别:
从集合(内存)中创建RDD
从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark_RDD_list {
def main(args: Array[String]): Unit = {
//配置文件
//setMaster("local[*]"):指定分区数为电脑总核数
//setMater("local"):指定分区数为1
val conf = new SparkConf().setAppName("xxx").setMaster("local[*]")
//获取上下文
val sc = new SparkContext(conf)
//并行
//sc.parallelize(List(1,2,3,4))
//从集合(内存)中创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val rdd2: RDD[Int] = rdd.map(_*2)
//真正的开始执行
rdd2.collect().foreach(println)
//释放资源
sc.stop()
}
}
makeRDD方法底层其实就是parallelize(seq, numSlices),两者使用哪个都行
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
由以下代码可以看出默认分区数是totalCores(电脑/主机的总核数0),如果不指定makeRDD()的第二个参数时,会有一个默认的分区数,由下面代码注释可以看出,当指定了分区数,就按照自己指定的数字进行分区。
override def defaultParallelism(): Int =
//从配置文件中取分区数
//totalCores:总核数,和setMaster("local[*]")有关
scheduler.conf.getInt("spark.default.parallelism", totalCores)
//如果配置文件中没有配置,那么就使用totalCores
def getInt(key: String, defaultValue: Int): Int = {
getOption(key).map(_.toInt).getOrElse(defaultValue)
}
分区后,集合中的值到底存到下面的哪个分区,就是由下面的这块代码决定的
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
//numSlices:分区数
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
//start:起始位置
//end:结束位置
(start, end)
}
}
由外部的存储系统中读取数据创建RDD
包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark_RDD_file {
def main(args: Array[String]): Unit = {
//配置文件
val conf = new SparkConf().setAppName("xxx").setMaster("local[*]")
//获取上下文
val sc = new SparkContext(conf)
//从存储系统中读取数据
//底层是TextInputFormat,一行一行的读取数据
val lineRDD: RDD[String] = sc.textFile("input")
println("分区数:"+lineRDD.getNumPartitions)
//lineRDD.foreach(println)
lineRDD.saveAsTextFile("output")
//释放资源
sc.stop()
}
}
textFile方法,第二个参数minPartitions(最小分区数,有默认值)
def textFile(
path: String,
//最小分区数,分区数可能大于此值
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
defaultMinPartitions方法:由下面代码中的 math.min(defaultParallelism, 2)可以看出,defaultParallelism方法其实和makeRDD方法中求分区数的方法是同一个(上面代码已经给出),因此当指定分区数>=2,由下面代码算出来的最小分区其实都是2,当val conf = new SparkConf().setAppName("xxx").setMaster("local")时,分区数才是1。
注意:
- setMaster("local[*]")代表totalCores=电脑总核数
- setMaster("local")代表totalCores=电脑总核数=1
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
hadoopFile方法:
通过追踪hadoopFile方法,可以看出,textFile方法底层其实就用到了Hadoop的TextInputFormat类中的getSplits来进行切片,代码逻辑和Hadoop中的逻辑完全一样,具体有多少个分区和文件中的数据到底存到哪个分区就是由下面 getSplits方法中的代码决定的。
其中的getSpilts方法如下:
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
FileStatus[] files = listStatus(job);
// Save the number of input files for metrics/loadgen
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
//得到总大小,多个文件大小的和
totalSize += file.getLen();
}
//numSplits 分区数
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
//job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE 不在配置文件中指定的话,默认值就是1
// minSplitSize = 1
//所以 minSize = 1
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize(); //windows本地默认是32M,linux 128M
//切片大小
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
//每行的数据怎么存放?和读取每行的数据有关。假如切片大小为8,第一行的数据不够8,那么就接着读取第二行中的数据,依次类推
//够了就不读取下一行的数据了(这几行保存到同一个文件中)
//因此有可能造成有的文件中没有数据
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //是否大于1.1
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[] splitHosts = getSplitHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts));
}
} else {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
LOG.debug("Total # of splits: " + splits.size());
return splits.toArray(new FileSplit[splits.size()]);
}
网友评论