每个Spark应用都由一个驱动器程序(driver program)(例如Spark Shell本身)来发起集群上的各种并行操作,一般要管理多个执行器(executor)节点。
驱动器程序是整个程序的入口,通过一个SparkContext对象来访问Spark。这个对象代表集群上的一个连接。
初始化Spark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('Myapp')
sc = SparkContext(conf = conf)
关闭Spark
sc.stop()
RDD
RDD(弹性分布式数据集Resilient Distributed Dataset)就是分布式的元素集合。
Spark中,对数据的所有操作不外乎创建RDD,转化已有RDD,调用RDD操作进行求值。
Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。
RDD是一个不可变的分布式集合对象
一个RDD内部有许多partitions组成,partition 是RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。
创建RDD
- 读取一个外部数据集
lines = sc.textFile("README.md")
- 在驱动器程序里对一个集合进行并行化(如list,set)
words = sc.parallelize(["scala","java","hadoop","spark","akka"])
RDD操作
- 转化操作
惰性求值,不会立即执行操作
Spark会在内部记录下所要求执行的操作的相关信息。
会生成一个新的RDD
如map(),filter(),union()
返回RDD
pylines = lines.filter(lambda line: 'python' in line )
- 行动操作
用到RDD时会触发实际的计算,强制执行那些用到的RDD的转化操作
最终结果会返回到驱动器程序或写入外部存储系统
如count(),first()
每当调用一个新的行动操作时,整个RDD都会从头开始计算。可以通过将中间结果持久化,避免这种低效的行为。
把数据读取到RDD的操作也是惰性的。,到可以执行一个行动操作来强制执行。如count()
Spark使用谱系图来记录这些不同的RDD之间的依赖关系。Spark需要这些信息来按需计算每个RDD,也可以依靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。
常见操作
- 转化操作
- map()与 flatMap()
将函数用于RDD中每一个元素
flatMap()对每个输入元素生成多个输出元素,通常用于切分单词 - distinct() 保持元素唯一性
开销大,需要将所有数据通过通过网络进行shuffle(混洗),保证每个元素只有一份 - filter() 过滤
2.伪集合操作
- union() 合集,有重复元素
- intersection()交集,无重复元素(shuffle,单个RDD内的重复元素也一起被移除)
- substract() 也需要shuffle
- 计算两个RDD的笛卡尔积 RDD1.cartesian(RDD2)
- 行动操作
- reduce(),接收函数作为参数,函数操作两个相同元素类型的RDD数据并返回一个同样类型的新元素。
sum = rdd.reduce(lambda x,y: x+y)
- fold()
- collect() 可以用来获取整个RDD中的数据(数据规模小的情况下)
- count() RDD中元素个数
- countByValue() RDD中各元素出现次数
- take(num) 返回num个元素,无序
- top(num) 返回前num个元素
- foreach(func) 对每个元素进行操作,不需把RDD返回本地
lines.foreach(println)
4.数据持久化
- 如果要在多个行动操作中重用同一个RDD,应该用RDD.persist()让Spark把整个RDD的内容保存到内存中(以分区的方式存储到集群中的各机器上)。也可以缓存到磁盘上。
- persist()调用本身不会触发强制求值
- cache()
- unpersist()手动把持久化的RDD从缓存中移除
persist()与cache():
1)RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY;
2)可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别;
3)cache或者persist并不是action;
4)cache和persist都可以用unpersist来取消
网友评论