美文网首页
pyspark--RDD基本操作

pyspark--RDD基本操作

作者: FTDdata | 来源:发表于2021-05-01 08:17 被阅读0次

    spark中的RDD是一个核心概念,RDD是一种弹性分布式数据集,spark计算操作都是基于RDD进行的,本文介绍RDD的基本操作。

    Spark 初始化

    Spark初始化主要是要创建一个SprakContext实例,该实例表示与spark集群的连接。可以通过多种方式创建。

    SparkContext

    直接使用SparkContext类创建一个spark上下文,主要参数是指定masterappName

    from pyspark import SparkContext
    sc = SprakContext(master = 'local[*]',appName='test')
    

    SprakContext的属性

    # spark版本
    sc.version
    '2.4.5'
    
    # python版本
    sc.pythonVer
    '3.7'
    
    # master地址
    sc.master
    'local[*]'
    
    # 应用名字
    sc.appName
    'test'
    
    # 应用id
    sc.applicationId
    'local-1596522649115'
    

    SparkConf

    还可以通过调用SparkConf配置类来生成spark上下文。

    from pyspark import SparkConf, SprakContext
    conf = SparkConf().setMaster('local').setAppName('test')
    sc = SparkContext(conf=conf)
    

    创建RDD

    RDD是spark中的主要数据格式,名称为弹性分布式数据集,可以序列化python对象来得到RDD,或者读取文件。

    序列化

    # parallelize方法序列化python对象为RDD
    rdd = sc.parallelize([('a', 7), ('a', 2), ('b', 2)])
    rdd1 = sc.parallelize([2,5,1,8])
    rdd2 = sc.parallelize([('a', 2), ('d', 1), ('b', 1)])
    rdd3 = sc.parallelize(range(100))
    rdd4 = sc.parallelize([('a', ['x', 'y', 'z']), ('b', ['p', 'r'])])
    

    读取文件

    # 读取本地json文件,返回RDD
    text_file = sc.textFile("e:/a.json")
    

    获取RDD信息

    基本信息

    # 获取rdd的分区数
    rdd.getNumPartitions()
    12
    
    # 获取rdd的key
    rdd.keys().collect()
    ['a', 'a', 'b']
    
    # 获取rdd的value
    rdd.values().collect()
    [7, 2, 2]
    
    # 判断rdd是否为空
    rdd.isEmpty()
    False
    
    sc.parallelize([]).isEmpty()
    True
    

    统计信息

    统计信息包含了基本的统计计算值,如最大值、最小值、平均数、描述统计等。

    # 求和
    rdd3.sum()
    4950
    
    # 最大值
    rdd3.max()
    99
    
    # 最小值
    rdd3.min()
    0
    
    # 均值
    rdd3.mean()
    49.5
    
    # 标准差
    rdd3.stdev()
    28.86607004772212
    
    # 方差
    rdd3.variance()
    833.25
    
    # 分区间计数
    rdd3.histogram(3)
    ([0, 33, 66, 99], [33, 33, 34])
    
    # 描述统计
    rdd3.stats()
    (count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)
    

    处理RDD

    切片/collect

    # 获取rdd里的所有元素,返回list
    rdd.collect()
    [('a', 7), ('a', 2), ('b', 2)]
    
    # 获取rdd里的元素,返回字典
    rdd.collectAsMap()
    {'a': 2, 'd': 1, 'b': 1}
    
    # 获取开始的2个元素
    rdd.take(2)
    [('a', 7), ('a', 2)]
    
    # 获取第一个位置的元素
    rdd.first()
    ('a', 7)
    
    # 获取降序排序的前3个元素
    rdd3.top(3)
    [99, 98, 97]
    

    计数/count

    # 统计rdd里的元素个数
    rdd.count()
    3
    
    # 按key统计rdd里的元素个数
    rdd.countByKey()
    defaultdict(<class 'int'>, {'a': 2, 'b': 1})
    
    # 按value统计rdd里的元素个数
    rdd.countByValue()
    defaultdict(<class 'int'>, {('a', 7): 1, ('a', 2): 1, ('b', 2): 1})
    

    重采样/sample

    # 对rdd进行重采样
    rdd3.sample(False,0.1,81).collect()
    [4, 27, 28, 41, 49, 53, 58, 85, 93]
    

    过滤/filter

    # 根据key过滤
    rdd.filter(lambda x:'a' in x).collect()
    [('a', 7), ('a', 2)]
    

    去重/distinct

    # 对rdd元素去重
    rdd5.distinct().collect()
    ['a', 7, 2, 'b']
    

    排序/sortBy

    # 升序排序(默认)
    rdd1.sortBy(lambda x:x).collect()
    [1, 2, 5, 8]
    
    # 降序排序
    rdd1.sortBy(lambda x:x,ascending=False).collect()
    [8, 5, 2, 1]
    
    # 对键值对rdd按照key排序
    rdd2.sortByKey().collect()
    [('a', 2), ('b', 1), ('d', 1)]
    

    映射/map

    # map方法对每个元素应用函数
    rdd.map(lambda x: x+(x[0],x[1])).collect()
    [('a', 7, 'a', 7), ('a', 2, 'a', 2), ('b', 2, 'b', 2)]
    
    # flatMap方法,返回的结果会扁平化
    rdd5 = rdd.flatMap(lambda x: x+(x[0],x[1]))
    rdd5.collect()
    ['a', 7, 'a', 7, 'a', 2, 'a', 2, 'b', 2, 'b', 2]
    
    # flatMapValues方法
    rdd4.flatMapValues(lambda x:x).collect()
    [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
    

    迭代/foreach

    def g(x):print(x)
    # foreach方法对所有元素应用函数
    rdd.foreach(x)
    ('a', 7)
    ('a', 2)
    ('b', 2)
    

    简化/reduce

    # reduce方法对rdd进行合并
    rdd.reduce(lambda x,y:x+y)
    ('a', 7, 'a', 2, 'b', 2)
    
    # reduceByKey方法根据key对value进行合并
    rdd.reduceByKey(lambda v1,v2:v1+v2).collect()
    [('a', 9), ('b', 2)]
    

    分组/groupBy

    # groupBy方法对rdd的元素分组
    rdd1.groupBy(lambda x:x%2).mapValues(list).collect()
    [(0, [2, 8]), (1, [5, 1])]
    
    # groupByKey方法对rdd的元素根据key分组
    rdd.groupByKey().mapValues(list).collect()
    [('a', [7, 2]), ('b', [2])]
    

    聚合/aggregate

    # 定义两个聚合函数
    seq_op=lambda x,y:(x[0]+y,x[1]+1)
    comb_op=lambda x,y:(x[0]+y[0],x[1]+y[1])
    
    # aggregate方法聚合rdd
    rdd1.aggregate((0,0),seq_op,comb_op)
    (16, 4)
    
    # aggregateByKey方法根据key聚合rdd
    rdd.aggregateByKey((0,0),seq_op,comb_op).collect()
    [('a', (9, 2)), ('b', (2, 1))]
    
    # fold方法聚合rdd
    rdd1.fold(0,lambda x,y:x+y)
    16
    
    # foldByKey方法根据key聚合rdd
    rdd.foldByKey(0,lambda x,y:x+y).collect()
    [('a', 9), ('b', 2)]
    

    合并/union

    # 调用sc的union方法按顺序合并多个rdd
    sc.union([rdd,rdd2]).collect()
    [('a', 7), ('a', 2), ('b', 2), ('a', 2), ('d', 1), ('b', 1)]
    

    集合/intersection,union,subtract

    # 两个rdd的交集
    rdd.intersection(rdd2).collect()
    [('a', 2)]
    
    # 两个rdd的并集(包含重复元素)
    rdd.union(rdd2).collect()
    [('a', 7), ('a', 2), ('b', 2), ('a', 2), ('d', 1), ('b', 1)]
    
    # rdd对rdd2的补集
    rdd.subtract(rdd2).collect()
    [('a', 7), ('b', 2)]
    
    # 根据key求rdd2对rdd的补集)
    rdd2.subtractByKey(rdd).collect()
    [('d', 1)]
    
    # 两个rdd计算笛卡尔积
    rdd1.cartesian(rdd1).collect()
    [(2, 2), (2, 5), (2, 1), (2, 8), (5, 2), (5, 5), (5, 1), (5, 8), (1, 2), (1, 5), (1, 1), (1, 8), (8, 2), (8, 5), (8, 1), (8, 8)]
    

    保存RDD

    # 保存rdd到本地
    rdd.saveAsTextFile('rdd.txt')
    

    关闭spark

    # 使用stop方法关闭spark context实例
    sc.stop()
    

    运行

    进入spark安装目录下,通过sprak-submit命令运行py文件。

    ./bin/spark-submit example/src/main/python/pi.py
    

    另外,本地开发,可直接通过pyCharm运行。

    相关文章

      网友评论

          本文标题:pyspark--RDD基本操作

          本文链接:https://www.haomeiwen.com/subject/rvtrhltx.html