Spark
一.Spark生态圈:
(1)Spark Core : RDD(弹性分布式数据集)
(2)Spark SQL
(3)Spark Streaming
(4)Spark MLLib:协同过滤,ALS,逻辑回归等等 --> 机器学习
(5)Spark Graphx : 图计算
二.什么是Spark
1.Spark是什么:
Spark是一个针对大规模数据处理的快速通用引擎。
- Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。
2.特点:
- 快:快100倍(Hadoop 3 之前)
- 易用:支持多种语言开发
- 通用性:生态系统全。
- 易用性:兼容Hadoop
3.最大特点:基于内存
- Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。
三.Spark体系架构
1.Spark集群的体系架构图解:
image.png2.Spark的主从结构
image.png四.Spark的安装部署
1.Spark的安装部署方式有以下几种模式:
(1)Standalone: 本机调试
(2)YARN
(3)Mesos
(4)Amazon EC2
2.执行过程:
一个Worker有多个 Executor。 Executor是任务的执行者,按阶段(stage)划分任务。————> RDD
五.Spark的搭建:
1.准备工作:
- jdk
- 配置主机名
- 免密码登录
2.伪分布式模式安装:
(1)下载
(2)上传到linux
(3)解压
(4)修改配置文件
- 配置文件:conf/spark-env.sh
cd /opt/module
mv spark-2.1.0-bin-hadoop2.7/ spark/ //重命名spark文件夹
cd /opt/module/spark/conf
mv spark-env.sh.template spark-env.sh //重命名配置文件
vi spark-env.sh
修改内容如下:
export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata121 //主节点的服务器名
export SPARK_MASTER_PORT=7077 //主节点端口号
//下面的可以不写,默认
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
image.png
- 配置文件:conf/slave
mv slaves.template slaves
vi slaves
新增内容:
bigdata121
image.png
(5)启动:
cd /opt/module/spark
sbin/start-all.sh
image.png
(6)验证:192.168.127.121:8080
image.png3.全分布的安装部署:
(1)下载
(2)上传到linux
(3)解压
(4)修改配置文件
- 配置文件:conf/spark-env.sh
cd /opt/module
mv spark-2.1.0-bin-hadoop2.7/ spark/ //重命名spark文件夹
cd /opt/module/spark/conf
mv spark-env.sh.template spark-env.sh //重命名配置文件
vi spark-env.sh
修改内容如下:
export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata121 //主节点的服务器名
export SPARK_MASTER_PORT=7077 //主节点端口号
//下面的可以不写,默认
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
image.png
- 配置文件:conf/slave
mv slaves.template slaves
vi slaves
新增内容:
bigdata122
bigdata123
image.png
(5)拷贝到其他两台服务器
cd /opt/module
src -r spark/ bigdata122:/opt/module
src -r spark/ bigdata123:/opt/module
(6)启动Spark集群:
cd /opt/module/spark
sbin/start-all.sh
六.Spark的HA
1.回顾HA:
(1)HDFS,Yarn,Hbase,Spark:都是主从结构
(2)单点故障
2.基于文件系统的单点恢复
(1)主要用于开发或测试环境。当spark提供目录保存spark Application和worker的注册信息,并将他们的恢复状态写入该目录中,这时,一旦Master发生故障,就可以通过重新启动Master进程(sbin/start-master.sh),恢复已运行的spark Application和worker的注册信息。
(2)基于文件系统的单点恢复,主要是在spark-en.sh里对SPARK_DAEMON_JAVA_OPTS设置
配置参数 | 参考值 |
---|---|
spark.deploy.recoveryMode | 设置为FILESYSTEM开启单点恢复功能,默认值:NONE |
spark.deploy.recoveryDirectory | Spark 保存恢复状态的目录 |
3.基于Zookeeper的Standby Masters
(1)ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到ZooKeeper,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。
配置参数 | 参考值 |
---|---|
spark.deploy.recoveryMode | 设置为ZOOKEEPER开启单点恢复功能,默认值:NONE |
spark.deploy.zookeeper.url | ZooKeeper集群的地址 |
spark.deploy.zookeeper.dir | Spark信息在ZK中的保存目录,默认:/spark |
(2)修改spark-env.sh参考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181
-Dspark.deploy.zookeeper.dir=/spark"
(3)另外:每个节点上,需要将以下两行注释掉。
image.png(4)同步到其他两台服务器
(5)ZooKeeper中保存的信息
七.执行Spark Demo程序
1.使用Spark Shell
(1)spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。相当于REPL ,作为一个独立的Application运行
(2)两种模式:
- 本地模式:spark-shell 不接任何参数,代表本地模式
- 集群模式:spark-shell后面带有参数
(3)启动Spark shell:
spark-shell
参数说明:
--master spark://spark81:7077 //指定Master的地址
--executor-memory 2g //指定每个worker可用内存为2G
--total-executor-cores 2 //指定整个集群使用的cup核数为2个
例如:
spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2
(4)在Spark shell中编写WordCount程序
sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")
参数说明:
- sc是SparkContext对象,该对象时提交spark程序的入口
- textFile("hdfs://192.168.88.111:9000/data/data.txt")是hdfs中读取数据
- flatMap(_.split(" "))先map在压平
- map((_,1))将单词和1构成元组
- reduceByKey(+)按照key进行reduce,并将value累加
- saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")将结果写入到hdfs中
(5)wordcount程序,处理本地文件,把结果打印到屏幕上
scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.collect
res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
(6)wordcount程序,处理HDFS文件,结果保存在hdfs上
sc.textFile("hdfs://node1:8020/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://node1:8020/output/0331/test_WordCount")
(7)单步运行wordcount --->RDD
scala> val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> 1+1
res2: Int = 2
scala> rdd1.collect
res3: Array[String] = Array(I love Beijing, I love China, Beijing is the capital of China)
scala> val rdd2 = rdd1.flatMap(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:26
scala> rdd2.collect
res4: Array[String] = Array(I, love, Beijing, I, love, China, Beijing, is, the, capital, of, China)
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:28
scala> rdd3.collect
res5: Array[(String, Int)] = Array((I,1), (love,1), (Beijing,1), (I,1), (love,1), (China,1), (Beijing,1), (is,1), (the,1), (capital,1), (of,1), (China,1))
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:30
scala> rdd4.collect
res6: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
(8)RDD 弹性分布式数据集
(9)Scala复习:
- flatten:把嵌套的结果展开
scala>List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
- flatmap:相当于一个 map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))
scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)
myList.flatMap(x=>x.map(_*2))
flatmao执行过程:
- 将 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 调用 map(_*2) 方法。x 代表一个List
- flatten
2.在IDEA中编写WordCount程序
(1)需要的jar包:$SPARK_HOME/jars/*.jar
(2)创建Scala Project,并创建Scala Object、或者Java Class
(3)书写源代码,并打成jar包,上传到Linux
Scala版本
image.png(4)运行程序:
spark-submit --master spark://spark81:7077
--class mydemo.WordCount jars/wc.jar
hdfs://192.168.88.111:9000/data/data.txt
hdfs://192.168.88.111:9000/output/spark/wc1
Java版本(直接输出在屏幕)
image.png(4)运行程序:
spark-submit --master spark://spark81:7077
--class mydemo.JavaWordCount jars/wc.jar
hdfs://192.168.88.111:9000/data/data.txt
八.Spark运行机制及原理分析
1.WordCount执行的流程分析
image.png2.Spark提交任务的流程析
image.png九.RDD和RDD特性,RDD的算子
1.RDD:弹性分布式数据集
(1)什么是RDD?
- RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
(2)RDD的属性:
- 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
- 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
- RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
- 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
- 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
2.如何创建RDD
(1)通过SparkContext.parallelize方法来创建(通过sc.parallelize进行创建)
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
scala> rdd1.partitions.length
res35: Int = 3
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29
scala> rdd1.partitions.length
res36: Int = 2
(2)通过外部数据源来创建
sc.textFile()
scala> val rdd2 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd2:org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29
(3)RDD的类型:Transformation和Action
3.RDD的基本原理:
4.Transformation
(1)RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | |
coalesce(numPartitions) | |
repartition(numPartitions) | |
repartitionAndSortWithinPartitions(partitioner) |
5.Action
动作 | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
十.RDD特性
1.RDD的缓存机制:
(1)作用:提高性能
(2)使用:标识RDD可以被缓存 persist cache
(3)可以缓存的位置:
2.RDD的容错机制:通过检查点来实现
(1)
(1)复习检查点:HDFS中的检查点:有SecondaryNamenode来实现日志的合并。
(2)RDD的检查点:容错
- 概念:血统 Lineage
- 理解:表示任务执行的生命周期。
- WordCount textFile ---> redceByKey
- 如果血统越长,越容易出错。
- 假如有检查点,可以从最近的一个检查点开始,往后面计算。不用重头计算。
(3)RDD检查点的类型:
a.基于本地目录:需要将Spark shell 或者任务运行在本地模式上(setMaster("local"))
b.HDFS目录:用于生产,集群模式
image.pngsc.setCheckPointDir(目录)
//举例:设置检查点
scala> var rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24
//设置检查点目录:
scala> sc.setCheckpointDir("hdfs://192.168.109.131:8020/sparkckpt")
//标识rdd1可以执行检查点操作
scala> rdd1.checkpoint
scala> rdd1.count
res2: Long = 923452
3.依赖关系:宽依赖,窄依赖
(1)RDD的依赖关系:
- RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
(2)窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
总结:窄依赖我们形象的比喻为独生子女
(3)宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
总结:宽依赖我们形象的比喻为超生
4.Spark任务中的Stage
- DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
十一.RDD的高级算子
1.mapPartitionsWithIndex:对RDD中的每个分区(带有下标)进行操作,下标用index表示
通过这个算子,我们可以获取分区号。
def mapPartitionsWithIndex[U](
f: (Int, Iterator[T]) ⇒ Iterator[U],
preservesPartitioning: Boolean = false)(
implicit arg0: ClassTag[U]): RDD[U]
//参数:f是个函数参数 f 中第一个参数是Int,代表分区号,第二个Iterator[T]代表分区中的元素
例如:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]
scala> rdd1.mapPartitions
mapPartitions mapPartitionsWithIndex
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ], [partId : 2 , value = 6 ], [partId : 2 , value = 7 ], [partId : 2 , value = 8 ])
2.aggregate:聚合操作。类似于分组。
(1)先对局部进行聚合操作,再对全局进行聚合操作。
//调用聚合操作
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd2.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
scala> import scala.math._
import scala.math._
scala> rdd2.aggregate(0)(max(_,_),_+_)
res6: Int = 7
说明:
(2)对字符串操作
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> rdd2.aggregate("")(_+_,_+_)
res11: String = abcdef
scala> rdd2.aggregate("*")(_+_,_+_)
res12: String = **def*abc
(3)复杂的例子:
a.
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
scala> rdd3.mapPartitionsWithIndex(fun1).collect
res17: Array[String] = Array(
[partId : 0 , value = 12 ], [partId : 0 , value = 23 ], [partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])
scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res13: String = 42
执行过程:
-
第一个分区:
(a)第一次比较: "" "12" 长度最大值 2 2-->"2"
(b)第二次比较: “2” “23” 长度最大值 2 2-->"2" -
第二个分区:
(a)第一次比较: "" "345" 长度最大值 3 3-->"3"
(b)第二次比较: “3” “4567” 长度最大值 4 4-->"4"
b.
rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res18: String = 11
执行过程:
-
第一个分区:
第一次比较: "" "12" 长度最小值 0 0-->"0"
第二次比较: “0” “23” 长度最小值 1 1-->"1" -
第二个分区:
第一次比较: "" "345" 长度最小值 0 0-->"0"
第二次比较: “0” “4567” 长度最小值 1 1-->"1"
c.aggregateByKey:类似于aggregate,区别:操作的是 key value 的数据类型。
3.其他高级算子:
十二.编程案例
1.分析日志
(1)需求:找到访问量最高的两个网页
- 第一步:对网页的访问量求和
- 第二步:排序,降序
(2)创建自定义分区
(3)使用JDBCRDD 操作数据库
(4)操作数据库:把结果存放到数据库中
Spark SQL
Spark sql基础
1.什么是Spark SQL
- Spark SQL 是spark的一个模块。来处理 结构化 的数据,不能处理非结构化的数据
2.特点:
(1)容易集成: - 不需要单独安装。
(2)统一的数据访问方式
- 结构化数据的类型:JDBC JSon Hive parquer文件 都可以作为Spark SQL 的数据源
- 对接多种数据源,且使用方式类似
(3)完全兼容hive - 把Hive中的数据,读取到Spark SQL中运行。
(4) 支持标准的数据连接
- JDBC
3.为什么学习Spark SQL
- 执行效率比Hive高
- hive 2.x 执行引擎可以使用 Spark
4.核心概念:表(DataFrame DataSet) - mysql中的表:表结构、数据
- DataFrame:Schema、RDD(数据)
- DataSet 在spark1.6以后,对DataFrame做了一个封装。
5.创建DataFrame
测试数据:员工表、部门表
(1)第一种方式:使用case class
a.定义Schema
b.读取文件
c.把每行数据,映射到Emp上
d.生成DataFrame
(2)第二种方式 使用Spark Session
(3)直接读取一个带格式的文件。
6.操作DataFrame
(1)DSL语句
(2)SQL语句
注意:不能直接执行SQL,需要生成一个视图,再执行sql。
(3)多表查询
7.操作DataSet
(1)跟DataFrame类似,是一套新的接口。高级的Dataframe
(2)创建DataSet
- 使用序列来创建DataSet。
- 使用JSON数据来创建DataSet
- 使用其他数据
(3)DataSet案例
(4)多表查询
- 创建部门表
- 创建员工表
- 执行多表查询:等值连接
- 多表连接后再筛选
7.Spark SQL中的视图
(1)视图是一个虚表,不存储数据。
(2)两种类型:
- 普通视图(本地视图):只在当前Session中有效。createOrReplaceTempView createTempView
- 全局视图: createGlobalTempView:在不同的Session中都有用 把全局视图创建在命名空间中:global_temp中。类似于一个库。
二.使用数据源
1.在Spark SQL中,可以使用各种各样的数据源来操作。 结构化
2.使用load函数、save函数
- load函数是加载数据,save是存储数据
注意:使用load 或 save时,默认是Parquet文件。列式存储文件。
3.Parquet文件:列式存储文件,是Spark SQL 默认的数据源 - 就是一个普通的文件
(1)把其他文件,转换成Parquet文件
(2)支持Schema的合并
4.json文件
5.JDBC
(1)使用JDBC操作关系型数据库,加载到Spark中进行分析和处理。
(2)方式一:
(3)方式二:
6.使用hive
(1)spark SQL 完全兼容hive
(2)需要进行配置
- 拷贝一下文件到spark/conf目录下:
- Hive 配置文件: hive-site.xml
- Hadoop 配置文件:core-site.xml hdfs-site.xml
(3)配置好后,重启spark
(4)启动Hadoop 与 hive
三.在IDE中开发Spark SQL
四.性能优化
1.用内存中缓存表的数据
直接读取内存的值,来提高性能
2.了解性能优化的相关参数:参考讲义
Spark Streaming
一.常用的实时计算引擎(流式计算)
1.Apache Storm:真正的流式计算
2.Spark Streaming :严格上来说,不是真正的流式计算(实时计算)
把连续的流式数据,当成不连续的RDD
本质:是一个离散计算(不连续)
3.Apache Flink:真正的流式计算。与Spark Streaming相反。
把离散的数据,当成流式数据来处理
4.JStorm
二.Spark Streaming基础
1.什么是 Spark Streaming。
- Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
易于构建灵活的、高容错的流式系统。
2.特点:
- 易用,已经集成到Spark中
- 容错性:底层RDD,RDD本身具有容错机制
- 支持多种语言:Java Scala Python
3.演示官方的Demo
往Spark Streaming中发送字符串,Spark 接收到以后,进行计数
使用消息服务器 netcat Linux自带
yum install nc.x86_64
nc -l 1234
注意:总核心数 大于等于2。一个核心用于接收数据,另一个用于处理数据
在netcat中写入数据 Spark Streaming可以取到
4.开发自己的NetWorkWordCount程序,和Spark Core类似
问题:Hello Hello
Hello World
现在现象:(Hello,2)
(Hello , 1) (World , 1)
能不能累加起来?保存记录下以前的状态?
通过Spark Streaming提供的算子来实现
三.高级特性:
1.什么是DStream?离散流
- 把连续的数据变成不连续的RDD
- 因为DStream的特性,导致,Spark Streaming不是真正的流式计算
2.重点算子讲解
(1)updateStateByKey
默认情况下,Spark Streaming不记录之前的状态,每次发数据,都会从0开始
现在使用本算子,实现累加操作。
(2)transform
3.窗口操作
- 窗口:对落在窗口内的数据进行处理,也是一个DStream,RDD
- 举例:每10秒钟把过去30秒的数据采集过来
- 注意:先启动nc 再启动程序 local[2]
4.集成Spark SQL : 使用SQL语句来处理流式数据
5.缓存和持久化:和RDD一样
6.支持检查点:和RDD一样
四.数据源
Spark Streaming是一个流式计算引擎,就需要从外部数据源来接收数据
1.基本的数据源
- 文件流:监控文件系统的变化,如果文件有增加,读取文件中的内容
希望Spark Streaming监控一个文件夹,如果有变化,则把变化采集过来 - RDD队列流:可以从队列中获取数据
- 套接字流:socketTextStream
2.高级数据源
(1)Flume
(2)Spark SQL 对接flume有多种方式:
- push方式:flume将数据推送给Spark Streaming
- custom sink 模式:比第一种有更好的健壮性和容错性。使用这种方式,flume配置一个sink。
- 使用官方提供的spark sink组件
需要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷贝到flume lib下
需要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷贝到IDE的lib下添加到build path中
(3)Kafka
在讲Kafka时,举例。
四.性能优化的参数
(1)性能优化:
spark submit的时候,程序报OOM错误
程序跑的很慢
(2)方法:调整spark参数
conf.set...
性能调优
一.Spark 性能优化概览:
- Spark的计算本质是,分布式计算。
- 所以,Spark程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者内存。
- CPU、网络带宽,是运维来维护的。
- 聚焦点:内存。
- 如果内存能够容纳下所有的数据,那就不需要调优了。
- 如果内存比较紧张,不足以放下所有数据(10亿量级---500G),需要对内存的使用进行性能优化。
- 比如:使用某些方法减少内存的消耗。
二.Spark性能优化,主要针对在内存的使用调优。
三.Spark性能优化的技术:
1.使用高性能序列化类库
2.优化数据结构
3.对于多次使用的RDD进行持久化、checkpoint
4.持久化级别:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
5.Java虚拟机垃圾回收调优
6.Shuffle调优,1.x版本中,90%的性能问题,都是由于Shuffle导致的。
四.其他性能优化:
1.提高并行度
2.广播共享数据
等等。。。
五.诊断Spark内存使用:首先要看到内存使用情况,才能进行针对性的优化。
1.内存花费:
(1)每个Java对象,都有一个对象头,占用16字节,包含一些对象的元信息,比如指向他的类的指针。
- 如果对象本身很小,比如int,但是他的对象头比对象自己还大。
(2)Java的String对象,会比他内存的原始数据,多出40个字节。
- String内部使用的char数组来保存内部的字符串序列,并且还要保存诸如输出长度之类的信息。
- char使用的是UTF-16编码,每个字符会占2个字节。比如,包含10个字符的String,2*10+40=60字节
(3)Java中的集合类型,比如HashMap和LinkedList,内部使用链表数据结构。 - 链表中的每个数据,使用Entry对象包装。
- Entry对象,不光有对象头,还有指向下一个Entry的指针,占用8字节。
(4)元素类型为原始数据类型(int),内部通常会使用原始数据类型的包装类型(Integer)来存储元素。
2.如何判断Spark程序消耗内存情况?:答案是预估
(1)设置RDD的并行度。
- 两种方法创建RDD,parallelize() textFile() 在这两个方法中,传入第二个参数,设置RDD的partition数量。
- 在SparkConfig中设置一个参数:
- spark.default.parallelism
- 可以统一设置这个application中所有RDD的partition数量
(2)将RDD缓存 cache()
(3)观察日志:driver日志
/usr/local/spark-2.1.0-bin-hadoop2.7/work
19/04/13 22:01:05 INFO MemoryStore: Block rdd_3_1 stored as values in memory (estimated size 26.0 MB, free 339.9 MB)
19/04/13 22:01:06 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 26.7 MB, free 313.2 MB)
(4)将这个内存信息相加,就是RDD内存占用量。
六.使用高性能序列化类库
1.数据序列化概述
-
数据序列化,就是将对象或者数据结构,转换成特定的格式,使其可在网络中传输,或存储在内存或文件中。
-
反序列化,是相反的操作,将对象从序列化数据中还原出来。
-
序列化后的数据格式,可以是二进制,xml,Json等任何格式。
-
对象、数据序列化的重点在于数据的交换与传输。
-
在任何分布式系统中,序列化都是扮演着一个重要的角色。
-
如果使用的序列化技术,操作很慢,或者序列化后的数据量还是很大,会让分布式系统应用程序性能下降很多。
-
所以,Spark性能优化的第一步,就是进行序列化的性能优化。
-
Spark自身默认会在一些地方对数据进行序列化,比如Shuffle。另外,我们使用了外部数据(自定义类型),也要让其课序列化。
-
Spark本身对序列化的便捷性和性能进行了取舍
-
默认情况下:Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制,很方便使用。
-
但是,Java序列化机制性能不高,序列化速度慢,序列化后数据较大,比较占用内存空间。
2.kryo
- Spark支持使用kryo类库来进行序列化。
- 速度快,占用空间更小,比Java序列化数据占用空间小10倍。
3.如何使用kryo序列化机制
(1)设置Spark conf
bin/spark-submit will also read configuration options from conf/spark-defaults.conf,
in which each line consists of a key and a value separated by whitespace. For example:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
(2)使用kryo是,要求需要序列化的类,要提前注册,以获得高性能
4.kryo类库的优化
(1)优化缓存大小
- 如果注册的自定义类型,本身特别大(100个字段),会导致要序列化的对象太大。此时需要对kyro本身进行优化。因为kryo内部的缓存,可能不能存放这么大的class对象。
spark.kryoserializer.buffer.max //设置这个参数,将其调大。
(2)预先注册自定义类型
- 虽然不注册自定义类型,kryo也可以正常工作,但会保存一份他的全限定类名,耗费内存。
- 推荐预先注册要序列化的自定义类型。
七.优化数据结构
1.概述
- 要减少内存的消耗,除了使用高效的序列化类库外,还要优化数据结构。
- 避免Java语法特性中所导致的额外内存开销。
- 核心:优化算子函数内部使用到的局部数据或算子函数外部的数据。
- 目的:减少对内存的消耗和占用。
2.如何做?
(1)优先使用数组以及字符串,而不是集合类。即:优先使用Array,而不是ArrayList、LinkedList、HashMap
- 使用int[] 会比List<Integer> 节省内存
(2)将对象转换成字符串。 - 企业中,将HashMap、List这种数据,统一用String拼接成特殊格式的字符串
Map<Integer,Person> persons = new HashMap<Integer,Person>()
可以优化为:
"id:name,address"
String persons = "1:Andy,Beijing|2:Tom,Tianjin...."
(3)避免使用多层嵌套对象结构
(4)对于能够避免的场景,尽量使用int代替String
- 虽然String比List效率高,但int类型占用更少内存,比如:数据库主键,id,推荐使用自增的id,而不是uuid
八.rdd.cache checkpoint
九.持久化级别:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
十.Java虚拟机的调优
1.概述
- 如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个瓶颈
- Java虚拟机会定期进行垃圾回收,此时会追踪所有Java对象,并且在垃圾回收时,找到那些已经不再使用的对象。
- 清理旧对象,给新对象腾出空间。
- 垃圾回收的性能开销,是与内存中的对象数量成正比。
- 在做Java虚拟机调优之前,必须先做好上面的调优工作,这样才有意义。
- 必须注意顺序
2.Spark GC原理
见图片
3.监测垃圾回收
- 我们可以进行监测,比如多久进行一次垃圾回收以及耗费的时间等等。
spark-submit脚本中,添加一个配置
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimesStamps"
注意:这个是输出到worker日志中,而不是driver日志。
/usr/local/spark-2.1.0-bin-hadoop2.7/logs worker日志
/usr/local/spark-2.1.0-bin-hadoop2.7/work driver日志
4.优化Executor内存比例
(1)目的:减少GC次数。
- 对于GC调优来说,最重要的就是调节,RDD的缓存占用的内存空间 与 算子执行时创建对象所占用的内存空间 的比例
- 对于默认情况,Spark使用每个Executor 60% 的内存空间来缓存RDD,在task运行期间所创建的对象,只有40%内存空间来存放。
//使用:
conf.set("spark.storage.memoryFraction",0.5)
5.Java GC 调优 (-)
十一.shuffle原理
1.优化前
图片
2.优化后
图片
十二.其他调优
1.提高并行度
2.广播共享数据
Spark Mllib:MLlib 是 Spark 可以扩展的机器学习库。
一.MLlib概述
MLlib 是 Spark 可以扩展的机器学习库。
Spark在机器学习方面具有得天独厚的有事,有以下几个原因:
1.机器学习算法一般都有多个步骤迭代计算,需要在多次迭代后,获得足够小的误差或者收敛才会停止。
double wucha = 1.0
while(wucha>=0.00001){
建模 wucha -= 某个值
}
模型计算完毕
当迭代使用Hadoop的MapReduce计算框架时,每次都要读写硬盘以及任务启动工作,导致很大的IO开销。
而Spark基于内存的计算模型天生擅长迭代计算。只有在必要时,才会读写硬盘。
所以Spark是机器学习比较理想的平台。
2.通信,Hadoop的MapReduce计算框架,通过heartbeat方式来进行通信和传递数据,执行速度慢。
- spark 有高效的 Akka 和 Netty 的通信系统,通行效率高。
- SPark MLlib 是Spark 对常用的机器学习算法的实现库,同时包括相关测试和数据生成器。
二.什么是机器学习?
1.机器学习的定义。
A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P,
if its performance at tasks in T, as measured by P, improves with experience E。
2.三个关键词:算法、经验、模型评价
在数据的基础上,通过算法构建出模型,并进行评价
如果达到要求,则用该模型测试其他数据
如果不达到要求,要调整算法来重新建立模型,再次进行评估
循环往复,知道获得满意的经验
3.应用:金融反欺诈、语音识别、自然语言处理、翻译、模式识别、智能控制等等
4.基于大数据的机器学习
(1)传统的机器学习算法,由于技术和单机存储的现值,只能在少量数据上使用。即,依赖于数据抽样。
(2)传统的机器学习存在的问题:很难做好随机,导致学习的模型不准确。
(3)在大数据上进行机器学习,直接处理全量数据并进行大量迭代计算。
(4)Spark本身计算优势,适合机器学习。
(5)另外 spark-shell pyspark 都可以提供及时查询工具
5.MLlib
-
MLlib是Spark机器学习库,简化机器学习的工程实践工作,方便扩展到更大规模。
-
集成了通用的学习算法:分类、回归、聚类、协同过滤、降维等等
-
另外,MLlib本身在Spark中,数据清洗、SQL、建模放在一起。
三、线性回归
四、余弦相似性
https://blog.csdn.net/u012160689/article/details/15341303
Spark Graphx
一.Spark Graphx 是什么?
1.是Spark 的一个模块,主要用于进行以图为核心的计算,还有分布式图计算
2.Graphx 底层基于RDD计算,和RDD共用一种存储形态。在展示形态上,可以用数据集来表示,也可以用图来表示。
二.Spark GraphX 有哪些抽象?
1.顶点
RDD[(VertexId,VD)]表示
VertexId 代表了顶点的ID,是Long类型
VD 是顶点的属性,可以是任何类型
2.边
RDD[Edge[ED]]表示
Edge表示一个边
包含一个ED类型参数来设定属性
另外,边还包含了源顶点ID和目标顶点ID
3.三元组
三元组结构用RDD[EdgeTriplet[VD,ED]]表示
三元组包含一个边、边的属性、源顶点ID、源顶点属性、目标顶点ID、目标顶点属性。
4.图
Graph表示,通过顶点和边来构建。
网友评论