47.spark

作者: 文茶君 | 来源:发表于2020-03-09 11:46 被阅读0次

    spark
    spark是一种计算框架,比MapReduce快一些
    开源计算框架。运行和编写都很快
    可以在很多环境运行
    hadoop,apache mesos kubernetes,standalone or in the cloud
    基于内存,快
    可以用java,scala,Python(功能不完善,缺少包)和r和sql



    Spark与MapReduce的区别
    都是分布式计算框架,Spark基于内存,MR基于HDFS。Spark处理数据的能力一般是MR的十倍以上,Spark中除了基于内存计算外,还有DAG有向无环图来切分任务的执行先后顺序。

    ########注意:安装时路径不要带空格或者中文

    2.spark核心概念RDD

    1.RDD
    概念
    RDD(Resilient Distributed Dateset),弹性分布式数据集。

    RDD的五大特性:
    1.RDD是由一系列的partition组成的。
    2.函数(算子)是作用在每一个partition(split)上的。
    3.RDD之间有一系列的依赖关系。
    4.分区器是作用在K,V格式的RDD上。
    5.RDD提供一系列最佳的计算位置。利于数据处理的本地化。“计算向数据移动”

    问题引入
    1spark读取hdfs中数据的方法 textfile底层是调用的MR读取HDFS的方法,首先会split,每个split对应一个block。每个split对应生成RDD的每个partition
    2.什么是K,V格式的RDD?
    RDD中的数据是一个个的tuple2数据,那么这个RDD就是KV格式的RDD
    3.哪里体现了RDD的弹性(容错)
    1)RDD之间有依赖关系
    2)RDD的partion可多可少
    4.哪里体现RDD的分布式
    RDD的paartion是分布在多个节点上的
    5.Spark运行模式

    • Local
      多用于本地测试,如在eclipse,idea中写程序测试等。
    • Standalone
      Standalone是Spark自带的一个资源调度框架,它支持完全分布式。
    • Yarn
      Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的。
    • Mesos
      资源调度框架。
      要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn。

    以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。

    • Driver与集群节点之间有频繁的通信。
    • Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了。会造成oom。
    • Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程。
    • Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。

    3.Spark代码流程

    1.创建SparkConf对象
    可以设置Application name。
    可以设置运行模式及资源需求。
    2.创建SparkContext对象
    3.基于Spark的上下文创建一个RDD,对RDD进行处理。
    4.应用程序中要有Action类算子来触发Transformation类算子执行。
    5.关闭Spark上下文对象SparkContext。
    4.Transformations转换算子
    概念:
    Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。
    Action触发执行
    Action也是一类算子(函数)

    Transformation类算子:

    filter
    过滤符合条件的记录数,true保留,false过滤掉。
    map
    将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
    特点:输入一条,输出一条数据。
    flatMap
    先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
    sample
    随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
    reduceByKey
    将相同的Key根据相应的逻辑进行处理。
    sortByKey/sortBy
    作用在K,V格式的RDD上,对key进行升序或者降序排序。

    5.Action行动算子
    概念:
    Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。
    Action类算子

    count
    返回数据集中的元素数。会在结果计算完成后回收到Driver端。
    take(n)
    返回一个包含数据集前n个元素的集合。

    first
    first=take(1),返回数据集中的第一个元素。
    foreach
    循环遍历数据集中的每个元素,运行相应的逻辑。
    collect
    将计算结果回收到Driver端。

    java算子
    和Scala类似

    总结:
    sparkcore
    spark:基于内存的计算框架
    与MR区别
    基于内存迭代计算,MR是基于磁盘迭代计算
    spark中有DAG有向无环图
    MR中只有map和reduce,相当于spark中两个算子(map和reducebykey)spark中有各种算子应对不同场景
    spark技术栈:HDFS MR Yarn ,hive
    sparkcore sparksql sparkstreaming
    spark运行模式:local 多用于本地测试,一般在eclipse,idea中运行使用local模式
    standalone:spark自带的资源调度框架,支持分布式框架
    yarn:hadoop生态圈中资源调度框架,spark可以基于yarn运行
    mesos:资源调度框架
    spark核心rdd:RDD弹性分布式数据集
    rdd内其实是不存数据的,partion也是不存数据的

    rdd五大特性
    1rdd是由partion组成
    2算子(函数)作用在partion上的
    3rdd之间有依赖关系
    4分区器是作用在KV格式的RDD上
    5partion对外提供最佳计算位置,利于数据处理的本地化
    注意textfile底层读取hdfs文件的方法底层调用的是mr读取hdfs文件的方法,首先split,每个split对应一个block,每个split对应一个partion
    什么是KV格式的rdd
    RDD中数据是一个个的二元组
    哪里体现了rdd的弹性(容错)
    rdd的分区可多可少
    rdd之间有依赖关系
    rdd的partion是分布在多个节点上的

    spark代码流程:
    val conf=new SparkConf().setAppName...setMaster
    val sc=new SparkContext(conf)
    由sc得到rdd
    对rdd使用transformation类算子进行转换
    对rdd使用adction算子触发transformations类算子执行
    sc.stop()

    算子
    转换算子transformation 懒执行,需要action触发执行
    行动算子action 触发transformation类算子执行,一个application中有一个action算子就有一个job
    count结果会拿到driver端
    collect将结果拿回driver端
    持久化算子
    RDD的持久化
    cache默认将数据存储在内存中
    persist可以手动指定持久化级别
    cache()=persist()=persist(StorageLevel.MEORY_ONLY)
    尽量少使用DISK_ONLY级别
    checkpoint:将数据直接持久化到指定的目录,当lineage计算非常复杂,可以尝试使用checkpoint,checkpoint还可以切断rdd的依赖关系
    特殊场景使用checkpoint:对rdd使用checkpoint要谨慎使用
    checkpoint要指定目录,可以将数据持久化到指定的目录中,当application执行完成后,这个目录中的数据不会被清除
    checkpoint的执行流程:1.当spark执行完成之后,spark会从后往前回溯,找到checkpointRDD做标记
    2.回溯完成之后,spark会重新启动一个job,计算标记的RDD的数据,放入指定的checkpoint目录中
    3.数据计算完成,放入目录之后,会切断RDD的依赖关系。当spaarkapplication执行完成之后,数据目录中的数据不会被清除
    优化:对那个rdd进行checkpoint,最好先cache下,这样回溯完成后再计算这个checkpointRDD数据的时候可以直接在内存中拿到放指定的目录

    cache和persist的注意事项:
    1.cache和persistc,heckpoint都是懒执行,必须有一个action类算子触发执行。最小持久化单位是partion
    2.cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
    3.cache和persist算子后不能立即紧跟action算子。
    4.cache和persist算子持久化的数据当applilcation执行完成之后会被清除。
    错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

    相关文章

      网友评论

          本文标题:47.spark

          本文链接:https://www.haomeiwen.com/subject/hzjvrhtx.html