美文网首页
pyspark常用算子学习笔记

pyspark常用算子学习笔记

作者: 空尘AI | 来源:发表于2020-06-22 09:32 被阅读0次

    本文的pyspark代码是在jupyter中执行的,我的python环境用的是anaconda,版本为3.7。如果你还没有搭建环境的话,可以参考我的另外两篇文章:
    win10安装anaconda
    在Windows上搭建pyspark环境

    image.png

    anaconda中自带jupyter,打开就能愉快地编码了。
    废话不多说,直接上代码

    创建SparkContext

    from  pyspark import SparkConf,SparkContext
    # Build the Spark context: run locally ("local[*]") under the application
    # name "wordcount".
    conf = SparkConf().setMaster("local[*]").setAppName("wordcount")
    # getOrCreate returns the existing context if one is already running, so the
    # cell can be re-executed in the same notebook session.
    sc = SparkContext.getOrCreate(conf)

    # Create an RDD from in-memory data. The variable is deliberately not named
    # `list`: that would shadow the builtin for the rest of the notebook session.
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    

    [1, 2, 3, 4, 5]

    # Print the number of partitions of the RDD
    print(rdd.getNumPartitions())
    # Repartition into 3 partitions; glom() turns each partition into a list so
    # that collect() shows the elements grouped by partition
    print(rdd.repartition(3).glom().collect())
    

    4
    [[], [1, 3], [2, 4, 5]]

    rdd transformation转换算子

    numbersRdd = sc.parallelize(range(1,11))
    print(numbersRdd.collect())
    # map: apply the function to every element (here: square it)
    mapRdd = numbersRdd.map(lambda x : x*x)
    print(mapRdd.collect())
    # filter: keep only the elements for which the predicate holds (even numbers)
    filterRdd = numbersRdd.filter(lambda x : x%2 == 0)
    print(filterRdd.collect())
    # flatMap: each element produces the tuple (x, x*x), whose items are
    # flattened into the resulting RDD
    flatMapRdd = numbersRdd.flatMap(lambda x : (x,x*x))
    print(flatMapRdd.collect())
    # NOTE(review): distinct() is a transformation and returns a new RDD, so this
    # line prints the RDD object itself, not the de-duplicated elements; append
    # .collect() to print the values.
    print(flatMapRdd.distinct())
    # sample: random sample with replacement, using the given fraction and seed
    sampleRdd = numbersRdd.sample(withReplacement = True,fraction = 0.5,seed=10)
    print(sampleRdd.collect())
    # sortBy: sort by the element itself, in descending order
    sortedRdd = flatMapRdd.sortBy(keyfunc = lambda x : x,ascending = False)
    print(sortedRdd.collect())
    

    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    [2, 4, 6, 8, 10]
    [1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]
    PythonRDD[41] at RDD at PythonRDD.scala:53
    [5, 9]
    [100, 81, 64, 49, 36, 25, 16, 10, 9, 9, 8, 7, 6, 5, 4, 4, 3, 2, 1, 1]

    # 在一个job中连续使用转换API
    def odd(x):
        """Return ``2 * x`` when ``x`` is odd, otherwise return ``x`` unchanged."""
        return 2 * x if x % 2 == 1 else x
        
    # Double the odd numbers, keep the values greater than 6, then drop duplicates
    resultRdd = (
        numbersRdd.map(odd)
        .filter(lambda value: value > 6)
        .distinct()
    )
    print(resultRdd.collect())
    

    [8, 10, 14, 18]

    键值对RDD(Pair RDD)的转换算子

    rdd = sc.parallelize(["Hello hello","Hello New York","Hi Beijing"])
    # Build (word, 1) pairs: split each line on spaces, lower-case every word,
    # then pair it with the count 1
    pairRdd = (rdd.flatMap(lambda x : x.split(" "))
              .map(lambda word : word.lower())
              .map(lambda word : (word,1)))
    # groupByKey: group the values of each key. The grouped values are
    # ResultIterable objects, which is why collect() prints object
    # representations rather than the grouped numbers.
    groupRdd = pairRdd.groupByKey()
    print(groupRdd.collect())
    # reduceByKey: merge the values of each key with the given function (sum)
    reduceRdd = pairRdd.reduceByKey(lambda x,y:x+y)
    print(reduceRdd.collect())
    # sortByKey: sort the pairs by key, in descending order
    sortedRdd = reduceRdd.sortByKey(ascending = False,keyfunc = lambda x:x)
    print(sortedRdd.collect())
    # aggregateByKey: start from zeroValue for every key, fold values into the
    # accumulator with seqFunc and merge accumulators with combFunc
    zeroValue = 0
    seqFunc = lambda a,b : a+b
    combFunc = lambda x,y : x+y
    aggregateRdd = pairRdd.aggregateByKey(zeroValue,seqFunc,combFunc)
    print(aggregateRdd.collect())
    # sampleByKey: sample without replacement, using a per-key fraction taken
    # from `fractions` and a fixed seed
    sampleRdd = pairRdd.sampleByKey(withReplacement=False,fractions={'hello':0.2,'new':0.1,'hi':0.2,'beijing':0.2,'york':0.1},seed = 81)
    print(sampleRdd.collect())
    

    [('hello', <pyspark.resultiterable.ResultIterable object at 0x0000014CA57BB448>), ('hi', <pyspark.resultiterable.ResultIterable object at 0x0000014CA57BB1C8>), ('beijing', <pyspark.resultiterable.ResultIterable object at 0x0000014CA578CB88>), ('new', <pyspark.resultiterable.ResultIterable object at 0x0000014CA578C108>), ('york', <pyspark.resultiterable.ResultIterable object at 0x0000014CA5C16848>)]
    [('hello', 3), ('hi', 1), ('beijing', 1), ('new', 1), ('york', 1)]
    [('york', 1), ('new', 1), ('hi', 1), ('hello', 3), ('beijing', 1)]
    [('hello', 3), ('hi', 1), ('beijing', 1), ('new', 1), ('york', 1)]
    [('hello', 1), ('york', 1), ('beijing', 1)]

    两个RDD之间使用的算子

    rdd1 = sc.parallelize([1,2,3])
    rdd2 = sc.parallelize([2,3,4])
    # union: all elements of both RDDs (duplicates are kept)
    unionRdd = rdd1.union(rdd2)
    print(unionRdd.collect())
    # intersection: elements present in both RDDs
    intersectionRdd = rdd1.intersection(rdd2)
    print(intersectionRdd.collect())
    # subtract (difference): elements of rdd1 that are not in rdd2
    subtractRdd = rdd1.subtract(rdd2)
    print(subtractRdd.collect())
    # cartesian: Cartesian product, every pair (a, b) with a from rdd1 and b from rdd2
    cartesianRdd = rdd1.cartesian(rdd2)
    print(cartesianRdd.collect())
    

    [1, 2, 3, 2, 3, 4]
    [2, 3]
    [1]
    [(1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4)]

    # Two key-value RDDs that share some keys ("Bob", "Boss") and each have one
    # key the other lacks ("lown" / "hello")
    rdd1 = sc.parallelize([
        ("Bob","Jack"),
        ("Bob","John"),
        ("lown","jane"),
        ("Boss","Mary")
    ])
    rdd2 = sc.parallelize([
        ("Bob",10),
        ("Boss",7),
        ("hello",6)
    ])
    # Inner join: only keys present in both RDDs
    innerRdd = rdd1.join(rdd2)
    print(innerRdd.collect())
    # Left outer join: every key of rdd1; None where rdd2 has no match
    leftRdd = rdd1.leftOuterJoin(rdd2)
    print(leftRdd.collect())
    # Right outer join: every key of rdd2; None where rdd1 has no match
    # NOTE(review): the variable name `rigthRdd` looks like a typo for `rightRdd`
    rigthRdd = rdd1.rightOuterJoin(rdd2)
    print(rigthRdd.collect())
    # Full outer join: keys of either RDD; None on the side without a match
    fullRdd = rdd1.fullOuterJoin(rdd2)
    print(fullRdd.collect())
    

    [('Boss', ('Mary', 7)), ('Bob', ('Jack', 10)), ('Bob', ('John', 10))]
    [('lown', ('jane', None)), ('Boss', ('Mary', 7)), ('Bob', ('Jack', 10)), ('Bob', ('John', 10))]
    [('hello', (None, 6)), ('Boss', ('Mary', 7)), ('Bob', ('Jack', 10)), ('Bob', ('John', 10))]
    [('lown', ('jane', None)), ('hello', (None, 6)), ('Boss', ('Mary', 7)), ('Bob', ('Jack', 10)), ('Bob', ('John', 10))]

    wordcount词频统计

    # Read a text file from the local file system.
    # The path is a raw string: in a normal literal the Windows backslashes form
    # invalid escape sequences (\s, \d, \m), which Python warns about. The raw
    # literal produces exactly the same path.
    path = r"file:///E:\spark-2.4.6-bin-hadoop2.7\data\mllib\sample_fpgrowth.txt"
    rdd = sc.textFile(path)
    print(rdd.collect())

    # Word count: split every line on spaces, pair each word with 1, spread the
    # pairs over 10 partitions, then sum the counts per word.
    result = (rdd.flatMap(lambda x : x.split(" "))
              .map(lambda word : (word,1))
              .repartition(10)
              .reduceByKey(lambda a,b: a+b)
             )
    print(result.collect())
    

    ['r z h k p', 'z y x w v u t s', 's x o n r', 'x z y m t s q e', 'z', 'x z y r q t p']
    [('u', 1), ('m', 1), ('y', 3), ('k', 1), ('t', 3), ('o', 1), ('n', 1), ('e', 1), ('r', 3), ('h', 1), ('p', 2), ('s', 3), ('x', 4), ('q', 2), ('z', 5), ('w', 1), ('v', 1)]

    读写hdfs

    # datetime is needed for the dated output directory below; without this
    # import the saveAsTextFile line raises NameError.
    from datetime import datetime

    # Read a file from HDFS into an RDD. If the two HDFS-related configuration
    # files are not added to the Spark conf, paths default to the local file system.
    path = "hdfs://110.141.77.118:8020/user/sjmt_ml/data/mllib/sample_fpgrowth.txt"
    rdd = sc.textFile(path)
    # print(rdd.collect())
    # Word count
    result = (rdd.flatMap(lambda x : x.split(" "))
              .map(lambda word : (word,1))
              .repartition(10)
              .reduceByKey(lambda a,b: a+b)
             )
    # Write the result to a directory whose name ends with today's date (YYYY-MM-DD).
    # NOTE(review): the output path has no hdfs:// scheme, so it presumably
    # resolves against the default file system - confirm it lands on HDFS.
    result.saveAsTextFile("/user/sjmt_ml/result/wc" + datetime.now().strftime('%Y-%m-%d'))
    
    def func(iter):
        """Yield every element of a partition, one by one.

        Intended for ``mapPartitions``, which needs an iterable back for each
        partition. The previous ``return i`` inside the loop handed back only the
        first element itself (and ``None`` for an empty partition) instead of
        an iterable of elements.

        Args:
            iter: iterable over the elements of one partition. The name shadows
                the builtin ``iter`` but is kept so existing calls keep working.

        Yields:
            Each element of ``iter`` in order; nothing for an empty partition.
        """
        for i in iter:
            yield i
        
    # Hand func to mapPartitions directly; it is called once per partition with
    # that partition's iterator
    result.mapPartitions(func).collect()
    

    ['r z h k p', 'z y x w v u t s', 's x o n r', 'x z y m t s q e', 'z', 'x z y r q t p']

    相关文章

      网友评论

          本文标题:pyspark常用算子学习笔记

          本文链接:https://www.haomeiwen.com/subject/ueotfktx.html