美文网首页
Spark开发--Spark SQL简介(十)

Spark开发--Spark SQL简介(十)

作者: 无剑_君 | 来源:发表于2020-03-31 15:18 被阅读0次

一、 SparkSQL 简介

  spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。
SparkSQL的作用:
提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎。
DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD。
运行原理:
将 Spark SQL 转化为 RDD, 然后提交到集群执行。
特点:
(1)容易整合
(2)统一的数据访问方式
(3)兼容 Hive
(4)标准的数据连接
文档地址:http://spark.apache.org/docs/latest/sql-programming-guide.html

二、SparkSession

  SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
  在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
  SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
特点:
---- 为用户提供一个统一的切入点使用Spark 各项功能。
---- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序。
---- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互。
---- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中。

1. DataFrame

  在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrames

2. RDD转换成为DataFrame

spark自带示例数据:

root@master:~# ls /usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/
employees.json  full_user.avsc  kv1.txt  people.csv  people.json  people.txt  user.avsc  users.avro  users.orc  users.parquet

1)通过 case class 创建 DataFrames(反射)

// 定义case class,相当于表结构
scala> case class People(var name:String,var age:Int)

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala>  val context = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
context: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@13835bdc

// 将本地的数据读入 RDD, 并将 RDD 与 case class 关联
scala>  val peopleRDD = sc.textFile("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = /usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:25
scala> .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
res0: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[2] at map at <console>:29

scala> import context.implicits._
import context.implicits._

// 将RDD 转换成 DataFrames
scala> val df = peopleRDD.toDF
df: org.apache.spark.sql.DataFrame = [value: string]

// 将DataFrames创建成一个临时的视图
scala> df.createOrReplaceTempView("people")

// 使用SQL语句进行查询
scala> context.sql("select * from people").show()
+-----------+                                                                   
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+

2) 通过 structType 创建 DataFrames(编程接口)

scala> val fileRDD  = sc.textFile("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
fileRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt MapPartitionsRDD[23] at textFile at <console>:34

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala>  val sqlContext = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4203cb07

// 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import org.apache.spark.sql.{SaveMode, Row, SQLContext}

scala> val rowRDD = fileRDD.map(line => {
     |   val fields = line.split(",")
     |   Row(fields(0), fields(1).trim.toInt)
     |  })
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[24] at map at <console>:44

 // 创建 StructType 来定义结构
scala>     val structType: StructType = StructType(
     |       //字段名,字段类型,是否可以为空
     |       StructField("name", StringType, true) ::
     |         StructField("age", IntegerType, true) :: Nil
     |     )
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
// 创建DataFrame
scala> val df = sqlContext.createDataFrame(rowRDD,structType)
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
// 创建临时视图
scala>  df.createOrReplaceTempView("people")
// 执行查询并显示
scala>  sqlContext.sql("select * from people").show()
+-------+---+                                                                   
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

3) 通过 json 文件创建 DataFrames

// 加载JSON数据
scala>  val df = sqlContext.read.json("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createOrReplaceTempView("people")
// 执行查询
scala> sqlContext.sql("select * from people").show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

3. DataFrame数据读取与保存

1)数据的读取

// 方式一
scala>  val df1 = sqlContext.read.json("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json")
df1: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala>  val df2 = sqlContext.read.parquet("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/users.parquet")
df2: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

// 方式二
scala> val df3 = sqlContext.read.format("json").load("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json")
df3: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> val df4 = sqlContext.read.format("parquet").load("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/users.parquet")
df4: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
// 方式三,默认是parquet格式
scala> val df5 = sqlContext.load("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/users.parquet")
warning: there was one deprecation warning; re-run with -deprecation for details
df5: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

2) 数据的保存

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala>  val sqlContext = new SQLContext(sc)
// 加载数据
scala>  val df1 = sqlContext.read.json("file:///usr/local/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json")
df1: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

// 方式一
scala> df1.write.json("file:///root/test1")
scala>  df1.write.parquet("file:///root/test2")

// 方式二
scala> df1.write.format("json").save("file:///root/test3")
scala> df1.write.format("parquet").save("file:///root/test4")

// 方式三
scala> df1.write.save("file:///root/test5")

结果查看:

root@master:~# ls test*
testdata

test1:
part-00000-aa54221a-fb88-4bb9-b487-cfd9d826e11f-c000.json  _SUCCESS

test2:
part-00000-d5c696d3-5d13-4647-8645-c0a54e6dbaff-c000.snappy.parquet  _SUCCESS

test3:
part-00000-7ab6a478-cf15-4209-8a62-299a9276c0c1-c000.json  _SUCCESS

test4:
part-00000-9bb599db-3abe-4cc4-a62f-674cbe7f7444-c000.snappy.parquet  _SUCCESS

test5:
part-00000-05ebcb86-e6b5-40b6-9666-a069a363e466-c000.snappy.parquet  _SUCCESS

root@master:~# cat test1/part-00000-aa54221a-fb88-4bb9-b487-cfd9d826e11f-c000.json 
{"name":"Michael"}
{"age":30,"name":"Andy"}
{"age":19,"name":"Justin"}

数据的保存模式:

scala> df1.write.format("parquet").mode(SaveMode.Ignore).save("file:///root/test6")

相关文章

网友评论

      本文标题:Spark开发--Spark SQL简介(十)

      本文链接:https://www.haomeiwen.com/subject/uwtpgctx.html