本文全部手写原创,请勿复制粘贴、转载请注明出处,谢谢配合!
什么是弹性分布式数据集Rdd?
- 概念:RDD(Resilient Distributed Datasets)简单来说,就是Spark中元素的集合,如数组、集合、文本等等都能称作RDD,但是和普通数据集相比,它有个特点就是:分布式。每个RDD都有多个分区,这些分区分布在集群不同的节点中。
- 优势:其实只要理解了分布式的优势就不难理解Rdd的优势了,Spark会自动将Rdd的数据分发到集群上,将操作并行化执行。比如一个非常大的数据量(PB级),一台物理机处理肯定是不行的(除非是超级计算机但那非常昂贵),所以分布到多台机器上处理,不仅提高了处理速度还廉价许多。
(如果还是不清楚没有关系,下面会演示几个案例,建议自己手动敲一敲代码感受一下就懂了) - Spark中对数据的所有操作不外乎:RDD create(创建) 、RDD transformation(转换)、RDD action(行动)
Rdd的创建
创建Rdd有两种方式:
-
(1)从外部存储系统(如共享文件系统,HDFS,HBase等)中引用数据集。使用textFile()方法
//从本地文件系统读取数据集 scala> val rdd =sc.textFile("file:///usr/local/spark-2.2.0-bin-hadoop2.6.0-cdh5.11.1/README.md") rdd: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark-2.2.0-bin-hadoop2.6.0-cdh5.11.1/README.md MapPartitionsRDD[1] at textFile at <console>:24 //从HDFS中读取数据集 scala> val rdd1 =sc.textFile("hdfs://master:8020/user/test.rdd") rdd1: org.apache.spark.rdd.RDD[String] = hdfs://master:8020/user/test.rdd MapPartitionsRDD[5] at textFile at <console>:24 //打印操作 注:collect()是将所有元素先收集到一台节点上,有可能会内存溢出,如果只是为了打印部分元素,建议使用take() scala> rdd1.collect().foreach(println) hello spark love you scala> rdd1.take(2) res5: Array[String] = Array(hello, spark)
-
(2)从驱动程序中的现有集合(如List,Array等),使用parallelize()方法
//区间转Rdd scala>val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24 scala>rdd1.collect().foreach(println) 1 2 3 4 5 //List集合转Rdd scala> val list = List(1,"a",'b') list: List[Any] = List(1, a, b) scala> sc.parallelize(list) res12: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[10] at parallelize at <console>:27 scala> res12.collect().foreach(println) 1 a b //Array数组转Rdd scala> val array = Array("a","b") array: Array[String] = Array(a, b) scala> sc.parallelize(array) res8: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:27 scala> res8.collect().foreach(println) a b
Rdd的转换 transformation
转换操作就是从现有的Rdd生成一个新的Rdd的操作,比如filter操作,它从现有的Rdd筛选出符合条件的数据,创建一个新的Rdd。
还是那句话,概念不多说,看实际操作最为直观!
单个Rdd操作
- map(func) 对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD
scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> rdd.map(x=>(x+2))
res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:27
scala> res14.collect()
res15: Array[Int] = Array(3, 4, 5, 6, 7)
scala> rdd.map(x=>(x,1))
res18: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[14] at map at <console>:27
scala> res18.collect()
res19: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
- flatMap(func) 和map类似,但是它会把map“扁平化”,每个输入项可以映射到0个或更多个输出项。常用于统计单词数。举例对比一下和map的区别更容易理解:
scala> val rdd = sc.textFile("hdfs://master/user/spark.hello")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/spark.hello MapPartitionsRDD[16] at textFile at <console>:24
scala> rdd.map(line => line.split(" "))
res20: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at map at <console>:27
scala> res20.collect()
res22: Array[Array[String]] = Array(Array(hello, spark, it, is, perfect), Array(i, want, to, learn))
scala> rdd.flatMap(line => line.split(" "))
res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:27
scala> res0.collect()
res1: Array[String] = Array(hello, spark, it, is, perfect, i, want, to, learn)
map输出是Array[Array[String]]、flatMap输出是Array[String],等于将map再次打平。
- reduceByKey(func,[numTask]) 根据函数定义的规则,按照key进行整合,类似MapReduce中的Reduce操作。比如,在map阶段(spark,1) (spark,1),定义方法(x,y)=>(x+y),即value进行加法运算,1+1 所以使用reduceByKey后变为(spark,2)
[root@master hadoop-2.6.0-cdh5.11.1]# hadoop fs -cat /user/wordcount.test
spark i love you
spark i want learn you
wordcount案例:
scala> val rdd = sc.textFile("hdfs://master/user/wordcount.test")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/wordcount.test MapPartitionsRDD[4] at textFile at <console>:24
scala> val wordmap = rdd.flatMap(line=>line.split(" ")).map(x=>(x,1))
wordmap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:26
scala> wordmap.collect()
res2: Array[(String, Int)] = Array((spark,1), (i,1), (love,1), (you,1), (spark,1), (i,1), (want,1), (learn,1), (you,1))
scala> val wordreduce = wordmap.reduceByKey((x,y)=>(x+y))
wordreduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:28
scala> wordreduce.collect()
res3: Array[(String, Int)] = Array((learn,1), (spark,2), (you,2), (love,1), (i,2), (want,1))
- filter(func) 很好理解,就是过滤的意思
scala> val rdd = sc.parallelize(Array(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> rdd.filter( x => (x==2)).collect()
res8: Array[Int] = Array(2)
scala> val rdd = sc.parallelize(List("sp","sa","vv"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> rdd.filter(x=>x.contains('s')).collect()
res10: Array[String] = Array(sp, sa)
- groupByKey([numTasks]) 学过SQL的应该都知道group by,这里一样的意思,就是按照key来进行分组。numTasks是可选的,用来设置任务数量。
scala> val rdd = sc.parallelize(List((1,"sp"),(1,"sa"),(2,"vv")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> rdd.groupByKey().collect()
res12: Array[(Int, Iterable[String])] = Array((1,CompactBuffer(sp, sa)), (2,CompactBuffer(vv)))
- distinct([numTasks]) 去除重复元素,但注意:开销大,需要将所有数据通过网络传输进行Shuffle混洗
scala> val rdd = sc.parallelize(List("sp","sp","vv"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24
scala> rdd.distinct().collect()
res13: Array[String] = Array(sp, vv)
- sortByKey([ascending], [numTasks]) 排序,ascending表示递增的,默认ascending为true 即按递增排序 可以改为false,也可以自定义排序函数。
scala> val rdd = sc.parallelize(List((1,"sp"),(3,"sa"),(2,"vv")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd.sortByKey(false).collect()
res14: Array[(Int, String)] = Array((3,sa), (2,vv), (1,sp))
- 还有很多很多函数,大家可以参考:SparkAPI(请依据自己的版本号,这个是2.2.0的)
官方其实说的很详细,但是很多同学可能没有scala基础就会有点看不懂,举个例子解释一下,其他大同小异。比如:
API截图
zeroValue是一个初始的给定值,V是范型的意思,可以是Int型等等,但是这个类型必须和RDD中的V一样。
比如下例:zeroValue为3,是int型,rdd中的List(K,V) ,V也是int型,然后再根据func定义的规则对这两个V进行操作。最后返回RDD[(K,V)] 。
3(初始值)+1+2=6 即(1,6)。 3+5+6=14 即(2,14) 建议:((x,y) => x+y)可以简写为(+) scala语言真的非常优雅!
scala> val rdd = sc.parallelize(List((1,1),(1,2),(2,6),(2,5)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> rdd.foldByKey(3)(_+_).collect()
res15: Array[(Int, Int)] = Array((1,6), (2,14))
多个Rdd操作
- 交集差集并集
scala> val list1 = sc.parallelize(List("spark","spark","hello"))
list1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[25] at parallelize at <console>:24
scala> val list2 = sc.parallelize(List("spark","love","you"))
list2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:24
//并集不去重
scala> list1.union(list2).collect()
res16: Array[String] = Array(spark, spark, hello, spark, love, you)
//交集去重
scala> list1.intersection(list2).collect()
res17: Array[String] = Array(spark)
//差集(只存在于第一个dataset、不存在于第二个)不去重
scala> list1.subtract(list2).collect()
res18: Array[String] = Array(hello)
- Join
scala> val list1 = sc.parallelize(List((1,"spark"),(2,"spark"),(3,"hello")))
list1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:24
scala> val list2 = sc.parallelize(List((1,"spark"),(3,"you"),(4,"good")))
list2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[39] at parallelize at <console>:24
//内连接
scala> list1.join(list2).collect()
res20: Array[(Int, (String, String))] = Array((1,(spark,spark)), (3,(hello,you)))
// 左外连接,左边rdd全部显示,右边没有的补null
scala> list1.leftOuterJoin(list2).collect()
res21: Array[(Int, (String, Option[String]))] = Array((1,(spark,Some(spark))), (3,(hello,Some(you))), (2,(spark,None)))
// 右外连接,右边rdd全部显示,左边没有的补null
scala> list1.rightOuterJoin(list2).collect()
res22: Array[(Int, (Option[String], String))] = Array((4,(None,good)), (1,(Some(spark),spark)), (3,(Some(hello),you)))
最后再强调一句,函数太多了,此文只列举了常见的,最好的办法就是直接看官方API !!
Spark官方文档
SparkAPI
网友评论