传送门
Spark实战系列之一--Spark是什么
Spark实战系列之二--什么是RDD以及RDD的常用API
Spark实战系列之三--RDD编程基础上
Spark实战系列之四--RDD编程基础下
Spark实战系列之五--键值对RDD
Spark实战系列之六--数据读写
Spark实战系列之七--综合案例
Spark基础系列之八--Spark SQL是什么
Spark基础系列之九--使用Spark SQL读写数据库
传送门
一、Spark SQL是什么
1)DataFrame简述
- Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。
- Spark SQL目前支持Scala、Java、Python三种语言
- DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能
- Spark能够轻松实现从Mysql到DataFrame的转化,并且支持SQL查询

2)DataFrame的创建

- 1)可以通过如下语句创建一个SparkSession对象:
scala> import org.apache.spark.sql.SparkSession
scala> val spark1=SparkSession.builder().getOrCreate()
- 2)在创建DataFrame之前,为了支持RDD转换为DataFrame及后续的SQL操作,需要通过import语句(即import spark1.implicits._)导入相应的包,启动隐式转换。(注意:上面import后面的spark,是你创建的命名)
- 3)在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,例如:
//读取people.json文件创建DataFrame;
//在读取本地文件或HDFS文件时,要注意给出正确的文件路径;
spark.read.json("people.json")
//读取people.csv文件创建DataFrame;
spark.read.csv("people.csv")


3)DataFrame的保存
- 可以使用spark.write操作,把一个DataFrame保存不同格式的文件。例如,把一个名称为df的DataFrame保存到不同格式文件中,方法如下:
df.write.json("people.json")
df.write.parquet("people.parquet")
df.write.csv("people.csv")
- 下面从示例文件people.json中创建一个DataFrame,然后保存成csv格式文件,代码如下:
scala> val peopleDF=spark.read.format("json").load("file:///usr/lcoal/people.json")
scala> peopleDF.select("name","age").write.format("csv").save("file:///usr/local/newpeople.csv")
- 一些常用的DataFrame操作



3)从RDD转换得到DataFrame
- 法一:利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换
- 法二:使用编程接口,构造一个schema并将其应用在已知的RDD上
练习
利用反射机制和编程方式实现rdd转化dataframe
源文件格式如下:
1::ella::65
2::jack::100
3::bruce::80
- 法一:利用反射来推断包含特定类型对象的RDD的
- 在利用反射机制推断RDD时,需要首先定义一个case class,因为,只有case class 才能被Spark隐式地转换为DataFrame
import org.apache.log4j.{Level, Logger}
import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.DatasetHolder
import org.apache.spark.sql.SparkSession
object spark3 {
//定义一个case class
case class Person(key:Int,name:String,age:Int)
def main(args :Array[String]): Unit ={
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val conf = new SparkConf().setAppName("spark0").setMaster("local")
val sc = new SparkContext(conf)
val spark=SparkSession.builder().getOrCreate()
//导入包,支持把一个RDD隐式转换为一个DataFrame
import spark.implicits._
//val RDD = spark.sparkContext.textFile("src/spark/a.txt")
val RDD = spark.sparkContext.textFile("file:///usr/local/spark/mycode/dataframe2.txt")
val personDF=RDD.map(_.split("::")).map(x=>Person(x(0).trim.toInt,x(1),x(2).trim.toInt)).toDF()
personDF.show()
//必须注册为临时表才能供下面的查询项目
//personDF.createOrReplaceTempView("people")
//最终生成一个DataFrame,下面是系统执行返回的信息
//val personRDD = spark.sql("select * from people")
//DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
//personRDD.map(t=>"key: "+t(0) +" name: "+ t(1) + " age: "+t(2)).show()
}
}
- 法二:使用编程接口,构造一个schema并将其应用在已知的RDD上
- 当无法提前定义case class时,就需要采用编程方式定义RDD模式

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
object spark5 {
def main(args :Array[String]): Unit = {
val conf = new SparkConf().setAppName("spark0").setMaster("local")
val sc = new SparkContext(conf)
val spark=SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val fields = Array(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true))
val schema = StructType(fields)
val RDD = spark.sparkContext.textFile("file:///usr/local/spark/mycode/dataframe2.txt")
val rowRDD = RDD.map(x=>x.split("::")).map(x=>Row(x(0).trim.toInt,x(1),x(2).trim.toInt))
val personDF = {
spark.createDataFrame(rowRDD, schema)
}
personDF.show()
}
}
网友评论