4 Spark Core

作者: 7125messi | 来源:发表于2018-01-09 22:33 被阅读71次

    1 Spark Core内模块功能

    API:JAVA、Python和R;

    BroadCast:广播变量的实现;

    Deploy:Spark部署与启动运行的实现;

    Executor:Worker节点负责计算部分的实现;

    Metrics:运行时状态监控的实现;

    Network:集群通信的实现;

    Partial:近似评估

    Serializer:序列化模块;

    Storage:存储模块;

    UI:监控界面的代码逻辑实现。

    2 Spark Core外模块功能

    Spark SQL/Spark MLlib/Spark ML/Spark Streaming/Spark GraphX/Spark on YARN

    弹性分布式数据集(RDD)不仅是一组不可变的JVM(JAVA虚拟机)对象的分布集,可以让你执行高速运算,而且是Apache Spark的核心。

    RDD 并行操作,Spark的最大优势是:每个转换并行执行,从而大大提高速度。

    RDD是一种无schema的数据结构,这与下一章节的DataFrame不同,我们可以混合使用任何类型的数据结构:tuple,dist,list等。

    Lambda表达式的使用:定义纯Python方法会降低应用程序的速度,因为Spark需要在Python解释器和JVM之间连续切换,这是一个开销大的操作,所以我们需要尽量使用Spark内置的功能函数。

    3 RDD的特点

    (1)不可变性:RDD是一种不可变的数据结构,一旦创建,它将不可以在原地修改,一旦修改RDD的操作都会返回一个新的RDD。

    (2)分片:RDD表示的是一组数据的分区。这些分区分布在多个集群节点上,然而当Spark在单个节点上运行时,所有的分区数据都会在当前节点上。Spark存储RDD的分区和数据集物理分区之间关系的映射关系。RDD是各个分布式数据源之中数据的一个抽象,它通常表示分布在多个集群节点上的分区数据。例如HDFS将数据分片或分块存储在集群中,默认情况下,一个RDD分区对应一个HDFS文件分片。

    (3)容错性

    (4)接口:RDD是一个处理数据的接口。

    (5)强类型:RDD类有一个参数用于表示类型,RDD可以表示不同类型的数据。RDD可以表示同一类型数据的分布式集合,包括Integer、Long、Float、Double和String等。

    (6)驻留在内存中:对于一个缓存在内存中的RDD进行操作比操作没缓存的RDD要快很多。

    3.1 创建RDD

    PySpark中,创建RDD的方法有两种:

    方法一:通过元组创建
    parallelize很少用于生产上,一般用于学习Spark

    data1 = sc.parallelize(("a",2))
    type(data1)
    

    输出结果:pyspark.rdd.RDD

    data2 = sc.parallelize([
    ('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12),('Amber', 9)
    ])
    
    data2.collect()
    

    输出结果:[('Amber', 22), ('Alfred', 23), ('Skye', 4), ('Albert', 12), ('Amber', 9)]

    data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain','visited', 4504]]).collect()
    data_heterogenous
    

    输出结果:[('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]

    data_heterogenous[1]['Porsche']
    

    输出结果:100000

    方法二:通过读取文件创建

    (1)textFile方法

    用于从文本文件创建RDD实例,可以从多种数据源读取数据,包括单个文件、同一目录下(本地或HDFS)的多个文件、或者其他Hadoop支持的存储系统文件,返回一个RDD,这个RDD代表的数据集每个元素都是一个字符串,每个字符串代表输入文件的一行数据。

    textFile方法也可以读取压缩文件中的数据,参数中可以存在通配符,用于从一个目录中读取多个文件,例如:

    rdd = sc.textFile('hdfs://namenode:9000/path/to/directory/*.gz')
    

    textFile方法第二个可选参数,它用于指定分区的个数,默认情况下,Spark为每一个文件分块创建一个分区,可以设置成一个更大的数字从而提高并行化程度,但是设置成一个小于文件分块数的数字是不可以的。

    rdd = sc.textFile('file:///root/ydzhao/PySpark/Chapter02/VS14MORT.txt.gz', 4)
    

    (2)wholeTextFiles方法

    读取目录下的所有文本文件,然后返回一个键值型(key-value)RDD。返回RDD中的每一个键值对对应一个文件,键为文件路径,值为文件的内容,可以从多种数据源读取数据,包括单个文件、同一目录下(本地或HDFS)的多个文件、或者其他Hadoop支持的存储系统文件。

    例如:

    x = sc.wholeTextFiles("hdfs:///sh/signaling/2017/05/*")
    x.take(2)
    
    [('hdfs://meihui/user/ydzhao/2017/05/01/TRAFF_20170501233101.txt', '2996DCD4C52E6D,,,,'),
    
     ('hdfs://meihui/user/ydzhao/2017/05/01/TRAFF_20170501233102.txt',
    
     'E4AE800797AD7E3,,,,')]
    
    # 读取本地文件
    data_from_file = sc.textFile('file:///root/ydzhao/PySpark/Chapter02/VS14MORT.txt.gz', 4)
    data_from_file.count()
    

    输出结果:2631171

    # 读取HDFS文件 
    data_from_hdfs = sc.textFile('/user/ydzhao/PySpark/Chapter02/jupyter_notebook.txt', 4)
    data_from_hdfs.count()
    

    输出结果:253637566

    一个综合案例:

    x=sc.textFile("/sh/signaling/2017/05/*/")
    from operator import add
    x.map(lambda line:(1,line)).\
    mapValues(lambda s:(s.split(','))[1][0:12]).\
    map(lambda ss:(ss[1],ss[0])).\
    reduceByKey(add). \
    sortByKey(true,31).\
    saveAsTextFile('/user/ydzhao/appresult/201705opt')
    

    3.2 Transformations转换操作

    (1)map:一个高阶方法,它把一个函数作为它的参数,并把这个函数作用在原RDD的每个元素上,从而创建一个新的RDD实例。

    data_2014 = data_from_file_conv.map(lambda row: int(row[16]))
    data_2014.take(10)
    

    输出结果:[2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]

    data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
    data_2014_2.take(10)
    

    输出结果:

    [('2014', 2014),
    ('2014', 2014),
    ('2014', 2014),
    ('2014', 2014),
    ('2014', 2014),
    ('2014', 2014),
    ('2014', 2014),
    ('2014', 2014),
    ('2014', 2014),
    ('-99', -99)]
    

    (2)filter:高阶方法,它把一个布尔函数作为它的参数,并把这个函数作用原RDD的每个元素上,从而创建一个新的RDD实例。一个布尔函数只有一个参数作为输入,返回true或false。filter方法返回一个新的RDD实例,这个RDD实例代表的数据集由布尔函数返回true的元素构成。新的RDD实例代表的数据集是原RDD的子集。

    data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
    data_filtered.count()
    

    输出结果:6

    (3)flatMap:高阶方法,它把一个函数作为它的参数,这个函数处理原RDD中每个元素返回一个序列,扁平化这个序列的集合得到一个数据集,flatMap方法返回的RDD就代表这个数据集。

    data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
    data_2014_flat.take(10)
    

    输出结果:['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

    (4)distinct

    distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
    distinct_gender
    ['-99', 'M', 'F']
    

    (5)sample:返回原RDD数据集的一个抽样子集,它拥有三个参数。

    第一个参数指定是有放回抽样还是无放回抽样;

    第二个参数指定抽样比例;

    第三个参数是可选的,指定抽样的随机种子数。

    fraction = 0.1
    data_sample = data_from_file_conv.sample(False, fraction, 666)
    data_sample.take(1)
    [array(['1', ' ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
    
            '  ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
    
            '215', '063', '  ', '21', '02', '11I350 ', '21I251 ', '      ',
    
            '      ', '      ', '      ', '      ', '      ', '      ',
    
            '      ', '      ', '      ', '      ', '      ', '      ',
    
            '      ', '      ', '      ', '      ', '      ', '02',
    
            'I251 ', 'I350 ', '    ', '    ', '    ', '    ', '    ',
    
            '    ', '    ', '    ', '    ', '    ', '    ', '    ',
    
            '    ', '    ', '    ', '    ', '    ', '    ', '28', ' ',
    
            ' ', '2', '4', '100', '8'],
    
          dtype='
    print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))
    

    输出结果:Original dataset: 2631171, sample: 263247

    (6)intersection

    rdd5 = rdd1.intersection(rdd2)
    rdd5.collect()
    

    输出结果:[('a', 1)]

    (7)union

    rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
    rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
    rdd3 = rdd1.union(rdd2)
    rdd3.collect()
    

    输出结果:[('a', 1), ('b', 4), ('c',10),('a', 4), ('a', 1), ('b', '6'), ('d', 15)]

    (8)subtract:以一个RDD实例作为输入,返回一个新RDD实例,这个新的RDD实例代表的数据集由那些存在于原RDD实例中但不在输入RDD实例中的元素构成。

    rdd1 = sc.parallelize([(1,4,10)])
    rdd2 = sc.parallelize([(1,4,6)])
    rdd3 = rdd1.subtract(rdd2)
    rdd3.collect()
    

    输出结果:[10]

    (9)repartition:把一个整数作为参数,返回分区数等于这个参数的RDD实例,有助于提高Spark的并行能力,它会重新分布数据,它是一个耗时操作。(高开销)

    rdd1 = rdd1.repartition(4)
    len(rdd1.glom().collect())
    

    输出结果:4

    (10)coalesce:用于减少RDD的分区数量,它把分区数作为参数,返回分区数等于这个参数的RDD实例。但是,使用coalesce方法时要小心,因为减少了RDD的分区数意味着降低了Spark的并行能力,它通常用于合并小分区。例如,使用filter操作之后,RDD可能会有很多小分区,这种情况下,减少分区数提升性能。(高开销)
    键值对型RDD的转换

    (11)keys:返回只由原RDD中的键构成的RDD

    rdd  = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
    rdd.keys
    

    (12)values:返回只由原RDD中的值构成的RDD

    rdd  = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
    rdd.values
    

    (13)mapValues(注意没有mapKeys转换),与map方法不同的是,不同点在于它把作为参数的函数作用在原RDD的值上,原RDD的键都没有变,返回的RDD和原RDD都拥有相同的键。

    rdd  = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
    rdd1 = rdd.mapValues(lambda v:v*2)
    rdd1.collect()
    

    (14)leftOuterJoin(高开销)

    rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
    rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
    rdd3 = rdd1.leftOuterJoin(rdd2)
    rdd3.,collect()
    

    输出结果:[('a', (1, 4)), ('a', (1, 1)), ('c', (10, None)), ('b', (4, '6'))]

    (15)subtractByKey:把一个键值对型RDD作为输入参数,返回一个键值对RDD,这个键值对RDD的键都是只存在于原RDD中但是不存在于输入RDD中。

    (16)groupByKey:返回一个由二元组构成的RDD,二元组的第一个元素是原RDD的键,第二个元素是一个集合,集合是由键对应的所有值构成。与groupBy有别,不需要生成键的函数作为输入参数。

    注意,应当尽量避免groupByKey转换操作,是一个耗时操作,可能对数据进行shuffle操作,大多数情况都有不适用groupByKey的替代方案。(高开销)

    (17)groupBy:高阶方法,它将原RDD中的元素按照用户定义的标准分组从而组成一个RDD。它把一个函数作为它的参数,这个函数为原RDD中的每一个元素生成一个键。groupBy把这个函数作用在原RDD的每一个元素上,然后返回一个由二元组构成的新RDD实例,每个二元组的第一个元素是函数生成的键值,第二个元素是对应这个键的所有原RDD元素的集合,其中,键和原RDD元素的对应关系由那个作为参数的函数决定。

    注意,groupBy是一个耗时的转换操作,需要对数据进行shuffle操作。(高开销)

    3.3 Action动作操作(结果返回给驱动程序)

    (1)collect:返回一个数组,这个数组由原RDD中的元素构成。在使用这个方法的时候需要小心,因为它把worker节点的数据移给了驱动程序driver。如果操作一个有大数据集的RDD,它有可能导致驱动程序崩溃。(高开销)

    (2)count:返回RDD中的元素的个数

    data_reduce.count()
    

    (3)countByValue:返回原RDD中每个元素的个数,返回是一个map类实例,键为元素的值,值为该元素的个数。

    (4)first:返回原RDD中的第一个元素。

    (5)take:输入参数为一个整数N,返回一个由原RDD中前N个元素构成的RDD。

    (7)top:返回一个由原RDD中前N小的元素构成的RDD。

    (8)max/min

    (9)reduce:对原RDD的元素做汇总操作,汇总的时候满足结合律和交换律的二元操作符。

    rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
    data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)
    works = data_reduce.reduce(lambda x, y: x / y)
    

    键值对型RDD的动作操作
    (1)reduceByKey:它把满足结合律的二元操作符当作输入参数,它把这个操作符作用于相同键的值上。reduceByKey方法可以用于对同一键对应的值进行汇总操作(求和、求乘积、求max,min)。
    基于键的汇总操作、合并操作,reduceByKey比groupByKey更加适合。

    data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
    data_key.reduceByKey(lambda x, y: x + y).collect()
    from operator import add
    data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
    data_key.reduceByKey(add).collect()
    

    (2)countByKey:用于统计原RDD每个键的个数,它返回一个map类实例

    data_key.countByKey().items()
    

    数值对型RDD的动作操作
    如果RDD的元素类型为Integer、Long、Float和Double,则这样的RDD为数值型RDD,统计分析很有用。

    • mean

    • stdev

    • sum

    • variance

    保存RDD
    saveAsTextFile:将原RDD中的元素保存在指定目录中,这个目录位于任何hadoop支持的存储系统中。每一个RDD中的元素都用字符串表示并另存为文本中的一行。

    # 保存到本地文件系统
    data_key.saveAsTextFile('file:///root/***/export')
    
    # 保存到HDFS
    data_key.saveAsTextFile('hdfs://***/user/***/export')
    

    注意:上述方法均把目录的名字作为输入参数,然后在这个目录为每个RDD分区创建一个文件。这种设计不仅高效而且可容错。因为每个分区被存成一个文件,所以Spark在保存RDD的时候可以启动多个任务,并行执行,将数据写入文件系统中。从而保证写入数据的过程可容错。一旦有一个将分区写入文件的任务失败了,Spark可以再启动一个任务,重写刚才失败任务创建的文件。

    4 惰性操作

    RDD的创建和转换是惰性操作,只有调用动作函数或者保存RDD时才会真正触发计算。惰性转换使得Spark可以高效执行RDD计算,直到Spark应用需要操作结果时才会进行计算,Spark可以利用这一点优化RDD的操作,使得操作流水线化,而且避免了在网络间不必要的数据传输。

    5 缓存

    除了把数据驻留在内存中,缓存对于RDD也十分重要。创建RDD有两种方式,从存储系统读取或者应用其他现存RDD的转换操作。默认情况下,当一个RDD的操作方法被调用时,Spark会根据它的父RDD来创建这个RDD,有可能导致父RDD的创建,如此往复,可能一直持续到Spark找到根RDD,而后Spark通过从存储系统读取数据的方式创建根RDD。操作方法被调用一次,上述过程就会执行一次。每次调用操作方法,Spark都会遍历这个调用者RDD的血统数,执行所有的转换操作来创建它。

    如果一个RDD缓存了,Spark会执行到目前为止的所有转换操作并为这个RDD创建一个检查点。具体来说,这只会在第一次在一个缓存的RDD上调用某操作的时候发生。类似于转换方法,缓存方法也是惰性的。

    如果一个应用缓存了RDD,Spark并不是立即执行计算并把它存储在内存中。Spark只有在第一次缓存的RDD上调用某操作时才会将RDD物化在内存中。而且第一次缓存操作并不会从中受益,后续的操作才会从缓存中受益。因为它们不需要再执行从存储系统中读取数据开始的一系列操作,所以运行将会快很多。当然如果只使用一次数据应用去使用缓存就没有意义了。所以缓存对于那些同样数据做多次迭代的应用才能从缓存中受益。

    RDD的缓存方法

    cache和persist

    (1)cache :把RDD存储在集群中执行者的内存中,实际上是将RDD物化在内存中。

    (2)persist:通用版的cache方法,把RDD存储在内存中或者硬盘上或者二者皆有。它的输入参数是存储等级,这是一个可选参数,如果调用persist方法而没有提供参数,那么它的行为类似于cache方法。

    rdd.persist()
    

    可选参数:

    • MEMORY_ONLY

    • DISK_ONLY

    • MEMORY_AND_DISK

    • MEMORY_ONLY_SER:内存消耗和CPU使用之间做的妥协。

    • MEMORY_AND_DISK_SER

    需要注意的是,RDD缓存也是可容错的。

    6 共享变量、广播变量和累加器

    SparkContext类提供了一个叫做broadcast方法用于创建广播变量。

    SparkContext类提供了一个叫做accumulator方法用于创建累加器变量。

    相关文章

      网友评论

        本文标题:4 Spark Core

        本文链接:https://www.haomeiwen.com/subject/fcqnnxtx.html