Spark 简介
Spark是什么
一个快速且通用的集群计算平台
Spark特点:
1. Spark扩充了流行的Mapreduce计算模型
2. Spark是基于内存的计算(一些中间数据存储在内存中,加快读写)
Spark 是通用的
Spark的设计容纳了其他分布式系统拥有的功能
批处理,迭代式计算,交互查询和流处理等
Spark是高度开放的
Spark提供了Python Java Scala Sql的API和丰富的内置库
Spark和其他的大数据工具整合的很好,包括Hadoop、kafaka等
- Spark生态圈
最初是基于Hadoop MapReduce的
发现MapReduce在迭代式计算和交互式上低效,引入了内存存储(重大的区别)
Spark的组件:
Spark SQL structured data:
是Spark处理结构化数据的库,就像Hive SQL,Mysql一样--报表统计
Spark Streaming real-time:
是实时数据流处理组件,类似Storm
Spark Streaming提供了API来操作实时流数据
应用场景,企业中用来从Kafka接收数据做实时统计
Mlib machine learning
一个包含通用机器学习功能的包, Machine learning lib
包含分类,聚类,回归等,还包括模型评估,和数据导入
MLib提供的上面这些方法,都支持集群上的横向扩展
Graphx graph processing
是处理图的库(例如,社交网络),并进行图的并行计算
继承了RDD API
它提供了各种图的操作和常用的图算法,例如PangeRank算法
应用场景,图计算
---------------------
Spark Core
包含了Spark的基本功能,包含任务调度,内存管理,容错机制
内部定义了RDDs(弹性分布式数据集)
提供了很多APPIs来创建和操作这些RDDs
应用场景,为其他组件提供底层服务
---------------------
Cluster Managers:
就是集群管理,Spark自带一个集群管理是单独调度器
常见集群管理包括Hadoop YARN, Apache Mesos
紧密集成的优点
Spark底层优化了,基于Spark底层的组件,也得到了相应的优化
紧密集成,节省了各个组件组合使用时的部署,测试等时间
向Spark增加新的组件,其它组件时,可立刻享用新组件的功能
- Spark 和 Hadoop比较
Hadoop应用场景:
离线处理
对时效性要求不高:中间数据会落在硬盘上(几分钟到几小时不等)
Spark应用场景:
时效性要求高的场景(几秒钟到几分钟不等)
机器学习等领域
基于内存
比较:
这是生态系统,每个组件都有其作用,各司其职即可
Spark不具有HDFS的存储能力,要借助HDFS等持久化数据。
大数据将会孕育出更多的新技术
Spark安装
数据处理????
Spark运行环境:
Spark是Scala写的,运行在JVM上,所有运行环境是Java7+
python 2.6+ 3.7+
Spark的Shell:
Spark的Shell使你能够处理分布在集群上的数据
Spark把数据加载到节点的内存中,因此分布式处理可在秒级完成
快速迭代式计算,实时查询,分析一般能够在shells中完成
Spark提供了Python shells 和 Scala shells
python shell :
bin/pyspark
bin/spark-shell
spark是用Scala写的???
网络问题 版本问题
Rdds
RDDs介绍
Driver programe
包含程序main()方法,RDDs定义和操作
它管理很多节点,我们称作executors
SparkContext:
driver programs 通过SparkContext对象访问Spark
SparkContext对象代表和一个集群的连接
在shell中SparkContext自动创建好了,就是sc变量(直接调用即可)
scala> val lines=sc.textFile(../xx)
-----------------------------
RDDs:
1. Resilient distributed datasets(弹性分布式数据集,简写RDDs)
2. 这些RDDs,并行的分布在整个集群中
eg。1
scala> val lines=sc.textFile(../xx)
假设这个文件有500g ,对其切分分布在5个节点上
通过lines就可以知道文件存放在哪,并操作(lines就是RDDs)
3. RDDs是Spark分发数据和计算的基础抽象类
1.1 一个RDD是一个不可改变的分布式集合对象(相当于文件被占用,不可修改)
1.2 Spark中,所有的计算都是通过RDDs的创建,转换,操作完成的。
1.3 一个RDD内部由许多partitions(分片)组成
分片:
每个分片包括一部分数据,partitions可在集群不同节点上计算
分片是Spark并行处理的单元,Spark顺序的,并行的处理分片
RDDs的创建方法:
一、 把一个存在的集合传给SparkContext的parallelize()方法,测试用
val rdd = sc.parallelize(Array(1,2,2,4),4)
第一个参数:待并行化处理的集合,第2个参数:分区个数
二、 加载外部数据集
val rddText = sc.textFile(“xxx.txt”)
Scala基础语法:
val var创建变量
val:不可修改
var:可变
Scala的匿名函数和类型推断
RDD基本操作之Transformation
Transformation介绍:
从之前RDD构建一个新的RDD,像map()和filter()
逐元素Transformation
map()
map接收函数,把函数应用到RDD的每一个元素,返回新RDD
val lines = lines.map(word=>(word,1))
filter()
filter()接收函数,返回只包含满足filter()函数的元素的新rdd
val lines =lines.filter(word=>word.contains(‘hello’))
flatMap()
val inputs = sc.TextFile("xxx.txt")
val lines = inputs.flatMap(line=>line.split(" "))
集合运算
并集 交集等
val rdd1=sc.parallelize(Array("coffe","coffe","panda","monkey","tea"))
val rdd2=sc.parallelize(Array("coffe","coffe","panda","monkey","cat"))
val rdd_distinct=rdd1.distinct(rdd2)
val rdd_union =rdd1.union()
val rdd_inter=rdd.intersection(rdd2)
val rdd_sub=rdd1.subtract(rdd2)
RDD基本操作action,对返回结果对操作???
Action介绍
在RDD上计算出来一个结果
把结果返回给driver program或保存在文件系统,count(),save
reduce() //累加,计数,和其他类型的聚集操作
rdd.reduce((x,y)=>x+y)
Collect()
//遍历整个RDD,向driver program返回RDD内容
//需要单机内存能够容纳下(因为数据要拷贝给driver,测试使用)
大数据的时候,使用saveAsTextFile() action等
take(n)
返回RDD的n个元素(同时尝试访问最少的partitions)
返回结果是无序的测试使用
top()
//排序
foreach()
//计算RDD中的每个元素,但不返回本地,不保存
RDDs的特性
RDDs的血统关系图:
Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图
Spark使用血统关系图来计算每个RDD的需求和恢复丢失的数据
延迟计算(lazy evaluation)
Spark对RDDs的计算是,他们第一次使用action操作的时候
这种方式在处理大数据的时候特别有用,可以减少数据的传输
Spark内部记录metadata表明transformation操作已被响应
加载数据也是延迟计算,数据只有在必要的时候,才会被加载进去
RDD.persist()
默认每次在RDDs上进行action操作时,Spark都重新计算RDDs
如果想重复利用一个RDD,可以使用RDD.persist()参数:内存,硬盘
unpersist 从缓存中移除
KeyValue对RDDs
创建KeyValue对Rdds:
使用Map函数 ,返回keyValue
eg.1
使用map()函数,返回key/vlaule
例如,包含数行数据对RDD,把每行数据数据的第一个单词作为keys
val rdd2=rdd.map(line=>(line.split(" ")(0),line))
KeyValue对RDDs的Transformations(example:{(1,2),(3,4),(5,6)})
reduceBykey()
groupByKey()
.............
combineByKey():
(createCombiner,mergeValue,MergeCombiners,partitioner)
最常用基于key的聚合函数,返回类型可以与输入类型不一样
许多基于key的聚合函数都用到了它,像groupByKey()
combineByKey():
遍历partition中的元素,元素的key,要么之前见过,要么不是
如果是新元素,使用我们提供的createCombiner()函数
如果是这个partition中已经存在key,就会使用mergeValue()函数
合计每个partition的结果的时候,使用mergeCombiner()函数
网友评论