Spark RDD 分布式弹性数据集
rdd是一种弹性分布式的数据集,它代表着不可变的数据元素,可以被分区并行处理。
rdd是一个粗粒度的数据生成方式和流转迭代计算方式的描述。它可以通过稳定的存储器或者从其他RDD生成,它并不需要急着进行转换,只需要在特定的rdd进行一次性的数据的迭代流转。rdd记录着自己的依赖关系,以防在数据丢失时可以通过“血缘”关系再次生成数据。用户也可以自己选择在经常重用的rdd进行数据落地,放置丢失后重做。
rdd的特性总结:
- 显式抽象。将运算中的数据集进行显式抽象,定义了其接口和属性。由于数据集抽象的统一,从而可以将不同的计算过程组合起来进行统一的 DAG 调度。
- 基于内存。相较于 MapReduce 中间结果必须落盘,RDD 通过将结果保存在内存中,从而大大降低了单个算子计算延迟以及不同算子之间的加载延迟。
- 宽窄依赖。在进行 DAG 调度时,定义了宽窄依赖的概念,并以此进行阶段划分,优化调度计算。
- 谱系容错。主要依赖谱系图计算来进行错误恢复,而非进行冗余备份,因为内存实在是有限,只能以计算换存储了。
- 交互查询。修改了 Scala 的解释器,使得可以交互式的查询基于多机内存的大型数据集。进而支持类 SQL 等高阶查询语言。
RDD与共享内存的比较
分布式的共享内存是一种细粒度的读写,可以对每个存储单元进行读写,其一致性需要程序进行维护,其容错性需要设置检查点和程序回滚。但是RDD由于是不可变的粗粒度的读写,更适合于批量读写的任务,其可以使用“血缘”机制恢复数据,减少了设置检查点的开销。如果出现失败时,也只用重新计算分区中丢失的那一部分。另一方面,RDD的不可变性可以让系统可以像mapreduce一样采用后备任务的方式来代替运行缓慢的任务,不会出现相互影响的情况。
另外rdd也吸取了分布式共享内存的特性,rdd的批量操作可以根据数据所处的位置进行优化,提高性能。加载数据时,当内存不足时,rdd的性能下降是平稳的,不能载入内存的分区可以存储在磁盘上。
RDD 源码
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
// rdd维护这sparkContext上下文环境和rdd的依赖关系
private def sc: SparkContext = {
if (_sc == null) {
throw new SparkException(...)
}
_sc
}
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
// 1. 实现compute函数实现rdd每个分区的迭代计算
// Implemented by subclasses to compute a given partition.
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
// 2. 每个rdd都有一个或多个分区
protected def getPartitions: Array[Partition]
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
// 3. rdd维护这自己的依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
// 4. rdd可以根据数据的位置优化分区的计算位置,提高效率
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
// 5. rdd都有自己的分区器,表明他们分区的方式
@transient val partitioner: Option[Partitioner] = None
}
上面的5点是rdd都会实现的接口,这也是rdd都具有的特性。
1. RDD 分区
如上源码所示,RDD提供了分区的抽象函数,即protected def getPartitions: Array[Partition],每个继承RDD抽象类的RDD都会有自己的getPartitions的实现。RDD分区的多少代表着计算时的并发粒度。
用户可以自己指定执行的分区数,如果用户不自己指定,则使用默认的分区数。
-
ParallelCollectionRDD
Untitled.png
从图中看出,通过sparkContext的parallelize从集合生成RDD, 生成的是ParallelCollectionRDD,而partitions.length 分区数为8。
Untitled.png
在生成RDD传入设置分区大小为3,最后生成的分区数为3。
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
TaskSchedulerImpl类:
override def defaultParallelism(): Int = backend.defaultParallelism()
CoarseGrainedSchedulerBackend类:
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
从源码中可以看出,如果不传入分区数,则默认分区数为defaultParallelism,而defaultParallelism=math.max(totalCoreCount.get(), 2)所以最小是2,最大是主机核数。
- HadoopRDD
HadoopRDD是读取hdfs文件的rdd。HadoopRDD使用的是MapReduce API。
spark.sparkContext.textFile("hdfs://user/local/admin.text") 中textFile是读取hdfs文件的方法。其中会调用HadoopRDD。
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
try {
// 从这里可以看出allInputSplits来自getSplits
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
// 分区数为inputSplits.size
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
} catch {
...
}
}
FileInputFormat类:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
...
long totalSize = 0L;
FileStatus[] arr$ = files;
int len$ = files.length;
// totalSize 为所有文件的总字节数
for(int i$ = 0; i$ < len$; ++i$) {
FileStatus file = arr$[i$];
if (file.isDirectory()) {
throw new IOException("Not a file: " + file.getPath());
}
totalSize += file.getLen();
}
// 对totalSize切分后的字节数goalSize,numSplits默认为min(2,totalCores)=2
long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
// minSize为配置的mapreduce切分值
long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
....
for(int i$ = 0; i$ < len$; ++i$) {
....
else {
long blockSize = file.getBlockSize();
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining;
String[][] splitHosts;
// 如果在切片大小1.1倍内的都不会被切分
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
}
if (bytesRemaining != 0L) {
splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
}
}
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis());
}
return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
}
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
textFile 是从HDFS分布式文件系统的所有节点上读取数据,返回Strings的RDD。
总结下HadoopRDD分区规则:
1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。
2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。
3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。
总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。
2. RDD 优先位置
rdd优先位置返回的是每一个分区的位置信息,按照移动计算的思路,将计算尽量分配到数据所在的机器上。
override def getPreferredLocations(split: Partition): Seq[String] = {
val hsplit: InputSplit = split.asInstanceOf[HadoopPartition].inputSplit.value
val locs = hsplit match {
case lsplit: InputSplitWithLocationInfo =>
HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
case _ => None
}
// 返回ip不为localhost的列表
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
3. RDD 依赖关系
RDD的操作是粗粒度的操作,RDD进行转换会形成新的RDD。形成的RDD和原RDD形成依赖关系,RDD通过这种“血缘”关系来维护数据的容错性。RDD的依赖关系可以分为宽依赖和窄依赖两种。
- 窄依赖:父RDD的每一个分区都只被一个子RDD的一个分区依赖。即是一对一的过程,当然也可以是多对一的过程(coalesce() 缩减分区数量)。总之,就是只要不发生shuffle过程,就可以归结为窄依赖的关系。窄依赖的RDD直接可以直接归结为一个pipeline, 分区内的计算可以发生在一台机器上,多个分区可以并发的执行,上一个rdd的分区计算完成后,将结果缓存在内存中,子RDD可以直接使用。其次,窄依赖的明确依赖关系,可以在数据发生错误后,只重新计算发生错误的一个分区。
- 宽依赖:父RDD的分区被子RDD的多个分区依赖。即多对多的关系,其中由于一个父RDD需要将数据分发到子RDD的多个分区中,(不同分区可能在不同的机器上)所以需要发生数据的读写(shuffle过程)。宽依赖反生数据错误后,需要重新计算多个分区。
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val listRDD = spark.sparkContext.parallelize(1 until n, slices)
val mapRDD = listRDD.map(i => (i, i * i))
val groupRDD = mapRDD.groupByKey()
Untitled.png
从中可以看出mapRDD是OneToOneDependency依赖,其父RDD为ParallelCollectionRDD。
Untitled.png
从中可以看出groupRDD的依赖是ShuffleDependency依赖,其父依赖是MapPartitionsRDD。而groupbykey是需要进行shuffle的算子,属于宽依赖。
Spark通过创建的类来表明,RDD间的依赖关系的类型,NarrowDependency属于窄依赖,ShuffleDenpendency属于宽依赖。之后会通过一节来具体介绍其中的细节。
Untitled.png
4. RDD 分区计算
从上面的RDD源码可以发现,每个RDD中都存在一个compute()的函数,这个函数的作用就是为实现RDD具体的分区计算。
def compute(split: Partition, context: TaskContext): Iterator[T]
compute的返回值是分区的迭代器,每一个分区都会调用这个函数。只有到action算子才会真正的执行计算。
5. RDD 分区函数
partitioner指的是Spark的分区函数,目前最常用的有两种,HashPartitioner和RangePartitioner, 其次还有缩减分区数的分区函数CoalescedPartitioner。分区这个概念,只存在于(K,V)键值对的RDD中,非键值对的RDD中partitioner为None。
Untitled.png
分区函数即决定了RDD本身分区的数量,也决定了Shuffle中MapOut输出中每个分区进行切割的依据。
Untitled.png
从上图可以看出,非k-v RDD的分区器为None, k-v RDD的分区函数默认为HashPartitioner。
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
...
// 如果存在配置并发配置,将并发配置作为默认分区数,否则上个rdd的最大值
val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
}
// If the existing max partitioner is an eligible one, or its partitions number is larger
// than the default number of partitions, use the existing partitioner.
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
hasMaxPartitioner.get.partitioner.get
} else {
// 默认返回HashPartitioner
new HashPartitioner(defaultNumPartitions)
}
}
HashPartitioner会对数据的key进行 key.hascode%numpartitions 计算,得到的数值会放到对应的分区中,这样能较为平衡的分配数据到partition。
RangePartitioner:它是在排序算子中会用到的分区器,比如sortbykey、sortby、orderby等。该分区器先对输入的数据的key做采样,来估算Key的分布,然后按照指定的排序切分range,尽量让每个partition对应的range里的key分布均匀。
RDD 基本转换
rdd中的算子可以分为两种,一个是transformation, 一个是action算子。
1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。
2. Action:行动算子,这类算子会触发SparkContext提交Job作业。
网友评论