[参考link]
[参考link]
[参考link]
[参考link]
[参考link]
[参考link]
[参考link]
[参考link]
-0- Spark概述
Spark是一个计算引擎,专为大规模数据处理而设计,快速、通用。
-1- SparkContext、sparkApp、Driver、Executor
val conf = new SparkConf().setAppName("appName")
implicit val sc: SparkContext = new SparkContext(conf)
...
...
...
sc.stop()
*
SparkContext是整一个sparkApp
的入口:
通过SparkContext()
初始化一个SparkContext实例,一个SC实例就是一个Spark应用;
*
举个简单的sparkApp的例子:
saprkSql读取hive表数据
→DataFrame相关数据过滤处理
→sparkSql将最终数据overwrite到hive表
;
当然,在sparkApp里还可以进行hive表样本数据读取
→机器学习模型训练
→模型预测
→预测结果overwrite到hive表
这种任务脚本;
*
一个Spark应用程序,包含了一个Driver program和集群中多个Executor,Driver和Executor存在心跳机制确保存活;
*
Driver
负责控制执行开发人员向spark引擎提交的应用脚本中main入口
的代码,包括:
创建SparkContext、创建 RDD、对RDD 的transformation操作和action操作等;
*
Executor
负责运行组成 Spark 应用的任务,并将结果返回给Driver进程;Executor通过自身的块管理器(Block Manager)
为用户程序中要求缓存的RDD提供内存式存储
。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算
。
-2- SparkSession、sparkSQL
implicit val spark: SparkSession = SparkSession.builder()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.enableHiveSupport()
.getOrCreate()
val inputSql = s"""
|SELECT *
|FROM ${inputTable}
|WHERE date = '${date}'
|""".stripMargin
println("inputSql :" + inputSql)
val rawDF = spark.sql(inputSql)
rawDF.show(10)
*
SparkSession是sparkSql
的入口:
通过SparkSession.builder()
初始化一个spark对象,spark.sql()就可以进行跑SQL,完成从hive表取数、将数据overwrite到hive表的操作了。
*
SparkSession 其实是封装了SQLContext
和HiveContext
,spark应用中有sparkSQL 操作的话必须创建一个 SQLContext 或者 HiveContext 的类实例,HiveContext继承自 SQLContext,用于处理 hive 中的数据。
-3- DAG(Directed Acyclic Graph,有向无环图)、RDD(ResilienntDistributedDatasets,弹性分布式数据集)
*
spark的作业和任务调度系统是其核心,它能够有效的进行调度的根本原因是因为对任务划分DAG和容错。
*
spark处理数据时,会将计算转化为一个有向无环图(DAG)的任务集。
*
Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。
*
RDD :弹性分布式数据集,只读的、分区(partition)记录的集合
*
RDD能够有效的恢复DAG中故障和慢节点执行的任务,并且RDD提供一种基于粗粒度变换的接口,记录创建数据集的“血统”,能够实现高效的容错性。
*
初代rdd
处于血统的顶层,记录任务所需的数据的分区信息,每个分区数据的读取方法
*
子代rdd
不真正的存储信息,只记录血统信息
*
真正的数据读取
,应该是task具体被执行的时候,触发action操作的时候才发生的
-4- DataFrame、DataSet、Row
*
在Spark中,DataFrame
是一种以RDD
为基础的分布式数据集;
*
DataFrame类似于传统数据库中的二维表格;
*
DataFrame与RDD的主要区别在于:
DataFrame
带有schema元信息,即DataFrame的每一列都带有名称和类型。
这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。
RDD
所存数据元素的具体内部结构无从得知,Spark Core只能在stage层面进行简单、通用的流水线优化。
-5- 算子:transformation()、action()
RDD的三大特性:
* 分布式存储
分布式存储在最大的好处是可以让数据在不同工作节点并行存储,以便在需要数据时并行运算。
* 弹性
弹性指其在节点存储时,既可以使用内存,也可已使用外存,为使用者进行大数据处理提供方便。
* 延迟计算
一个完整的RDD运行任务被分为两部分:Transformation和Action.
hadoop计算引擎
提供的接口只有map函数、reduce函数
spark计算引擎
提供的是mapreduce的扩展,提供两类操作:Transformation系列操作、Action系列操作
-5.1- Transformation
Transformation用于对RDD的创建,RDD只能使用Transformation创建,同时还提供大量操作方法,包括map,filter,groupBy,join等,RDD利用这些操作生成新的RDD,但是需要注意,无论多少次Transformation,在RDD中真正数据计算Action之前都不可能真正运行。
-5.2- Action
Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。实际上,RDD中所有的操作都是Lazy模式进行,运行在编译中不会立即计算最终结果,而是记住所有操作步骤和方法,只有显示的遇到启动命令才执行。这样做的好处在于大部分前期工作在Transformation时已经完成,当Action工作时,只需要利用全部自由完成业务的核心工作。
-5.3- Spark 的 20个Transformations 操作函数总结及举例
function | 说明 | 注释 |
---|---|---|
map(func) | 将func函数作用到数据集的每个元素,生成一个新的分布式的数据集并返回 | |
filter(func) | 选出所有func返回值为true的元素,作为一个新的数据集返回 | |
flatMap(func) | 与map相似,但是每个输入的item能够被map到0个或者更多的items输出 | |
mapPartitions(func) | 与map相似,但是mapPartitions的输入函数单独作用于RDD的每个分区(block)上,因此func的输入和返回值都必须是迭代器iterator | |
mapPartitionsWithIndex(func) | 与mapPartitions相似,但是输入函数func提供了一个正式的参数,可以用来表示分区的编号 | |
sample(withReplacement, fraction, seed) | 从数据中抽样,withReplacement表示是否有放回,withReplacement=true表示有放回抽样,fraction为抽样的概率(0<=fraction<=1),seed为随机种子 | |
union(otherDataset) | 并集操作,将源数据集与union中的输入数据集取并集,默认保留重复元素 | |
intersection(otherDataset) | 交集操作,将源数据集与union中的输入数据集取交集,并返回新的数据集 | |
distinct([numTasks]) | 去除数据集中的重复元素 | |
groupByKey([numTasks]) | 作用于由键值对(K, V)组成的数据集上,将Key相同的数据放在一起,返回一个由键值对(K, Iterable)组成的数据集 | |
reduceByKey(func, [numTasks]) | 作用于键值对(K, V)上,按Key分组,然后将Key相同的键值对的Value都执行func操作,得到一个值 | |
aggregateByKey(zeroValue, seqOp, comOp, [numTasks]) | 在于键值对(K, V)的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seqOp函数的参数,进行计算,返回的结果作为一个新的键值对(K, V) | |
sortByKey([ascending=True], [numTasks]) | 按照Key进行排序,ascending的值默认为True,True/False表示升序还是降序 | |
join(otherDataset, [numTasks]) | 类似于SQL中的连接操作,即作用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W)),spark也支持外连接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin | |
cogroup(otherDataset, [numTasks]) | 作用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable, Iterable)) | |
cartesian(otherDataset) | 笛卡尔乘积,作用于数据集T和U上,返回(T, U),即数据集中每个元素的两两组合 | |
pipe(command, [envVars]) | 将驱动程序中的RDD交给shell处理(外部进程),例如Perl或bash脚本 | |
coalesce(numPartitions) | 将RDD的分区数减小到numPartitions个 | |
repartition(numPartitions) | 重组数据,数据被重新随机分区为numPartitions个,numPartitions可以比原来大,也可以比原来小,平衡各个分区 | |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的partitioner函数重新将RDD分区,并在分区内排序 |
-5.4- Spark 的 12个Actions 操作函数总结及举例
function | 说明 | 注释 |
---|---|---|
reduce(func) | 对数据集中的元素做聚集操作 | |
collect() | 以数组形式返回数据集中所有的元素 | |
count() | 返回数据集中元素的个数 | |
first() | 返回数据集中的第一个元素 | |
take() | 以数组形式返回数据集中前n个元素 | |
takeSample(withReplacement, num, [seed]) | 以数组形式返回从数据集中抽取的样本数量为num的随机样本,有替换或者无替换的进行采样 | |
takeOrdered(n, [ordering]) | 返回RDD的前n个元素,可以利用自然顺序或者由用户执行排序的comparator | |
saveAsTextFile(path) | 将数据集中的元素以文本文件(或者文本文件的一个集合)的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。Spark会调用每个元素的toString方法,将其转换为文本文件中的一行 | |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop SequenceFile的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。RDD的元素必须由实现了Hadoop的Writable接口的key-value键值对组成。 | |
saveAsObjectFile(path) | 利用Java序列化,将数据集中的元素以一种简单的形式进行写操作,并能够利用SparkContext.objectFile()加载数据 | |
countByKey() | 只能作用于键值对(K, V)形式的RDDs上。按照Key进行计数,返回键值对(K, int)的哈希表 | |
foreach(func) | 在数据集的每个元素上调用函数func |
-6- cache
-7- Job、Stage、Task、Spark Web UI
*
一个action算子触发一个job
*
一个job中有好多的task,task是执行job的逻辑单元(猜测是根据partition划分任务)
*
一个job根据是否有shuffle发生可以分为好多的stage
-8- 参数配置
--conf spark.executor.instances=5
--conf spark.executor.cores=8
--conf spark.executor.memory=80G
网友评论