美文网首页
(二)SparkSQL DataFrame和DataSet基本概

(二)SparkSQL DataFrame和DataSet基本概

作者: 白面葫芦娃92 | 来源:发表于2018-09-26 21:37 被阅读0次

一、定义:
A Dataset is a distributed collection of data.
A DataFrame is a Dataset organized into named columns.It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
1.3版本之前还没有DataFrame的概念,之前叫做SchemaRDD
DataSet是1.6版本开始引入的
二、RDD和DF的异同点:
1.RDD和DataFrame都是分布式的数据集,支持并行运算
2.数据结构方面:和RDD相比,DataFrame包含数据和schema(RDD没有schema),可以理解为一个关系型数据库的表格,暴露的信息更多,SparkSQL可以进行更多的优化
3.API方面:DataFrame提供的API比RDD更为丰富
4.使用DF编程,不管是用什么语言,底层的执行性能是一样的;但是如果使用RDD,用不同的语言,执行性能的差别很大,因为他们是依赖于自身的运行时环境的,比如java/scala和python语言,jvm和python的运行环境是完全不同的
三、DF和DS的异同点:
1.The Dataset API is available in Scala and Java;
The DataFrame API is available in Scala, Java, Python and R
2.In the Scala API, DataFrame is simply a type alias of Dataset[Row]
DF可看做DS的一个特例,DS是一个强类型


假设因为输入错误,在SQL中写了一句"seletc * from XXX",SQL在运行时才会报错,而对于DF和DS,因输入错误写"df.seletc("name")"或"ds.seletc("name")"在编译时就已经发现错误;
假设再因为输入错误,在SQL中写了一句"select nname from XXX",而XXX表里并没有nname这一列(其实列名为"name"),SQL在运行时才会报错;对于DF,写成"df.select("nname")",在编译时不会报错,在运行时才报错;而对于DS,写成"ds.select("nname")",在编译时就已经报错,因为DS是强类型。
Analysis Errors在一个分布式的job开始之前越早暴露越好,如果资源申请好了,运行时才发现错误,之前所做的准备工作就都白费了,强类型就有这点好处
四、SparkSQL 入口点
老版本(2.x之前):SQLContext
新版本(2.x之后):SparkSession
那么SparkSession如何构建呢?
val spark = SparkSession
      .builder()
      .appName("SparkSQLApp")
      .master("local[2]")
      .getOrCreate()
//      .enableHiveSupport()  有这个就可以访问Hive里的东西了

如何创建DataFrame呢?

[hadoop@hadoop001 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar
scala>   val df = spark.read.format("json").load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
scala> df.show
+----+-------+                                                                  
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
scala> df.select('name).show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
scala> df.select($"name",$"age"+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
scala> df.filter($"age">21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.groupBy("age").count().show
+----+-----+                                                                    
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
df.createOrReplaceTempView("people")
scala> val sqlDF = spark.sql("SELECT * FROM people").show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> df.createGlobalTempView("people")
scala> spark.sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

相关文章

网友评论

      本文标题:(二)SparkSQL DataFrame和DataSet基本概

      本文链接:https://www.haomeiwen.com/subject/gjyvwftx.html