弹性分布式数据集(RDD Resilient Distributed Dataset)是不可变JVM对象的分布式集合,允许您非常快速地执行计算,并且它们是Apache Spark的核心。
顾名思义,数据集是分布式的;它根据一些密钥分成块并分发到执行程序节点。这样做可以非常快速地对这些数据集运行计算。RDD跟踪(日志)应用于每个块的所有转换,以加快计算速度,并在出现问题并且丢失部分数据时提供回退;在这种情况下,RDD可以重新计算数据。这是防止数据丢失的另一道防线,是数据复制的补充。
本章内容:
- RDD的内部工作
- 创建RDD
- 全局与本地范围
- 转换
- 操作
RDD的内部工作方式
RDD并行运行。这是在Spark中工作的最大优势:每个转换都是并行执行的,以便大幅提高速度。
对数据集的转换是惰性的。这意味着只有在调用数据集上的操作时才会执行任何转换。这有助于Spark优化执行。例如,考虑以下非常常见的步骤,分析师通常会这样做以熟悉数据集:
1.计算某列中不同值的出现次数。
2.选择以A开头的值。
3.将结果输出到屏幕上。
首先使用.map(lambda v: (v, 1))方法映射A的值,然后选择那些以'A'开头的记录(使用.filter(lambda val: val.startswith('A')))。如果我们调用.reduceByKey(operator.add)方法,它将减少数据集并添加(在此示例中,计算)每个键的出现次数。所有这些步骤都会转换数据集。
然后调用.collect()方法来执行这些步骤。此步骤是对我们的数据集的操作 - 它最终计算数据集的不同元素。实际上,该操作可能会颠倒转换的顺序并在映射之前首先过滤数据,从而将较小的数据集传递给reducer。
如果您还不了解这些命令,请不要担心 - 我们将在本章后面详细解释它们。
创建RDD
在PySpark中有两种创建RDD的方法:parallelize方法传入集合(列表或一些元素的数组):
data = sc.parallelize([('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12),('Amber', 9)])
或者您可以引用位于本地或外部位置的文件(或多个文件),下面我们使用Mortality数据集VS14MORT.txt文件(2016年7月31日访问),数据集说明参见Record_Layout_2014.pdf,数据集下载VS14MORT.txt.gz
data_from_file = sc.textFile('/home/andrew/code/meil/VS14MORT.txt.gz', 4)
最后一个参数指定数据集分成的区数。一般是2-4个分区。
Spark可以从多种文件系统中读取:本地文件系统,如NTFS,FAT或Mac OS Extended(HFS +),或分布式文件系统,如HDFS,S3,Cassandra等。注意路径不能包含特殊字符[]等。
支持多种数据格式:可以使用JDBC驱动程序读取文本,parquet,JSON,Hive表和关系数据库中的数据。Spark可以自动使用压缩数据集(如前面示例中的Gzipped)。
根据数据的读取方式,保存数据的对象的表示方式略有不同。当我们.paralellize(...)一个集合时,从文件读取的数据表示为MapPartitionsRDD而不是ParallelCollectionRDD。
- Schema(模式)
RDD是无模式数据结构(与DataFrame不同,我们将在下一章讨论)。比如可以并行如下数据:
In [9]: data_heterogenous = sc.parallelize([('Ferrari', 'fast'),{'Porsche': 100000},['Spain','visited', 4504]]).collect()
In [10]: data_heterogenous[1]['Porsche']
Out[10]: 100000
collect()方法将RDD的所有元素返回给驱动程序,并将其序列化为列表。
- 从文件读取
从文本文件读取时,文件中的每一行都构成RDD的元素。
In [11]: data_from_file.take(1)
Out[11]: [' 1 2101 M1087 432311 4M4 2014U7CN I64 238 070 24 0111I64 01 I64 01 11 100 601']
- Lambda表达式
我们将从data_from_file中提取有用信息。
需要注意一点。定义纯Python方法会降低应用程序的速度,因为Spark需要在Python解释器和JVM之间不断地来回切换。要尽量使用内置的Spark函数。
In [12]:
def extractInformation(row):
import re
import numpy as np
selected_indices = [
2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
19,21,22,23,24,25,27,28,29,30,32,33,34,
36,37,38,39,40,41,42,43,44,45,46,47,48,
49,50,51,52,53,54,55,56,58,60,61,62,63,
64,65,66,67,68,69,70,71,72,73,74,75,76,
77,78,79,81,82,83,84,85,87,89
]
'''
Input record schema
schema: n-m (o) -- xxx
n - position from
m - position to
o - number of characters
xxx - description
1. 1-19 (19) -- reserved positions
2. 20 (1) -- resident status
3. 21-60 (40) -- reserved positions
4. 61-62 (2) -- education code (1989 revision)
5. 63 (1) -- education code (2003 revision)
6. 64 (1) -- education reporting flag
7. 65-66 (2) -- month of death
8. 67-68 (2) -- reserved positions
9. 69 (1) -- sex
10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
11. 71-73 (3) -- number of units (years, months etc)
12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
13. 75-76 (2) -- age recoded into 52 categories
14. 77-78 (2) -- age recoded into 27 categories
15. 79-80 (2) -- age recoded into 12 categories
16. 81-82 (2) -- infant age recoded into 22 categories
17. 83 (1) -- place of death
18. 84 (1) -- marital status
19. 85 (1) -- day of the week of death
20. 86-101 (16) -- reserved positions
21. 102-105 (4) -- current year
22. 106 (1) -- injury at work
23. 107 (1) -- manner of death
24. 108 (1) -- manner of disposition
25. 109 (1) -- autopsy
26. 110-143 (34) -- reserved positions
27. 144 (1) -- activity code
28. 145 (1) -- place of injury
29. 146-149 (4) -- ICD code
30. 150-152 (3) -- 358 cause recode
31. 153 (1) -- reserved position
32. 154-156 (3) -- 113 cause recode
33. 157-159 (3) -- 130 infant cause recode
34. 160-161 (2) -- 39 cause recode
35. 162 (1) -- reserved position
36. 163-164 (2) -- number of entity-axis conditions
37-56. 165-304 (140) -- list of up to 20 conditions
57. 305-340 (36) -- reserved positions
58. 341-342 (2) -- number of record axis conditions
59. 343 (1) -- reserved position
60-79. 344-443 (100) -- record axis conditions
80. 444 (1) -- reserve position
81. 445-446 (2) -- race
82. 447 (1) -- bridged race flag
83. 448 (1) -- race imputation flag
84. 449 (1) -- race recode (3 categories)
85. 450 (1) -- race recode (5 categories)
86. 461-483 (33) -- reserved positions
87. 484-486 (3) -- Hispanic origin
88. 487 (1) -- reserved
89. 488 (1) -- Hispanic origin/race recode
'''
record_split = re\
.compile(
r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' +
r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' +
r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' +
r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' +
r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
try:
rs = np.array(record_split.split(row))[selected_indices]
except:
rs = np.array(['-99'] * len(selected_indices))
return rs
# return record_split.split(row)
In [13]: data_from_file_conv = data_from_file.map(extractInformation)
...: data_from_file_conv.map(lambda row: row).take(1)
Out[13]:
[array(['1', ' ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
' ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
'238', '070', ' ', '24', '01', '11I64 ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', '01',
'I64 ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', '01', ' ',
' ', '1', '1', '100', '6'], dtype='<U40')]
全局与本地范围
Spark固有并行性,可以以两种模式运行:本地和集群。当您在本地运行Spark时,您的代码可能与集群没有太大区别。群集模式下,当提交作业执行时,作业将发送到驱动程序(或主节点)。驱动程序节点创建DAG(参见第1章,
了解作业并确定哪个执行者(或工作者)节点将运行特定任务。
然后,驱动程序指示工作人员执行他们的任务,并在完成后将结果返回给驱动程序。然而,在此之前,驱动程序准备每个任务的闭包:驱动程序上存在一组变量和方法,供工作人员在RDD上执行其任务。
每个执行程序都从驱动程序中获取变量和方法的副本。如果,在运行任务时,执行程序会更改这些变量或覆盖方法,
它不会影响其他执行者的副本或驱动程序的变量和方法。这可能会导致一些意外行为和运行时错误,有时可能很难追查。
spark在单机的性能,比pandas,对中小数据集,起码差了1个数量级。
更多参考:http://spark.apache.org/docs/latest/programming-guide.html#local-vs-cluster-modes
转换
转换可以塑造数据集。其中包括对数据集中的值进行映射,过滤,连接和转码。在本节中,我们将展示RDD上可用的一些转换。
由于空间限制,我们仅包括最常用的转换和操作。对于一整套可用的方法,我们建议您查看PySpark关于RDD的文档http://spark.apache.org/docs/latest/api/python/pyspark。
- map 映射
In [15]: data_2014.take(10)
Out[15]: [2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]
In [16]: data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
...: data_2014_2.take(10)
Out[16]:
[('2014', 2014),
('2014', 2014),
('2014', 2014),
('2014', 2014),
('2014', 2014),
('2014', 2014),
('2014', 2014),
('2014', 2014),
('2014', 2014),
('-99', -99)]
- filter 过滤
In [19]: data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
In [20]: data_filtered.count()
Out[20]: 6
- flatMap 扁平映射
In [21]: data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
In [22]: data_2014_flat.take(10)
Out[22]: ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]
- flatMap 扁平映射
In [21]: data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
In [22]: data_2014_flat.take(10)
Out[22]: ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]
- distinct 唯一值
In [25]: distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
In [26]: distinct_gender
Out[26]: ['M', 'F', '-99']
开销比较大,慎用
- sample 取样
.sample(...)方法返回数据集中的随机样本。 第一个参数指定采样是否应该替换,第二个参数定义要返回的数据的分数,第三个参数是伪随机数生成器的种子:
在此示例中,我们从原始数据集中选择了10%的随机样本。
In [27]: fraction = 0.1
In [28]: data_sample = data_from_file_conv.sample(False, fraction, 666)
In [29]: data_sample.take(1)
Out[29]:
[array(['1', ' ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
' ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
'215', '063', ' ', '21', '02', '11I350 ', '21I251 ', ' ',
' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', '02',
'I251 ', 'I350 ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', '28', ' ',
' ', '2', '4', '100', '8'], dtype='<U40')]
- leftOuterJoin 左连接
就像在SQL世界中一样,根据在两个数据集中找到的值连接两个RDD,并返回左RDD中的记录,其中右侧记录附加在两个RDD匹配的位置:
这是另一种昂贵的方法,应该谨慎使用,并且只在必要时使用,因为它会使数据混乱,从而导致性能下降。
你在这里看到的是来自RDD rdd1的所有元素及其来自RDD rdd2的相应值。如您所见,值'a'在rdd3中显示两次,而'a'在RDD rdd2中显示两次。来自rdd1的值b仅显示一次,并与来自rdd2的值“6”连接。缺少两件事:
来自rdd1的值'c'在rdd2中没有对应的键,因此返回的元组中的值显示为None,并且,因为我们执行了左外连接,
rdd2的值'd'按预期消失了。
如果我们使用.join(...)方法,我们只得到'a'和'b'的值,因为这两个值在这两个RDD之间相交。
另一个有用的方法是.intersection(...),它返回两个RDD中相同的记录。执行以下代码:
In [30]: rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
In [31]: rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
In [32]: rdd3 = rdd1.leftOuterJoin(rdd2)
In [33]: rdd3.take(5)
Out[33]: [('b', (4, '6')), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 1))]
In [34]: rdd4 = rdd1.join(rdd2)
In [35]: rdd4.collect()
Out[35]: [('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]
In [36]: rdd5 = rdd1.intersection(rdd2)
In [37]: rdd5.collect()
Out[37]: [('a', 1)]
- repartition 重新分区
In [38]: rdd1 = rdd1.repartition(4)
In [39]: len(rdd1.glom().collect())
Out[39]: 4
重新分区数据集会更改数据集分区的分区数。 应该谨慎使用此功能,并且仅在真正需要时才会使用,因为它会对数据进行混洗,这实际上会导致性能方面的重大影响:
上面的代码打印出4作为新的分区数。
与.collect()相比,.glom()方法生成一个列表,其中每个元素是指定分区中存在的数据集的所有元素的另一个列表; 返回的主列表包含与分区数一样多的元素。
操作 Action
与转换相比,操作在数据集上执行计划任务;完成数据转换后,您可以执行转换。这可能不包含任何转换(例如,即使您没有对RDD进行任何转换,.take(n)也只会从RDD返回n条记录)
或者执行整个转换链。
- take
常用方法。该方法优先于.collect(...),因为它只返回单个数据分区中的n个行。处理大型数据集时,这一点尤为重要:
In [40]: data_first = data_from_file_conv.take(1)
In [41]: data_first
Out[41]:
[array(['1', ' ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
' ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
'238', '070', ' ', '24', '01', '11I64 ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', '01',
'I64 ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', '01', ' ',
' ', '1', '1', '100', '6'], dtype='<U40')]
In [42]: data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)
In [43]: data_take_sampled
Out[43]:
[array(['2', '17', ' ', '0', '08', 'M', '1', '069', ' ', '39', '19', '09',
' ', '1', 'M', '7', '2014', 'U', '7', 'U', 'N', ' ', ' ', 'I251',
'215', '063', ' ', '21', '06', '11I500 ', '21I251 ', '61I499 ',
'62I10 ', '63N189 ', '64K761 ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', '05',
'I251 ', 'I120 ', 'I499 ', 'I500 ', 'K761 ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', '01', ' ',
' ', '1', '1', '100', '6'], dtype='<U40')]
- collect方法
此方法将RDD的所有元素返回给驱动程序。类似pandas的all()。
- .reduce(...)方法
In [44]: rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
Out[44]: 15
In [45]: data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1) # 1个分区计算比较准确
In [46]: works = data_reduce.reduce(lambda x, y: x / y)
In [47]: works
Out[47]: 10.0
In [48]: data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
In [49]: data_reduce.reduce(lambda x, y: x / y)
Out[49]: 0.004
In [50]: data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
In [51]: data_key.reduceByKey(lambda x, y: x + y).collect()
Out[51]: [('b', 4), ('c', 2), ('a', 12), ('d', 5)]
-count 统计元素个数
```python
In [52]: data_reduce.count()
Out[52]: 6
In [53]: len(data_reduce.collect())
Out[53]: 6
In [54]: data_key.countByKey().items()
Out[54]: dict_items([('a', 2), ('b', 2), ('c', 1), ('d', 2)])
- saveAsTextFile 保存为文本文件
In [55]: data_key.saveAsTextFile('data_key.txt')
In [56]: def parseInput(row):
...: import re
...:
...: pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
...: row_split = pattern.split(row)
...:
...: return (row_split[1], int(row_split[2]))
...:
In [57]: data_key_reread = sc.textFile('data_key.txt').map(parseInput)
...: data_key_reread.collect()
Out[57]: [('a', 8), ('d', 2), ('a', 4), ('b', 3), ('c', 2), ('b', 1), ('d', 3)]
每个分区到一个单独的文件:
data_key.saveAsTextFile(
'/Users/drabast/Documents/PySpark_Data/data_key.txt')
要将其读回来,您需要将其解析回来,因为所有行都被视为字符串:
- foreach
In [58]: def f(x):
...: print(x)
...:
In [59]: data_key.foreach(f)
('a', 4)
('b', 1)
('d', 3)
('a', 8)
('d', 2)
('b', 3)
('c', 2)
注意每次的顺序可能不同
小结
总结RDD是Spark的支柱; 这些无模式数据结构是我们将在Spark中处理的最基本的数据结构。
在本章中,我们介绍了通过.parallelize(...)方法以及从文本文件中读取数据,从文本文件创建RDD的方法。 此外,还显示了处理非结构化数据的一些方法。
Spark中的转换是惰性的 - 它们仅在调用动作时应用。 在本章中,我们讨论并介绍了最常用的转换和操作; PySpark文档包含更多http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD。
Scala和Python RDD之间的一个主要区别是速度:Python RDD可能比它们的Scala对应物慢得多。
在下一章中,我们将引导您完成一个数据结构,使PySpark应用程序与Scala编写的数据结构(DataFrames)相同。
网友评论