美文网首页
-0- Spark 基础概念

-0- Spark 基础概念

作者: emm_simon | 来源:发表于2020-05-15 17:58 被阅读0次

    [参考link]
    [参考link]
    [参考link]
    [参考link]
    [参考link]
    [参考link]
    [参考link]
    [参考link]

    -0- Spark概述

    Spark是一个计算引擎,专为大规模数据处理而设计,快速、通用。

    -1- SparkContext、sparkApp、Driver、Executor

    val conf = new SparkConf().setAppName("appName")
    implicit val sc: SparkContext = new SparkContext(conf)
    ...
    ...
    ...
    sc.stop()
    

    * SparkContext是整一个sparkApp的入口:
    通过SparkContext()初始化一个SparkContext实例,一个SC实例就是一个Spark应用;
    * 举个简单的sparkApp的例子:
    saprkSql读取hive表数据DataFrame相关数据过滤处理sparkSql将最终数据overwrite到hive表
    当然,在sparkApp里还可以进行hive表样本数据读取机器学习模型训练模型预测预测结果overwrite到hive表这种任务脚本;
    * 一个Spark应用程序,包含了一个Driver program和集群中多个Executor,Driver和Executor存在心跳机制确保存活;
    * Driver负责控制执行开发人员向spark引擎提交的应用脚本中main入口的代码,包括:
    创建SparkContext、创建 RDD、对RDD 的transformation操作和action操作等;
    * Executor负责运行组成 Spark 应用的任务,并将结果返回给Driver进程;Executor通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算

    -2- SparkSession、sparkSQL

    implicit val spark: SparkSession = SparkSession.builder()
                .config("hive.exec.dynamic.partition", "true")
                .config("hive.exec.dynamic.partition.mode", "nonstrict")
                .enableHiveSupport()
                .getOrCreate()
    val inputSql = s"""
                   |SELECT *
                   |FROM ${inputTable}
                   |WHERE date = '${date}'
                   |""".stripMargin
    println("inputSql :" + inputSql)
    val rawDF = spark.sql(inputSql)
    rawDF.show(10)
    

    * SparkSession是sparkSql的入口:
    通过SparkSession.builder()初始化一个spark对象,spark.sql()就可以进行跑SQL,完成从hive表取数、将数据overwrite到hive表的操作了。
    * SparkSession 其实是封装了SQLContextHiveContext,spark应用中有sparkSQL 操作的话必须创建一个 SQLContext 或者 HiveContext 的类实例,HiveContext继承自 SQLContext,用于处理 hive 中的数据。

    -3- DAG(Directed Acyclic Graph,有向无环图)、RDD(ResilienntDistributedDatasets,弹性分布式数据集)

    * spark的作业和任务调度系统是其核心,它能够有效的进行调度的根本原因是因为对任务划分DAG和容错。
    * spark处理数据时,会将计算转化为一个有向无环图(DAG)的任务集。
    * Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。

    * RDD :弹性分布式数据集,只读的、分区(partition)记录的集合
    * RDD能够有效的恢复DAG中故障和慢节点执行的任务,并且RDD提供一种基于粗粒度变换的接口,记录创建数据集的“血统”,能够实现高效的容错性。
    * 初代rdd处于血统的顶层,记录任务所需的数据的分区信息,每个分区数据的读取方法
    * 子代rdd不真正的存储信息,只记录血统信息
    * 真正的数据读取,应该是task具体被执行的时候,触发action操作的时候才发生的

    -4- DataFrame、DataSet、Row

    * 在Spark中,DataFrame是一种以RDD为基础的分布式数据集;
    * DataFrame类似于传统数据库中的二维表格;
    * DataFrame与RDD的主要区别在于:
    DataFrame带有schema元信息,即DataFrame的每一列都带有名称和类型。
    这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。
    RDD所存数据元素的具体内部结构无从得知,Spark Core只能在stage层面进行简单、通用的流水线优化。

    -5- 算子:transformation()、action()

    RDD的三大特性:
    * 分布式存储 分布式存储在最大的好处是可以让数据在不同工作节点并行存储,以便在需要数据时并行运算。
    * 弹性 弹性指其在节点存储时,既可以使用内存,也可已使用外存,为使用者进行大数据处理提供方便。
    * 延迟计算 一个完整的RDD运行任务被分为两部分:Transformation和Action.

    hadoop计算引擎提供的接口只有map函数、reduce函数
    spark计算引擎提供的是mapreduce的扩展,提供两类操作:Transformation系列操作、Action系列操作

    -5.1- Transformation

    Transformation用于对RDD的创建,RDD只能使用Transformation创建,同时还提供大量操作方法,包括map,filter,groupBy,join等,RDD利用这些操作生成新的RDD,但是需要注意,无论多少次Transformation,在RDD中真正数据计算Action之前都不可能真正运行。

    -5.2- Action

    Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。实际上,RDD中所有的操作都是Lazy模式进行,运行在编译中不会立即计算最终结果,而是记住所有操作步骤和方法,只有显示的遇到启动命令才执行。这样做的好处在于大部分前期工作在Transformation时已经完成,当Action工作时,只需要利用全部自由完成业务的核心工作。

    -5.3- Spark 的 20个Transformations 操作函数总结及举例
    function 说明 注释
    map(func) 将func函数作用到数据集的每个元素,生成一个新的分布式的数据集并返回
    filter(func) 选出所有func返回值为true的元素,作为一个新的数据集返回
    flatMap(func) 与map相似,但是每个输入的item能够被map到0个或者更多的items输出
    mapPartitions(func) 与map相似,但是mapPartitions的输入函数单独作用于RDD的每个分区(block)上,因此func的输入和返回值都必须是迭代器iterator
    mapPartitionsWithIndex(func) 与mapPartitions相似,但是输入函数func提供了一个正式的参数,可以用来表示分区的编号
    sample(withReplacement, fraction, seed) 从数据中抽样,withReplacement表示是否有放回,withReplacement=true表示有放回抽样,fraction为抽样的概率(0<=fraction<=1),seed为随机种子
    union(otherDataset) 并集操作,将源数据集与union中的输入数据集取并集,默认保留重复元素
    intersection(otherDataset) 交集操作,将源数据集与union中的输入数据集取交集,并返回新的数据集
    distinct([numTasks]) 去除数据集中的重复元素
    groupByKey([numTasks]) 作用于由键值对(K, V)组成的数据集上,将Key相同的数据放在一起,返回一个由键值对(K, Iterable)组成的数据集
    reduceByKey(func, [numTasks]) 作用于键值对(K, V)上,按Key分组,然后将Key相同的键值对的Value都执行func操作,得到一个值
    aggregateByKey(zeroValue, seqOp, comOp, [numTasks]) 在于键值对(K, V)的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seqOp函数的参数,进行计算,返回的结果作为一个新的键值对(K, V)
    sortByKey([ascending=True], [numTasks]) 按照Key进行排序,ascending的值默认为True,True/False表示升序还是降序
    join(otherDataset, [numTasks]) 类似于SQL中的连接操作,即作用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W)),spark也支持外连接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin
    cogroup(otherDataset, [numTasks]) 作用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable, Iterable))
    cartesian(otherDataset) 笛卡尔乘积,作用于数据集T和U上,返回(T, U),即数据集中每个元素的两两组合
    pipe(command, [envVars]) 将驱动程序中的RDD交给shell处理(外部进程),例如Perl或bash脚本
    coalesce(numPartitions) 将RDD的分区数减小到numPartitions个
    repartition(numPartitions) 重组数据,数据被重新随机分区为numPartitions个,numPartitions可以比原来大,也可以比原来小,平衡各个分区
    repartitionAndSortWithinPartitions(partitioner) 根据给定的partitioner函数重新将RDD分区,并在分区内排序
    -5.4- Spark 的 12个Actions 操作函数总结及举例
    function 说明 注释
    reduce(func) 对数据集中的元素做聚集操作
    collect() 以数组形式返回数据集中所有的元素
    count() 返回数据集中元素的个数
    first() 返回数据集中的第一个元素
    take() 以数组形式返回数据集中前n个元素
    takeSample(withReplacement, num, [seed]) 以数组形式返回从数据集中抽取的样本数量为num的随机样本,有替换或者无替换的进行采样
    takeOrdered(n, [ordering]) 返回RDD的前n个元素,可以利用自然顺序或者由用户执行排序的comparator
    saveAsTextFile(path) 将数据集中的元素以文本文件(或者文本文件的一个集合)的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。Spark会调用每个元素的toString方法,将其转换为文本文件中的一行
    saveAsSequenceFile(path) 将数据集中的元素以Hadoop SequenceFile的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。RDD的元素必须由实现了Hadoop的Writable接口的key-value键值对组成。
    saveAsObjectFile(path) 利用Java序列化,将数据集中的元素以一种简单的形式进行写操作,并能够利用SparkContext.objectFile()加载数据
    countByKey() 只能作用于键值对(K, V)形式的RDDs上。按照Key进行计数,返回键值对(K, int)的哈希表
    foreach(func) 在数据集的每个元素上调用函数func

    -6- cache

    -7- Job、Stage、Task、Spark Web UI

    * 一个action算子触发一个job
    * 一个job中有好多的task,task是执行job的逻辑单元(猜测是根据partition划分任务)
    * 一个job根据是否有shuffle发生可以分为好多的stage

    -8- 参数配置

    --conf spark.executor.instances=5 
    --conf spark.executor.cores=8 
    --conf spark.executor.memory=80G
    

    相关文章

      网友评论

          本文标题:-0- Spark 基础概念

          本文链接:https://www.haomeiwen.com/subject/eohhohtx.html