美文网首页
3RDD创建

3RDD创建

作者: barriers | 来源:发表于2020-04-02 23:53 被阅读0次

    1键值对rdd的创建

    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("Myapp")
    sc = SparkContext(conf=conf)
    # 1从文件中加载
    lines = sc.textFile("file:///home/glzt/script/spark/word.txt")
    pariRDD=lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
    pairRDD.foreach(print)
    
    # 通过并行集合(列表)创建RDD
    list = ["Hadoop", "Spark", "Hive", "Spark"]
    rdd = sc.parallelize(list)
    pariRDD=rdd.map(lambda word: (word, 1))
    pairRDD.foreach(print)
    

    2常用的RDD键值对转换操作

    常用的键值对转换操作有reduceByKey(func),groupByKey(),keys,values,sortByKey(),mapValues(func),join,combineByKey
    1.reduceByKey(func)使用func函数合并具有相同键的值;
    2.groupByKey()对具有相同键的值进行分组;
    groupByKey是针对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作;
    reduceByKey用于对每个key对应的多个value进行marge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义;

    list = ["Hadoop", "Spark", "Hive", "Spark"]
    rdd = sc.parallelize(list)
    pariRDD=rdd.map(lambda word: (word, 1))
    pairRDD.reduceByKey(lambda a,b: a+b)
    pairRDD.groupByKey()
    
    words = ['one','two','two','three','one','four','five','seven','seven']
    wordPairsRDD = sc.parallelize(words).map(lambda word:(word,1))
    wordCountWithReduce = wordPairsRDD.reduceByKey(lambda a,b:a+b)
    wordCountWithReduce.foreach(print)
    wordCountWithGroup = wordPairsRDD.groupByKey().map(lambda t: (t[0],sum(t[1])))
    wordCountWithGroup.foreach(print)
    

    4.3转换操作

    keys,values,sortByKey,mapValues,join
    keys把rdd中的key返回形成一个新的rdd;
    keys把rdd中的value返回形成一个新的rdd;
    sortByKey返回一个根据键排序的rdd;
    mapValues对键值对rdd中的每个value都应用一个函数,key不会发生改变;
    join表示内连接,对于内连接,对于给定的两个输入数据集(K, V1)和(K, V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

    list1 = [('hadoop', 1), ('spark', 1), ('flink', 1), ('hive', 1)]
    rdd = sc.parallelize(list1)
    rdd.keys().foreach(print)
    rdd.values().foreach(print)
    # 升序
    rdd.sortByKey().foreach(print)
    # 降序
    rdd.sortByKey(False).foreach(print)
    
    list1 = [('c', 8), ('b', 12), ('c', 2), ('a', 7), ('d', 19), ('a', 17), ('e', 15), ('b', 11)]
    rdd = sc.parallelize(list1)
    # 根据键聚合求和并根据键倒序排序
    rdd.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()
    # 根据键聚合求和并根据键倒序排序
    rdd.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x, False).collect()
    # 根据键聚合求和并根据键倒序排序
    rdd.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[0], False).collect()
    # 根据键聚合求和并根据值倒序排序
    rdd.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[1], False).collect()
    
    list1 = [('hadoop', 1), ('spark', 1), ('flink', 1), ('hive', 1)]
    rdd = sc.parallelize(list1)
    rdd.mapValues(lambda x: x+1).foreach(print)
    
    list1 = [('hadoop', 1), ('spark', 5), ('spark', 7), ('hive', 1)]
    rdd1 = sc.parallelize(list1)
    rdd2 = sc.parallelize([('spark', 'fast')])
    rdd3 = rdd1.join(rdd2)
    rdd3.foreach(print)
    

    5综合实例

    求每个键对应的平均值并排序

    list1 = [('hadoop', 6), ('spark', 2), ('spark', 6), ('hadoop', 6), ('hive', 7)]
    rdd = sc.parallelize(list1)
    rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).mapValues(lambda x: x[0]/x[1]).sortBy(lambda x: x[1], False).collect()
    

    6本地文件数据读写

    保存是给一个文件夹而不是文件名,因为存在分区概念;当加载保存的文件时,给一个文件夹名,就会加载文件夹下所有的文件

    lines = sc.textFile("file:///home/glzt/script/spark/word.txt")
    lines.first()
    lines.saveAsTextFile("file:///home/glzt/script/other")
    line = sc.textFile("file:///home/glzt/script/other")
    

    7分布式文件系统HDFS数据的读写

    写也是传入一个文件夹

    # 三种写法等价
    lines = sc.textFile("hdfs://localhost:9000/home/glzt/software/hadoop/word.txt")
    lines = sc.textFile("/home/glzt/software/hadoop/word.txt")
    lines = sc.textFile("word.txt")
    lines.first()
    lines.saveAsTextFile("writeback")

    相关文章

      网友评论

          本文标题:3RDD创建

          本文链接:https://www.haomeiwen.com/subject/xihhuhtx.html