传送门
Spark实战系列之一--Spark是什么
Spark实战系列之二--什么是RDD以及RDD的常用API
Spark实战系列之三--RDD编程基础上
Spark实战系列之四--RDD编程基础下
Spark实战系列之五--键值对RDD
Spark实战系列之六--数据读写
Spark实战系列之七--综合案例
Spark基础系列之八--Spark SQL是什么
Spark基础系列之九--使用Spark SQL读写数据库
传送门
一、键值对RDD的创建
1)第一种创建方式:从文件中加载
- 可以采用多种方式创建pairRDD,其中一种主要方式是map()函数来实现
val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
val pairRDD = lines.flatMap(line=>line.split(" ")).map(word => (word,1)).foreach(println)
(i,1)
(love,1)
(spark,1)
- 通过并行集合(数组)创建RDD
val list = List(“Hadoop”,"Spark","Hive","Spark")
val rdd = sc.parallelize(list)
val pairRDD = rdd.map(word => (word,1)).foreach(println)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
二、常用的键值对RDD转换操作
更多相关的函数,请点击这个链接-->Spark实战系列之一--什么是RDD以及RDD的常用API
- reduceByKey(func)
- groupByKey()
- keys
- values
- sortByKey()
- mapValues(func)
- join
- combineByKey
-
reduceByKey(func)
reduceByKey(func)的功能是,使用func函数合并具有相同键的值
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
//结果
(Spark,2)
(Hive,1)
(Hadoop,1)
-
groupByKey(func)
groupByKey(func)的功能是,对具有相同键的值进行分组
(Hadoop,3)
(Spark,1)
(Hadoop,5)
(Spark,2)
pairRDD.groupByKey()
//结果
(Spark,(1,2))
(Hadoop,(3,5))
//reduceByKey(func)和groupByKey(func)区别
reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。
groupByKey也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。
推荐用reduceByKey,因为相对groupByKey,在面对大量数据时,可以节省很多资源。
-
sortByKey()
sortByKey()的功能是返回一个根据键排序的RDD
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
pairRDD.sortByKey().foreach(println)
//结果
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)
-
mapValues(func)
对键值对RDD中的每个value都应用一个函数。但是,key不会发生变化
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
pairRDD.mapValues(x => x+1).foreach(println)
//结果
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)
-
join
join表示内连接。对于内连接,对于给定的两个输入数据集(k,v1)和(k,v2),只有在两个数据集中都存在的key才会被输出,最终得到一个(k,(v1,v2))类型的数据集。
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
scala> pairRDD1.join(pairRDD2)
scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))
三,综合实例
题目:给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
很显然,对于上面的题目,结果是很显然的,(“spark”,4),(“hadoop”,5)。
下面,我们在spark-shell中演示代码执行过程:
scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:27
scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
res22: Array[(String, Int)] = Array((spark,4), (hadoop,5))
步骤说明:
(1)首先构建一个数组,数组里面包含了四个键值对,然后,调用parallelize()方法生成RDD,从执行结果反馈信息,可以看出,rdd类型是RDD[(String, Int)]。
(2)针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。比如,键值对(“spark”,2)经过mapValues()函数处理后,就变成了(“spark”,(2,1)),其中,数值1表示“spark”这个键的1次出现。下面就是rdd.mapValues()操作在spark-shell中的执行演示:
scala> rdd.mapValues(x => (x,1)).collect()
res23: Array[(String, (Int, Int))] = Array((spark,(2,1)), (hadoop,(6,1)), (hadoop,(4,1)), (spark,(6,1)))
上面语句中,collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。
(3)这里,必须要十分准确地理解reduceByKey()函数的功能。可以参考上面我们对该函数的介绍,reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式(x,y) => (x._1+y._1,x._2 + y._2),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value,比如,在这个例子中, (“hadoop”,(6,1))和(“hadoop”,(4,1))这两个键值对具有相同的key,所以,对于函数中的输入参数(x,y)而言,x就是(6,1),x._1表示这个键值对中的第1个元素6,x._2表示这个键值对中的第二个元素1,y就是(4,1),y._1表示这个键值对中的第1个元素4,y._2表示这个键值对中的第二个元素1,所以,函数体(x._1+y._1,x._2 + y._2),相当于生成一个新的键值对(key,value),其中,key是x._1+y._1,也就是6+4=10,value是x._2 + y._2,也就是1+1=2,因此,函数体(x._1+y._1,x._2 + y._2)执行后得到的value是(10,2),但是,要注意,这个(10,2)是reduceByKey()函数执行后,”hadoop”这个key对应的value,也就是,实际上reduceByKey()函数执行后,会生成一个键值对(“hadoop”,(10,2)),其中,10表示hadoop书籍的总销量,2表示两天。同理,reduceByKey()函数执行后会生成另外一个键值对(“spark”,(8,2))。
(4)最后,就可以求出最终结果。我们可以对上面得到的两个键值对(“hadoop”,(10,2))和(“spark”,(8,2))所构成的RDD执行mapValues()操作,得到每种书的每天平均销量。当第一个键值对(“hadoop”,(10,2))输入给mapValues(x => (x._1 / x._2))操作时,key是”hadoop”,保持不变,value是(10,2),会被赋值给Lamda表达式x => (x._1 / x._2中的x,因此,x的值就是(10,2),x._1就是10,表示hadoop书总销量是10,x._2就是2,表示2天,因此,hadoop书籍的每天平均销量就是x._1 / x._2,也就是5。mapValues()输出的一个键值对就是(“hadoop”,5)。同理,当把(“spark”,(8,2))输入给mapValues()时,会计算得到另外一个键值对(“spark”,4)。
网友评论