RDD
参考 http://blog.csdn.net/qq_16103331/article/details/53443890
五大特性
两个动作
对于RDD,有两种类型的动作,一种是Transformation,一种是Action。它们本质区别是:
Transformation返回值还是一个RDD。它使用了链式调用的设计模式,对一个RDD进行计算后,变换成另外一个RDD,然后这个RDD又可以进行另外一次转换。这个过程是分布式的
Action返回值不是一个RDD。它要么是一个Scala的普通集合,要么是一个值,要么是空,最终或返回到Driver程序,或把RDD写入到文件系统中
所以我可以根据算子的返回类型来判断这个算子是Transformation还是action
Transformations转换操作,返回值还是一个 RDD,如 map、 filter、 union;
Actions行动操作,返回结果或把RDD持久化起来,如 count、 collect、 save。
lazy懒加载
RDD是一个懒执行,直到action阶段才会真正执行
三大操作方式
image.png
TransFormation函数
image.png
action函数
image.png
练习:
val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
val rdd2 = sc.textFile("file:///opt/datas/stu2.txt")
val allRDD = rdd1.union(rdd2) //融合
allRDD.cache //缓存到内存
val lines = allRDD.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x => (x._2 > 1)).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("hdfs://bigdata-pro01.kfk.com:8020/user/kfk/wordcount")
DataFrame
操作大全 :http://blog.csdn.net/dabokele/article/details/52802150
测试:
dataFrame show来展示
toDF rdd转dataframe
val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
val rdd2 = sc.textFile("file:///opt/datas/stu2.txt")
allRDD = rdd1.union(rdd2)
lines = allRDD.flatMap(x => s.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x => (x._2 > 1)).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).toDF //rdd转dataFrame
lines.printSchema //打印元数据
lines.select("_1","_2").show //select操作
##dataset转dataFrame,指定列名
val dataFrame = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" ")).map(x => (x,1)).toDF("name","count")
dataFrame.select("name","count").groupBy("name").count.show
dataSet
image.png创建方法
spark.read.textFile
练习
//定义例类
case class Person(username:String,count:Int)
//创建dataset
val dataSet = spark.read.textFile("file:///opt/datas/stu.txt")
//dataset转换成类
val res0 = dataSet.flatMap(x => x.split(" ")).map(x => (x,1)).map(x => (Person(x._1,x._2)))
// 创建成表
res0.createOrReplaceTempView("person")
//分组计数并排序
spark.sql("select username,count(1) from person group by username order by count(1) desc").show
dataFrame转dataSet
image.png
//创建rdd
val rdd = sc.textFile("file:///opt/datas/stu.txt")
//转成dataframe
val df = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) =>(a+b)).toDF("username","count")
//转成dataset,前提有case class Person,且字段名一样
val ds = df.as[Person]
//降序排列
ds.orderBy($"count".desc).show
三大数据集之间的转成和对比
image.pngimage.png
image.png
image.png
image.png
RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换
DataFrame/Dataset转RDD:
这个转换很简单
val rdd1=testDF.rdd
val rdd2=testDS.rdd
RDD转DataFrame:
import spark.implicits._
val testDF = rdd.map {line=>
(line._1,line._2)
}.toDF("col1","col2")
一般用元组把一行的数据写在一起,然后在toDF中指定字段名
RDD转Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可
Dataset转DataFrame:
这个也很简单,因为只是把case class封装成Row
import spark.implicits._
val testDF = testDS.toDF
DataFrame转Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便
特别注意:
在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用
网友评论