什么是RDD
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,
它代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。
RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,
后续的查询能够重用工作集,这极大地提升了查询速度。
RDD的属性
1、一组分片(Partition),即数据集的基本组成单位。
对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。
用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。
默认值就是程序所分配到的CPU Core的数目。
2、一个计算每个分区的函数。
Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。
compute函数会对迭代器进行复合,不需要保存每次计算的结果。
3、RDD之间的依赖关系。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。
在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
4、一个Partitioner,即RDD的分片函数。
当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,
另外一个是基于范围的RangePartitioner。
只有对于key-value的RDD,才会有Partitioner,
非key-value的RDD的Parititioner的值是None。
Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
5、一个列表,存储存取每个Partition的优先位置(preferred location)。
对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。
按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
生成rdd的两种方式
#cd training/spark/bin
# ./spark-shell --master spark://hadoop21:7077 --executor-memory 512m --total-executor-cores 2
scala> val rdd1 = sc.textFile("hdfs://192.168.56.21:9000/wc")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.56.21:9000/wc MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd1.collect
res0: Array[String] = Array(hello tom, hello jerry, hello tom, hello kitty, hello tom, hello jerry, hello tom, hello jerry, hello lilei, hello hanmeimei, hello tom, hello tom, hello jerry, hello tom, hello tom, hello jerry, hello lilei, hello hanmeimei, hello tom, hello tom, hello jerry, hello tom)
scala> val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd2.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)
网友评论