pySpark工具学习2-弹性分布式数据集(RDD Resili

作者: python测试开发 | 来源:发表于2019-05-22 09:07 被阅读25次

    弹性分布式数据集(RDD Resilient Distributed Dataset)是不可变JVM对象的分布式集合,允许您非常快速地执行计算,并且它们是Apache Spark的核心。

    顾名思义,数据集是分布式的;它根据一些密钥分成块并分发到执行程序节点。这样做可以非常快速地对这些数据集运行计算。RDD跟踪(日志)应用于每个块的所有转换,以加快计算速度,并在出现问题并且丢失部分数据时提供回退;在这种情况下,RDD可以重新计算数据。这是防止数据丢失的另一道防线,是数据复制的补充。

    本章内容:

    • RDD的内部工作
    • 创建RDD
    • 全局与本地范围
    • 转换
    • 操作

    RDD的内部工作方式

    RDD并行运行。这是在Spark中工作的最大优势:每个转换都是并行执行的,以便大幅提高速度。

    对数据集的转换是惰性的。这意味着只有在调用数据集上的操作时才会执行任何转换。这有助于Spark优化执行。例如,考虑以下非常常见的步骤,分析师通常会这样做以熟悉数据集:

    1.计算某列中不同值的出现次数。
    2.选择以A开头的值。
    3.将结果输出到屏幕上。

    首先使用.map(lambda v: (v, 1))方法映射A的值,然后选择那些以'A'开头的记录(使用.filter(lambda val: val.startswith('A')))。如果我们调用.reduceByKey(operator.add)方法,它将减少数据集并添加(在此示例中,计算)每个键的出现次数。所有这些步骤都会转换数据集。

    然后调用.collect()方法来执行这些步骤。此步骤是对我们的数据集的操作 - 它最终计算数据集的不同元素。实际上,该操作可能会颠倒转换的顺序并在映射之前首先过滤数据,从而将较小的数据集传递给reducer。

    如果您还不了解这些命令,请不要担心 - 我们将在本章后面详细解释它们。

    创建RDD

    在PySpark中有两种创建RDD的方法:parallelize方法传入集合(列表或一些元素的数组):

    data = sc.parallelize([('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12),('Amber', 9)])
    

    或者您可以引用位于本地或外部位置的文件(或多个文件),下面我们使用Mortality数据集VS14MORT.txt文件(2016年7月31日访问),数据集说明参见Record_Layout_2014.pdf,数据集下载VS14MORT.txt.gz

    data_from_file = sc.textFile('/home/andrew/code/meil/VS14MORT.txt.gz', 4)
    

    最后一个参数指定数据集分成的区数。一般是2-4个分区。
    Spark可以从多种文件系统中读取:本地文件系统,如NTFS,FAT或Mac OS Extended(HFS +),或分布式文件系统,如HDFS,S3,Cassandra等。注意路径不能包含特殊字符[]等。

    支持多种数据格式:可以使用JDBC驱动程序读取文本,parquet,JSON,Hive表和关系数据库中的数据。Spark可以自动使用压缩数据集(如前面示例中的Gzipped)。

    根据数据的读取方式,保存数据的对象的表示方式略有不同。当我们.paralellize(...)一个集合时,从文件读取的数据表示为MapPartitionsRDD而不是ParallelCollectionRDD。

    • Schema(模式)

    RDD是无模式数据结构(与DataFrame不同,我们将在下一章讨论)。比如可以并行如下数据:

    In [9]: data_heterogenous = sc.parallelize([('Ferrari', 'fast'),{'Porsche': 100000},['Spain','visited', 4504]]).collect()
    In [10]: data_heterogenous[1]['Porsche']
    Out[10]: 100000
    
    

    collect()方法将RDD的所有元素返回给驱动程序,并将其序列化为列表。

    • 从文件读取

    从文本文件读取时,文件中的每一行都构成RDD的元素。

    In [11]:  data_from_file.take(1)
    Out[11]: ['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']
    
    • Lambda表达式

    我们将从data_from_file中提取有用信息。

    需要注意一点。定义纯Python方法会降低应用程序的速度,因为Spark需要在Python解释器和JVM之间不断地来回切换。要尽量使用内置的Spark函数。

    In [12]: 
    def extractInformation(row):
        import re
        import numpy as np
    
        selected_indices = [
             2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
             19,21,22,23,24,25,27,28,29,30,32,33,34,
             36,37,38,39,40,41,42,43,44,45,46,47,48,
             49,50,51,52,53,54,55,56,58,60,61,62,63,
             64,65,66,67,68,69,70,71,72,73,74,75,76,
             77,78,79,81,82,83,84,85,87,89
        ]
    
        '''
            Input record schema
            schema: n-m (o) -- xxx
                n - position from
                m - position to
                o - number of characters
                xxx - description
            1. 1-19 (19) -- reserved positions
            2. 20 (1) -- resident status
            3. 21-60 (40) -- reserved positions
            4. 61-62 (2) -- education code (1989 revision)
            5. 63 (1) -- education code (2003 revision)
            6. 64 (1) -- education reporting flag
            7. 65-66 (2) -- month of death
            8. 67-68 (2) -- reserved positions
            9. 69 (1) -- sex
            10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
            11. 71-73 (3) -- number of units (years, months etc)
            12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
            13. 75-76 (2) -- age recoded into 52 categories
            14. 77-78 (2) -- age recoded into 27 categories
            15. 79-80 (2) -- age recoded into 12 categories
            16. 81-82 (2) -- infant age recoded into 22 categories
            17. 83 (1) -- place of death
            18. 84 (1) -- marital status
            19. 85 (1) -- day of the week of death
            20. 86-101 (16) -- reserved positions
            21. 102-105 (4) -- current year
            22. 106 (1) -- injury at work
            23. 107 (1) -- manner of death
            24. 108 (1) -- manner of disposition
            25. 109 (1) -- autopsy
            26. 110-143 (34) -- reserved positions
            27. 144 (1) -- activity code
            28. 145 (1) -- place of injury
            29. 146-149 (4) -- ICD code
            30. 150-152 (3) -- 358 cause recode
            31. 153 (1) -- reserved position
            32. 154-156 (3) -- 113 cause recode
            33. 157-159 (3) -- 130 infant cause recode
            34. 160-161 (2) -- 39 cause recode
            35. 162 (1) -- reserved position
            36. 163-164 (2) -- number of entity-axis conditions
            37-56. 165-304 (140) -- list of up to 20 conditions
            57. 305-340 (36) -- reserved positions
            58. 341-342 (2) -- number of record axis conditions
            59. 343 (1) -- reserved position
            60-79. 344-443 (100) -- record axis conditions
            80. 444 (1) -- reserve position
            81. 445-446 (2) -- race
            82. 447 (1) -- bridged race flag
            83. 448 (1) -- race imputation flag
            84. 449 (1) -- race recode (3 categories)
            85. 450 (1) -- race recode (5 categories)
            86. 461-483 (33) -- reserved positions
            87. 484-486 (3) -- Hispanic origin
            88. 487 (1) -- reserved
            89. 488 (1) -- Hispanic origin/race recode
         '''
    
        record_split = re\
            .compile(
                r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
                r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
                r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
                r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
                r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
                r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
                r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
                r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
                r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
                r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
                r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
                r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
                r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
                r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
                r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
        try:
            rs = np.array(record_split.split(row))[selected_indices]
        except:
            rs = np.array(['-99'] * len(selected_indices))
        return rs
    #     return record_split.split(row)
    In [13]: data_from_file_conv = data_from_file.map(extractInformation)
        ...: data_from_file_conv.map(lambda row: row).take(1)
    Out[13]:
    [array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
            '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
            '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '01',
            'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
            '     ', '     ', '     ', '     ', '     ', '     ', '     ',
            '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
            ' ', '1', '1', '100', '6'], dtype='<U40')]
    
    

    全局与本地范围

    Spark固有并行性,可以以两种模式运行:本地和集群。当您在本地运行Spark时,您的代码可能与集群没有太大区别。群集模式下,当提交作业执行时,作业将发送到驱动程序(或主节点)。驱动程序节点创建DAG(参见第1章,
    了解作业并确定哪个执行者(或工作者)节点将运行特定任务。

    然后,驱动程序指示工作人员执行他们的任务,并在完成后将结果返回给驱动程序。然而,在此之前,驱动程序准备每个任务的闭包:驱动程序上存在一组变量和方法,供工作人员在RDD上执行其任务。

    每个执行程序都从驱动程序中获取变量和方法的副本。如果,在运行任务时,执行程序会更改这些变量或覆盖方法,
    它不会影响其他执行者的副本或驱动程序的变量和方法。这可能会导致一些意外行为和运行时错误,有时可能很难追查。

    spark在单机的性能,比pandas,对中小数据集,起码差了1个数量级。

    更多参考:http://spark.apache.org/docs/latest/programming-guide.html#local-vs-cluster-modes

    转换

    转换可以塑造数据集。其中包括对数据集中的值进行映射,过滤,连接和转码。在本节中,我们将展示RDD上可用的一些转换。

    由于空间限制,我们仅包括最常用的转换和操作。对于一整套可用的方法,我们建议您查看PySpark关于RDD的文档http://spark.apache.org/docs/latest/api/python/pyspark

    • map 映射
    In [15]: data_2014.take(10)
    Out[15]: [2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]
    
    In [16]: data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
        ...: data_2014_2.take(10)
    Out[16]:
    [('2014', 2014),
     ('2014', 2014),
     ('2014', 2014),
     ('2014', 2014),
     ('2014', 2014),
     ('2014', 2014),
     ('2014', 2014),
     ('2014', 2014),
     ('2014', 2014),
     ('-99', -99)]
    
    • filter 过滤
    In [19]: data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
    
    In [20]: data_filtered.count()
    Out[20]: 6
    
    
    • flatMap 扁平映射
    In [21]: data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
    
    In [22]: data_2014_flat.take(10)
    Out[22]: ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]
    
    • flatMap 扁平映射
    In [21]: data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
    
    In [22]: data_2014_flat.take(10)
    Out[22]: ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]
    
    • distinct 唯一值
    In [25]: distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
    
    In [26]: distinct_gender
    Out[26]: ['M', 'F', '-99']
    
    

    开销比较大,慎用

    • sample 取样

    .sample(...)方法返回数据集中的随机样本。 第一个参数指定采样是否应该替换,第二个参数定义要返回的数据的分数,第三个参数是伪随机数生成器的种子:

    在此示例中,我们从原始数据集中选择了10%的随机样本。

    In [27]: fraction = 0.1
    
    In [28]: data_sample = data_from_file_conv.sample(False, fraction, 666)
    
    In [29]: data_sample.take(1)
    Out[29]:
    [array(['1', '  ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
            '  ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
            '215', '063', '   ', '21', '02', '11I350 ', '21I251 ', '       ',
            '       ', '       ', '       ', '       ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '02',
            'I251 ', 'I350 ', '     ', '     ', '     ', '     ', '     ',
            '     ', '     ', '     ', '     ', '     ', '     ', '     ',
            '     ', '     ', '     ', '     ', '     ', '     ', '28', ' ',
            ' ', '2', '4', '100', '8'], dtype='<U40')]
    
    
    • leftOuterJoin 左连接

    就像在SQL世界中一样,根据在两个数据集中找到的值连接两个RDD,并返回左RDD中的记录,其中右侧记录附加在两个RDD匹配的位置:

    这是另一种昂贵的方法,应该谨慎使用,并且只在必要时使用,因为它会使数据混乱,从而导致性能下降。
    你在这里看到的是来自RDD rdd1的所有元素及其来自RDD rdd2的相应值。如您所见,值'a'在rdd3中显示两次,而'a'在RDD rdd2中显示两次。来自rdd1的值b仅显示一次,并与来自rdd2的值“6”连接。缺少两件事:
    来自rdd1的值'c'在rdd2中没有对应的键,因此返回的元组中的值显示为None,并且,因为我们执行了左外连接,
    rdd2的值'd'按预期消失了。

    如果我们使用.join(...)方法,我们只得到'a'和'b'的值,因为这两个值在这两个RDD之间相交。

    另一个有用的方法是.intersection(...),它返回两个RDD中相同的记录。执行以下代码:

    In [30]: rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
    
    In [31]: rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
    
    In [32]: rdd3 = rdd1.leftOuterJoin(rdd2)
    
    In [33]: rdd3.take(5)
    Out[33]: [('b', (4, '6')), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 1))]
    
    In [34]: rdd4 = rdd1.join(rdd2)
    
    In [35]: rdd4.collect()
    Out[35]: [('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]
    
    In [36]: rdd5 = rdd1.intersection(rdd2)
    
    In [37]: rdd5.collect()
    Out[37]: [('a', 1)]
    
    
    • repartition 重新分区
    In [38]: rdd1 = rdd1.repartition(4)
    
    In [39]: len(rdd1.glom().collect())
    Out[39]: 4
    
    

    重新分区数据集会更改数据集分区的分区数。 应该谨慎使用此功能,并且仅在真正需要时才会使用,因为它会对数据进行混洗,这实际上会导致性能方面的重大影响:

    上面的代码打印出4作为新的分区数。
    与.collect()相比,.glom()方法生成一个列表,其中每个元素是指定分区中存在的数据集的所有元素的另一个列表; 返回的主列表包含与分区数一样多的元素。

    操作 Action

    与转换相比,操作在数据集上执行计划任务;完成数据转换后,您可以执行转换。这可能不包含任何转换(例如,即使您没有对RDD进行任何转换,.take(n)也只会从RDD返回n条记录)
    或者执行整个转换链。

    • take

    常用方法。该方法优先于.collect(...),因为它只返回单个数据分区中的n个行。处理大型数据集时,这一点尤为重要:

    In [40]: data_first = data_from_file_conv.take(1)
    
    In [41]: data_first
    Out[41]:
    [array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
            '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
            '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '01',
            'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
            '     ', '     ', '     ', '     ', '     ', '     ', '     ',
            '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
            ' ', '1', '1', '100', '6'], dtype='<U40')]
    
    In [42]: data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)
    
    In [43]: data_take_sampled
    Out[43]:
    [array(['2', '17', ' ', '0', '08', 'M', '1', '069', ' ', '39', '19', '09',
            '  ', '1', 'M', '7', '2014', 'U', '7', 'U', 'N', ' ', ' ', 'I251',
            '215', '063', '   ', '21', '06', '11I500 ', '21I251 ', '61I499 ',
            '62I10  ', '63N189 ', '64K761 ', '       ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '       ',
            '       ', '       ', '       ', '       ', '       ', '05',
            'I251 ', 'I120 ', 'I499 ', 'I500 ', 'K761 ', '     ', '     ',
            '     ', '     ', '     ', '     ', '     ', '     ', '     ',
            '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
            ' ', '1', '1', '100', '6'], dtype='<U40')]
    
    
    • collect方法

    此方法将RDD的所有元素返回给驱动程序。类似pandas的all()。

    • .reduce(...)方法

    In [44]: rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
    Out[44]: 15

    In [45]: data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1) # 1个分区计算比较准确

    In [46]: works = data_reduce.reduce(lambda x, y: x / y)

    In [47]: works
    Out[47]: 10.0

    In [48]: data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)

    In [49]: data_reduce.reduce(lambda x, y: x / y)
    Out[49]: 0.004

    In [50]: data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)

    In [51]: data_key.reduceByKey(lambda x, y: x + y).collect()
    Out[51]: [('b', 4), ('c', 2), ('a', 12), ('d', 5)]

    
    -count  统计元素个数
    
    ```python
    In [52]: data_reduce.count()
    Out[52]: 6
    
    In [53]: len(data_reduce.collect())
    Out[53]: 6
    
    In [54]: data_key.countByKey().items()
    Out[54]: dict_items([('a', 2), ('b', 2), ('c', 1), ('d', 2)])
    
    • saveAsTextFile 保存为文本文件
    In [55]: data_key.saveAsTextFile('data_key.txt')
    
    In [56]: def parseInput(row):
        ...:     import re
        ...:
        ...:     pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
        ...:     row_split = pattern.split(row)
        ...:
        ...:     return (row_split[1], int(row_split[2]))
        ...:
    
    In [57]: data_key_reread = sc.textFile('data_key.txt').map(parseInput)
        ...: data_key_reread.collect()
    Out[57]: [('a', 8), ('d', 2), ('a', 4), ('b', 3), ('c', 2), ('b', 1), ('d', 3)]
    
    

    每个分区到一个单独的文件:
    data_key.saveAsTextFile(
    '/Users/drabast/Documents/PySpark_Data/data_key.txt')
    要将其读回来,您需要将其解析回来,因为所有行都被视为字符串:

    • foreach
    In [58]: def f(x):
        ...:     print(x)
        ...:
    
    In [59]: data_key.foreach(f)
    ('a', 4)
    ('b', 1)
    ('d', 3)
    ('a', 8)
    ('d', 2)
    ('b', 3)
    ('c', 2)
    

    注意每次的顺序可能不同

    小结

    总结RDD是Spark的支柱; 这些无模式数据结构是我们将在Spark中处理的最基本的数据结构。
    在本章中,我们介绍了通过.parallelize(...)方法以及从文本文件中读取数据,从文本文件创建RDD的方法。 此外,还显示了处理非结构化数据的一些方法。
    Spark中的转换是惰性的 - 它们仅在调用动作时应用。 在本章中,我们讨论并介绍了最常用的转换和操作; PySpark文档包含更多http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
    Scala和Python RDD之间的一个主要区别是速度:Python RDD可能比它们的Scala对应物慢得多。
    在下一章中,我们将引导您完成一个数据结构,使PySpark应用程序与Scala编写的数据结构(DataFrames)相同。

    相关文章

      网友评论

        本文标题:pySpark工具学习2-弹性分布式数据集(RDD Resili

        本文链接:https://www.haomeiwen.com/subject/rkwezqtx.html