美文网首页大数据,机器学习,人工智能大数据 爬虫Python AI Sql玩转大数据
Spark学习笔记三《图解Spark核心技术与案例实战》阅读之R

Spark学习笔记三《图解Spark核心技术与案例实战》阅读之R

作者: BitGuo | 来源:发表于2019-10-29 16:40 被阅读0次

    之前的集群容错处理模型比如MapReduce,Dryad等,都是将计算转换为一个DAG,是的模型能有效的恢复DAG中的故障和慢节点执行的任务,但是没有提供除了文件系统之外的其他存储方式,导致在网络上要频繁的数据复制,造成IO压力。
    RDD提供一种粗粒度的变换如(map,filter,join等)接口,这些接口将相同的操作应用到多个数据集上,这样便使得它们可以记录创建数据集的血统 Lineage,而不需要存储真正的数据,从而达到高容错性。

    Spark之RDD类型

    Spark开发者首先需要编写一个Driver程序来连接到各个Worker节点,Driver定义一个或多个RDD以及相关的行动操作,driver同时记录RDD的继承关系即血统,而worker是一直运行的进程,它将经过一系列操作后的RDD分区数据保存在内存中。
    Spark中的操作大致分为4类,(Spark其实只划分了2类,但是这本书的作者认为存在4类)。
    前两种是官方文档定义的两类操作:

    • 转换 transformations,将RDD通过一定的变化操作变换为新的RDD(转换操作是惰性操作 lazy,并不立即执行计算,在我的上一篇笔记有介绍到)。
    • 行动操作 actions,能够触发Spark运行的操作,如reduce,collect等。Spark中行动操作分为两类,一类的操作结果变成Scala集合或者变量,另一类将RDD保存在外部文件系统或数据库中。
    • 创建操作 ,即创建RDD,分为外部创建和内部创建,上篇笔记有介绍不赘述。
    • 控制操作 ,对RDD持久化的操作,让RDD按不同的存储策略保存在磁盘或者内存。(在官方文档有介绍,又一个storage level,不同的存储层级对应不同的存储方案)。

    Spark之RDD实现

    作业调度
    当对RDD进行转换操作时,调度器根据RDD血统来构建调度阶段stage,划分的依据是RDD之间的宽依赖窄依赖,同一个stage中只存在窄依赖,遇到宽依赖则要切割成前后两个stage。

    Spark的Scala解析器

    也就是spark的shell
    Scala的shell解析器处理过程一般为:

    1. 将用户的每一行变异为一个类
    2. 将该类载入到JVM中去
      例如用户在第一行输入以下:
    var x =5
    

    shell将会定义一个叫做Line1的类,该类包含x。

    1. 调用该类的某个函数,在该类中包含一个单例对象,对象中包含当前行的变量或函数,在初始化方法中包含处理该行的代码。
      例如用户在第二行输入以下:
    println(x)
    

    该行被编译为println(Lineq.getInstance().x)

    RDD内存管理

    Spark的三种RDD持久化策略

    • 未序列化的Java对象直接存在JVM虚拟机内存中(性能最优)
    • 序列化的数据存于内存(组织方式更良好,牺牲一定的性能)
    • 序列化的数据存于磁盘(适用于RDD太大的情况)

    Spark对内存使用LRU的回收算法。

    CheckPoint

    血统提供了很方便的RDD错误恢复机制,但是当血统链太长的时候,恢复耗时长。通常情况下,会对包含宽依赖的长血统设置checkpoint

    RDD的转换操作 transformations

    基础操作

    • map
    • distinct
    • flatMap

    重新分区

    • coalesce

    • repartition

    • randomSplit(weights:Array[Double],seed:Long=Utils.random.nextLong):Array[RDD[T]] 根据权重将RDD分隔为多个RDD,权重大的被分配到的几率就大。

    • glom():RDD[Array[T]],将原来的RDD按分区数量分隔为若干个数组

    • mapPartitions[U](f:(Iterator[T])=>Iterator[U],preserverPartioning Boolean = false):
      和map类似,不过参数由原来的RDD的每一个元素变成了RDD的一个迭代器
      还有若干操作。。。。

    RDD的行动操作 actions

    • first():返回RDD的第一个元素,不排序
    • count():Long表示返回RDD中的元素的个数
    • reduce(f:(T,T)=>T):T 根据映射函数f,对RDD中元素进行二元计算
    • collect():Array[T] 表示将RDD转换为数组
    • take(num:Int):Array[T] 表示获取RDD中从0到num-1的元素,不排序
    • top(num:Int),按照降序获取前num个元素
    • aggregate和fold 聚合用的
      *lookup(Key:K):Seq[V] lookup用于(K,V)类型的RDD,指定K值返回RDD中该K对于的所有的V值。
    • countByKey():Map[K,Long]
    • foreach(f:(T)=>Unit):Unit (foreach只会在executor端有效,对Driver端无效)
    • foreachPartition(f:(Iterator[T]=>Unit):Unit
      *sortBy[K](f:(T)=>K,ascending:Boolean = true,numPartitions:Int=this.partitions.length):RDD[T]
      存储行动
    • saveAsTextFile(path:String):Unit
    • saveAsSequenceFile
    • saveAsObjectFile 将RDD中的元素序列化为对象

    在spark shell中使用saveAsTextFile将一个rdd对象存入hdfs中,然后查看。




    保存的形式是一个文件夹,里面包含三个文件,一个_SUCCESS文件,两个part文件,因为我们的在建立rdd1的时候是指定的两个分区。


    相关文章

      网友评论

        本文标题:Spark学习笔记三《图解Spark核心技术与案例实战》阅读之R

        本文链接:https://www.haomeiwen.com/subject/omdwvctx.html