美文网首页
Spark基础系列之五--键值对RDD

Spark基础系列之五--键值对RDD

作者: 微生活_小阿楠 | 来源:发表于2020-05-05 07:13 被阅读0次

    传送门
    Spark实战系列之一--Spark是什么
    Spark实战系列之二--什么是RDD以及RDD的常用API
    Spark实战系列之三--RDD编程基础上
    Spark实战系列之四--RDD编程基础下
    Spark实战系列之五--键值对RDD
    Spark实战系列之六--数据读写
    Spark实战系列之七--综合案例
    Spark基础系列之八--Spark SQL是什么
    Spark基础系列之九--使用Spark SQL读写数据库
    传送门

    一、键值对RDD的创建

    1)第一种创建方式:从文件中加载

    • 可以采用多种方式创建pairRDD,其中一种主要方式是map()函数来实现
    val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
    val pairRDD = lines.flatMap(line=>line.split(" ")).map(word => (word,1)).foreach(println)
    
    (i,1)
    (love,1)
    (spark,1)
    
    
    • 通过并行集合(数组)创建RDD
    val list = List(“Hadoop”,"Spark","Hive","Spark")
    val rdd = sc.parallelize(list)
    val pairRDD = rdd.map(word => (word,1)).foreach(println)
    
    (Hadoop,1)
    (Spark,1)
    (Hive,1)
    (Spark,1)
    
    

    二、常用的键值对RDD转换操作

    更多相关的函数,请点击这个链接-->Spark实战系列之一--什么是RDD以及RDD的常用API

    • reduceByKey(func)
    • groupByKey()
    • keys
    • values
    • sortByKey()
    • mapValues(func)
    • join
    • combineByKey
    • reduceByKey(func)
      reduceByKey(func)的功能是,使用func函数合并具有相同键的值
      (Hadoop,1)
      (Spark,1)
      (Hive,1)
      (Spark,1)
    pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
    //结果
    (Spark,2)
    (Hive,1)
    (Hadoop,1)
    

    • groupByKey(func)
      groupByKey(func)的功能是,对具有相同键的值进行分组
      (Hadoop,3)
      (Spark,1)
      (Hadoop,5)
      (Spark,2)
    pairRDD.groupByKey()
    //结果
    (Spark,(1,2))
    (Hadoop,(3,5))
    
    //reduceByKey(func)和groupByKey(func)区别
    reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。
    
    groupByKey也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。
    
    推荐用reduceByKey,因为相对groupByKey,在面对大量数据时,可以节省很多资源。
    

    • sortByKey()
      sortByKey()的功能是返回一个根据键排序的RDD
      (Hadoop,1)
      (Spark,1)
      (Hive,1)
      (Spark,1)
    pairRDD.sortByKey().foreach(println)
    //结果
    (Hadoop,1)
    (Hive,1)
    (Spark,1)
    (Spark,1)
    

    • mapValues(func)
      对键值对RDD中的每个value都应用一个函数。但是,key不会发生变化
      (Hadoop,1)
      (Spark,1)
      (Hive,1)
      (Spark,1)
    pairRDD.mapValues(x => x+1).foreach(println)
    //结果
    (Hadoop,2)
    (Spark,2)
    (Hive,2)
    (Spark,2)
    

    • join
      join表示内连接。对于内连接,对于给定的两个输入数据集(k,v1)和(k,v2),只有在两个数据集中都存在的key才会被输出,最终得到一个(k,(v1,v2))类型的数据集。
    scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
    
    scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
    
    scala> pairRDD1.join(pairRDD2)
    
    scala> pairRDD1.join(pairRDD2).foreach(println)
    (spark,(1,fast))
    (spark,(2,fast))
    

    三,综合实例

    题目:给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
    很显然,对于上面的题目,结果是很显然的,(“spark”,4),(“hadoop”,5)。
    下面,我们在spark-shell中演示代码执行过程:

    scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:27
     
    scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
    res22: Array[(String, Int)] = Array((spark,4), (hadoop,5))
    
    
    步骤说明:
    (1)首先构建一个数组,数组里面包含了四个键值对,然后,调用parallelize()方法生成RDD,从执行结果反馈信息,可以看出,rdd类型是RDD[(String, Int)]。
    
    (2)针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。比如,键值对(“spark”,2)经过mapValues()函数处理后,就变成了(“spark”,(2,1)),其中,数值1表示“spark”这个键的1次出现。下面就是rdd.mapValues()操作在spark-shell中的执行演示:
    scala> rdd.mapValues(x => (x,1)).collect()
    res23: Array[(String, (Int, Int))] = Array((spark,(2,1)), (hadoop,(6,1)), (hadoop,(4,1)), (spark,(6,1)))
    上面语句中,collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。
    
    (3)这里,必须要十分准确地理解reduceByKey()函数的功能。可以参考上面我们对该函数的介绍,reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式(x,y) => (x._1+y._1,x._2 + y._2),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value,比如,在这个例子中, (“hadoop”,(6,1))和(“hadoop”,(4,1))这两个键值对具有相同的key,所以,对于函数中的输入参数(x,y)而言,x就是(6,1),x._1表示这个键值对中的第1个元素6,x._2表示这个键值对中的第二个元素1,y就是(4,1),y._1表示这个键值对中的第1个元素4,y._2表示这个键值对中的第二个元素1,所以,函数体(x._1+y._1,x._2 + y._2),相当于生成一个新的键值对(key,value),其中,key是x._1+y._1,也就是6+4=10,value是x._2 + y._2,也就是1+1=2,因此,函数体(x._1+y._1,x._2 + y._2)执行后得到的value是(10,2),但是,要注意,这个(10,2)是reduceByKey()函数执行后,”hadoop”这个key对应的value,也就是,实际上reduceByKey()函数执行后,会生成一个键值对(“hadoop”,(10,2)),其中,10表示hadoop书籍的总销量,2表示两天。同理,reduceByKey()函数执行后会生成另外一个键值对(“spark”,(8,2))。
    
    (4)最后,就可以求出最终结果。我们可以对上面得到的两个键值对(“hadoop”,(10,2))和(“spark”,(8,2))所构成的RDD执行mapValues()操作,得到每种书的每天平均销量。当第一个键值对(“hadoop”,(10,2))输入给mapValues(x => (x._1 / x._2))操作时,key是”hadoop”,保持不变,value是(10,2),会被赋值给Lamda表达式x => (x._1 / x._2中的x,因此,x的值就是(10,2),x._1就是10,表示hadoop书总销量是10,x._2就是2,表示2天,因此,hadoop书籍的每天平均销量就是x._1 / x._2,也就是5。mapValues()输出的一个键值对就是(“hadoop”,5)。同理,当把(“spark”,(8,2))输入给mapValues()时,会计算得到另外一个键值对(“spark”,4)。
    
    

    相关文章

      网友评论

          本文标题:Spark基础系列之五--键值对RDD

          本文链接:https://www.haomeiwen.com/subject/nqnfghtx.html