基本概念
Shark、Spark SQL和Hive之间的关系:
- Shark借用了Hive大部分的组件,包括词法分析、语法分析和逻辑分析阶段,只是在最后将逻辑执行计划转化为物理执行计划这一步,将底层的实现从MapReduce替换成了Spark。
- Spark SQL在Hive兼容层面仅依赖HiveQL解析和Hive元数据,也就是说从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。
Spark SQL增加了DataFrame(即带有Scheme信息的RDD)。使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是Json格式的数据。
Spark SQL目前支持Scala、Java和Python三种语言。
DataFrame是以RDD为基础的分布式数据集,提供了详细的结构信息。
DataFrame的创建
Spark2.0开始使用全新的SparkSession接口替代Spark1.6中的SQLContext、HiveContext等来实现其对数据加载、转换、处理等功能。
SparkSession支持从不同的数据源(Hive、HDFS、Cassandra)把数据转换成DataFrame,并支持把DataFrame转换成SQLContext中的表,使用SQL进行查询。
/spark/examples/src/main/resources目录下自带一些样例数据,我们来进行加载:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
//支持RDDs转换为DataFrame及后续SQL
import spark.implicits._
val df1 = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df1.show()
val df2 = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df2.show()
以下是常用的DataFrame操作:
//打印模式信息
scala>df1.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
//选择多列
scala>df1.select(df("name"),df("age")+1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
//条件过滤
df1.filter(df("age") > 20 ).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
//分组聚合
scala>df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
//排序
scala>df.sort(df("age").desc).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//多列排序
scala>df.sort(df("age").desc,df("name").desc).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//对列进行重命名
scala>df.select(df("name").as("username"),df("age")).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
| Andy| 30|
| Justin| 19|
+--------+----+
从RDD转换得到DataFrame
- 利用反射机制推断RDD模式
利用反射机制推断RDD模式时,需要首先定义一个case class,因为只有case class才能被Spark隐式转换为DataFrame:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ //导入包,支持把RDD隐式转换为DataFrame
case class Person(name:String,age:Long) //定义一个case class
val peopleDF = spark.sparkContext.textFile("file:///usr/local/people.txt").map(_.split(",")).map(attributes => Person(attributes(0),attributes(1).trim.toInt)).toDF()
peopleDF.createOrReplaceTempView("people") //必须注册为临时表才能提供下面的查询使用
val personsRDD = spark.sql("select name,age from people where age > 20") //最终生成一个Dataframe
personsRDD.map(t => "Name:" + t(0)+","+"Age:"+t(1)).show() //df中每一个元素都是一行记录
- 使用编程方式定义RDD模式
当无法提取定义case class时,就需要采用编程方式定义RDD模式:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/people.txt")
val schemaString = "name age" //定义一个模式字符串
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))
val schema = StructType(fields)
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD,schema)
DataFrame保存成文件
- write.format().save()
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("csv").save("file:///usr/local/spark/newpeople.csv")
write.format()支持json,parquet,jdbc,orc,libsvm,csv,text等格式文件。
- saveAsTextFile()
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.saveAsTextFile("file:///usr/local/spark/newpeople.csv")
通过JDBC连接数据库
假设MySQL中创建了一个表spark.student(id int(4),name char(20),gender char(4),age int(4));
要让spark能连接mysql,需要将mysql的驱动包(mysql-connector-java-xx.jar)拷贝到spark/jars/目录下,启动spark-shell时需要指定该驱动包:
spark-shell --jars /usr/local/spark/jars/mysql-connector-java-xx.jar --driver-class-path /usr/local/spark/jars/mysql-connector-java-xx.jar
接下来开始编写程序:
val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://ocalhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable","student").option("user","root").option("password","hadoop").load()
jdbcDF.show()
向MySQL数据库写入数据
现在开始在spark-shell中编写程序,往spark.student表中插入两条记录:
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//下面设置两条数据表示两个学生信息
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
//下面要设置模式信息
val schema = StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age",IntegerType,true)))
//下面创建Row对象,每个Row对象都是 rowRDD中一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD,schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user","root")
prop.put("password","hadoop")
prop.put("driver","com.mysql.jdbc.Driver")
//采用append模式,追加数据
studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
网友评论