美文网首页
20 spark弹性分布式数据集

20 spark弹性分布式数据集

作者: 6cc89d7ec09f | 来源:发表于2018-03-10 14:40 被阅读7次

RDD

参考 http://blog.csdn.net/qq_16103331/article/details/53443890
五大特性

image.png
两个动作
对于RDD,有两种类型的动作,一种是Transformation,一种是Action。它们本质区别是:
Transformation返回值还是一个RDD。它使用了链式调用的设计模式,对一个RDD进行计算后,变换成另外一个RDD,然后这个RDD又可以进行另外一次转换。这个过程是分布式的 
Action返回值不是一个RDD。它要么是一个Scala的普通集合,要么是一个值,要么是空,最终或返回到Driver程序,或把RDD写入到文件系统中

所以我可以根据算子的返回类型来判断这个算子是Transformation还是action

Transformations转换操作,返回值还是一个 RDD,如 map、 filter、 union; 
Actions行动操作,返回结果或把RDD持久化起来,如 count、 collect、 save。

lazy懒加载

RDD是一个懒执行,直到action阶段才会真正执行

三大操作方式


image.png

TransFormation函数


image.png
action函数
image.png

练习:

 val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
val rdd2 = sc.textFile("file:///opt/datas/stu2.txt")
 val allRDD = rdd1.union(rdd2)  //融合
allRDD.cache  //缓存到内存
val lines = allRDD.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x => (x._2 > 1)).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("hdfs://bigdata-pro01.kfk.com:8020/user/kfk/wordcount")

DataFrame

操作大全 :http://blog.csdn.net/dabokele/article/details/52802150

image.png
测试:
dataFrame show来展示
toDF rdd转dataframe
val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
 val rdd2 = sc.textFile("file:///opt/datas/stu2.txt")
allRDD = rdd1.union(rdd2)
lines = allRDD.flatMap(x => s.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x => (x._2 > 1)).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).toDF //rdd转dataFrame
lines.printSchema //打印元数据
lines.select("_1","_2").show //select操作

##dataset转dataFrame,指定列名
val dataFrame = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" ")).map(x => (x,1)).toDF("name","count")
dataFrame.select("name","count").groupBy("name").count.show

dataSet

image.png

创建方法

spark.read.textFile

练习

//定义例类
case class Person(username:String,count:Int) 
//创建dataset
val dataSet = spark.read.textFile("file:///opt/datas/stu.txt")  
//dataset转换成类
val res0 = dataSet.flatMap(x => x.split(" ")).map(x => (x,1)).map(x => (Person(x._1,x._2)))
// 创建成表
res0.createOrReplaceTempView("person")
//分组计数并排序
spark.sql("select username,count(1) from person group by username order by count(1) desc").show

dataFrame转dataSet


image.png
//创建rdd
val rdd = sc.textFile("file:///opt/datas/stu.txt")
//转成dataframe
val df = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) =>(a+b)).toDF("username","count")
//转成dataset,前提有case class Person,且字段名一样
val ds = df.as[Person]
//降序排列
ds.orderBy($"count".desc).show

三大数据集之间的转成和对比

image.png
image.png
image.png
image.png
image.png

RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换

DataFrame/Dataset转RDD:
这个转换很简单

val rdd1=testDF.rdd
val rdd2=testDS.rdd

RDD转DataFrame:

import spark.implicits._
val testDF = rdd.map {line=>
      (line._1,line._2)
    }.toDF("col1","col2")

一般用元组把一行的数据写在一起,然后在toDF中指定字段名

RDD转Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
      Coltest(line._1,line._2)
    }.toDS

可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可

Dataset转DataFrame:
这个也很简单,因为只是把case class封装成Row

import spark.implicits._
val testDF = testDS.toDF

DataFrame转Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]

这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便

特别注意:
在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用

相关文章

网友评论

      本文标题:20 spark弹性分布式数据集

      本文链接:https://www.haomeiwen.com/subject/xjhmfftx.html