之前的集群容错处理模型比如MapReduce,Dryad等,都是将计算转换为一个DAG,是的模型能有效的恢复DAG中的故障和慢节点执行的任务,但是没有提供除了文件系统之外的其他存储方式,导致在网络上要频繁的数据复制,造成IO压力。
RDD提供一种粗粒度的变换如(map,filter,join等)接口,这些接口将相同的操作应用到多个数据集上,这样便使得它们可以记录创建数据集的血统 Lineage,而不需要存储真正的数据,从而达到高容错性。
Spark之RDD类型
Spark开发者首先需要编写一个Driver程序来连接到各个Worker节点,Driver定义一个或多个RDD以及相关的行动操作,driver同时记录RDD的继承关系即血统,而worker是一直运行的进程,它将经过一系列操作后的RDD分区数据保存在内存中。
Spark中的操作大致分为4类,(Spark其实只划分了2类,但是这本书的作者认为存在4类)。
前两种是官方文档定义的两类操作:
- 转换 transformations,将RDD通过一定的变化操作变换为新的RDD(转换操作是惰性操作 lazy,并不立即执行计算,在我的上一篇笔记有介绍到)。
- 行动操作 actions,能够触发Spark运行的操作,如reduce,collect等。Spark中行动操作分为两类,一类的操作结果变成Scala集合或者变量,另一类将RDD保存在外部文件系统或数据库中。
- 创建操作 ,即创建RDD,分为外部创建和内部创建,上篇笔记有介绍不赘述。
- 控制操作 ,对RDD持久化的操作,让RDD按不同的存储策略保存在磁盘或者内存。(在官方文档有介绍,又一个storage level,不同的存储层级对应不同的存储方案)。
Spark之RDD实现
作业调度
当对RDD进行转换操作时,调度器根据RDD血统来构建调度阶段stage,划分的依据是RDD之间的宽依赖窄依赖,同一个stage中只存在窄依赖,遇到宽依赖则要切割成前后两个stage。
Spark的Scala解析器
也就是spark的shell
Scala的shell解析器处理过程一般为:
- 将用户的每一行变异为一个类
- 将该类载入到JVM中去
例如用户在第一行输入以下:
var x =5
shell将会定义一个叫做Line1的类,该类包含x。
- 调用该类的某个函数,在该类中包含一个单例对象,对象中包含当前行的变量或函数,在初始化方法中包含处理该行的代码。
例如用户在第二行输入以下:
println(x)
该行被编译为println(Lineq.getInstance().x)
RDD内存管理
Spark的三种RDD持久化策略
- 未序列化的Java对象直接存在JVM虚拟机内存中(性能最优)
- 序列化的数据存于内存(组织方式更良好,牺牲一定的性能)
- 序列化的数据存于磁盘(适用于RDD太大的情况)
Spark对内存使用LRU的回收算法。
CheckPoint
血统提供了很方便的RDD错误恢复机制,但是当血统链太长的时候,恢复耗时长。通常情况下,会对包含宽依赖的长血统设置checkpoint,
RDD的转换操作 transformations
基础操作
- map
- distinct
- flatMap
重新分区
-
coalesce
-
repartition
-
randomSplit(weights:Array[Double],seed:Long=Utils.random.nextLong):Array[RDD[T]] 根据权重将RDD分隔为多个RDD,权重大的被分配到的几率就大。
-
glom():RDD[Array[T]],将原来的RDD按分区数量分隔为若干个数组
- mapPartitions[U](f:(Iterator[T])=>Iterator[U],preserverPartioning Boolean = false):
和map类似,不过参数由原来的RDD的每一个元素变成了RDD的一个迭代器
还有若干操作。。。。
RDD的行动操作 actions
- first():返回RDD的第一个元素,不排序
- count():Long表示返回RDD中的元素的个数
- reduce(f:(T,T)=>T):T 根据映射函数f,对RDD中元素进行二元计算
- collect():Array[T] 表示将RDD转换为数组
- take(num:Int):Array[T] 表示获取RDD中从0到num-1的元素,不排序
- top(num:Int),按照降序获取前num个元素
- aggregate和fold 聚合用的
*lookup(Key:K):Seq[V] lookup用于(K,V)类型的RDD,指定K值返回RDD中该K对于的所有的V值。 - countByKey():Map[K,Long]
- foreach(f:(T)=>Unit):Unit (foreach只会在executor端有效,对Driver端无效)
- foreachPartition(f:(Iterator[T]=>Unit):Unit
*sortBy[K](f:(T)=>K,ascending:Boolean = true,numPartitions:Int=this.partitions.length):RDD[T]
存储行动 - saveAsTextFile(path:String):Unit
- saveAsSequenceFile
- saveAsObjectFile 将RDD中的元素序列化为对象
在spark shell中使用saveAsTextFile将一个rdd对象存入hdfs中,然后查看。
保存的形式是一个文件夹,里面包含三个文件,一个_SUCCESS文件,两个part文件,因为我们的在建立rdd1的时候是指定的两个分区。
网友评论