Spark06 RDD 行动算子

作者: 山高月更阔 | 来源:发表于2020-05-24 13:36 被阅读0次

行动操作

行动操作会把最终计算的结果返回到驱动程序,或写入外部的存储系统中。由于行动操作需要产生实际的输出,他们会强制执行那些必须用到的RDD的转换操作。

非行动操作的其他操作都是惰性求值的。这意味着在被调用行动操作之前Spark不会开始计算。

惰性求值意味着当我们对 RDD 调用转换操作 (例如 map() )时,操作不会立即执行。相反,Spark 会在内部记录下所要求的执行的操作的相关信息。我们不应该把 RDD 看做存放特定数据的数据集,而最好把每个 RDD 看做通过转化操作构建出来、记录如何计算的指令集。把数据读取到 RDD 仍然是惰性的。所以当调用 textFile() 时,数据不会读取进来, 而是在必要时才会读取。

常见的非行动算子有 map flatMap filter distinct sample union intersection subtract cartesian 等 用法详情见上一篇 Spar05 RDD 转换算子

collect 算子

返回 RDD 中所有元素

Python

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.collect()
['a', 'b', 'c']

count 算子

计算 RDD 中的元素个数

Python

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.count()
3

countByValue 算子

计算各个元素在 RDD 中出现的次数

Python

>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.countByValue()
defaultdict(<class 'int'>, {'a': 2, 'b': 1, 'c': 1})

take(num) 算子

从 RDD 中返回 num个元素

Python

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.take(2)
['a', 'b']

top(num)

从RDD中返回最前面的 num 个元素

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.top(2)
['c', 'b']

takeOrdered(num,key=ordering)

从 RDD 中按照顺序返回前num个元素

Python

>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.takeOrdered(3,key=lambda x:-x)
[9, 7, 5]

对复杂类型操作

Python

>>> rdd = sc.parallelize([('a',1),('b',3),('c',5),('d',2),('e',9),('f',7)])
>>> rdd.takeOrdered(3,key=lambda x:x[1])
[('a', 1), ('d', 2), ('b', 3)]

takeSample(withReplacement,num,[sed])

从 RDD 中返回任意一些元素
withReplacement: true表示有放回的采样,false表示无放回采样
num: 表示返回的采样数据的个数
sed: 表示用于指定的随机数生成器种子

Python

>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.takeSample(False,3)
[3, 9, 1]
>>> rdd.takeSample(False,3)
[2, 3, 5]
>>> rdd.takeSample(False,3)
[7, 3, 9]
>>> rdd.takeSample(False,3)
[7, 9, 1]

reduce(func)

并行整合 RDD 中所有的元素 例如 就和

Python

>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.reduce(lambda x,y:x+y)
27

fold(zero,func)

与 reduct 类型 需要提供一个初始值 zero是0值 也称为初始值

Python

>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.fold(0,lambda x,y:x+y)
27

与reduce 返回结果一致 假如zero不是0 而是1 结果是多少呢

>>> rdd.fold(1,lambda x,y:x+y)
32

结果并不是28而是32
这是为什么呢?
因为 RDD 是分区存放的 计算 fold 每个分区计算 让后将各个分区计算的结果在计算。所以每个分区计算时都有 zero 值
我们来看 rdd 的分区情况

>>> rdd.glom().collect()
[[1], [3, 5], [2], [9, 7]]

所以计算逻辑为:
先计算各个分区 第一列为 zero
1 + 1 = 2
1 + 3 + 5 = 9
1 + 2 = 3
1 + 9 + 7 = 17
各个分区求和:
1 + 2 + 9 + 3 + 17 = 32 (第一个1位zero)

aggregate(zeroValue, seqOp, combOp)

zeroValue:初始值
seqOp:每个分区操作
combOp:分区间操作

例如求 1,2,3,4,5,6,7,8,9,10求和
Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.aggregate(0,lambda acc,value:acc+value,lambda acc1,acc2:acc1+acc2)
55

seqOp:的第一个参数 acc 是上一次计算结果 value 是当前值
combOp:参数 acc1和acc2 都是各个分区计算结果

在 例如求 1,2,3,4,5,6,7,8,9,10 的平均值

Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> res = rdd.aggregate((0,0),lambda acc,value:(acc[0]+value,acc[1]+1),lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))
>>> res[0]/res[1]
5.5

foreach(func)

对每个元素使用给定的函数 func

Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.foreach(lambda x:print(str(x)+'\n'))
1

2

5

6

3

4

7

8

9

10

持久化(缓存)

如前所说,Spark RDD 是惰性求值的,而有时我们期望能多次使用同一个 RDD。如果简单的对 RDD 做调用操作,Spark 每次会重新计算 RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法长长会多次使用同一组数据。例如

Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd1 = rdd.map(lambda x : x * x)
>>> rdd1.count()
10
>>> rdd1.collect()
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

在调用 rdd1.count 和 rdd1.collect() 都会计算 map。为了避免多次计算同一个 RDD ,可以让Spark 对数据进行持久化。Spark 会将 RDD 的节点分别保存到他们求出的分区上。如果有一个节点数据丢失,Spark 会计算丢失分区的数据。持久化方法persist

Python

>>> import pyspark.StorageLevel
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd1 = rdd.map(lambda x : x * x)
>>> rdd1.persist(StorageLevel.MEMORY_ONLY)

下表是各个缓存级别及说明:

级别 使用空间 CPU时间 是否内存中 是否磁盘上 备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 中等 部分 部分 如果数据在内存放不下,则溢写到磁盘
MEMORY_AND_DISK_SER 部分 部分 如果数据在内存放不下,则溢写到磁盘。在内存中存放序列化的数据
DISK_ONLY

如果有必要在存储级别的末尾加上 '_2' 来把持久化数据存为两份

相关文章

网友评论

    本文标题:Spark06 RDD 行动算子

    本文链接:https://www.haomeiwen.com/subject/zgblahtx.html