1. Introduction to Spark SQL
How Hive executes SQL
- Spark parallelizes at the thread level, whereas MapReduce parallelizes at the process level.
- Spark SQL relies on Hive only for HiveQL parsing and the Hive metastore; once a query has been parsed into an abstract syntax tree, everything else is handed over to Spark SQL.
Spark SQL execution process
2. DataFrame
An RDD is a distributed collection of Java objects, but the internal structure of those objects is opaque to the RDD.
RDD vs. DataFrame: a DataFrame is a distributed dataset built on top of RDDs, and it carries detailed structure (schema) information.
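A minimal sketch of the difference, assuming the spark-shell's built-in spark session and the people.json file shipped with Spark's examples (the same file used in the next section):
scala> val lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
scala> lines.first()    // the RDD sees only opaque JSON strings, one per line
scala> val df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
scala> df.printSchema() // the DataFrame knows its columns (here name and age) and their types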
3. Creating a DataFrame
SparkSession can load data from a variety of data sources and turn that data into a DataFrame.
scala> import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().getOrCreate()
scala> val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
scala> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/sql/newpeople.csv")
4. Common DataFrame Operations
printSchema()
df.printSchema()
prints the schema (column names and types)
select()
df.select(df("name").as("xxxname"), df("age")).show()
filter()
df.filter(df("age") > 20).show()
groupBy()
df.groupBy("age").count().show()
sort()
df.sort(df("age").desc).show()
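Each of these operations returns a new DataFrame, so they can be chained. A small illustrative example (not from the original notes) that counts, for every age above 20, how many rows have that age:
scala> df.filter(df("age") > 20).groupBy("age").count().show()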
5. Converting an RDD to a DataFrame
5.1 Inferring the schema with reflection
- map(attributes => Person(attributes(0), attributes(1).trim.toInt))
scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder
scala> import spark.implicits._ // import the implicits that allow an RDD to be implicitly converted to a DataFrame
import spark.implicits._
scala> case class Person(name: String, age: Long) // define a case class
defined class Person
scala> val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).
map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> peopleDF.createOrReplaceTempView("people") // the DataFrame must be registered as a temporary view before the query below can use it
scala> val personsRDD = spark.sql("select name,age from people where age > 20")
// the result is a DataFrame; the line below is the information returned by the system
personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show()
5.2 Programmatically defining the schema
- build the schema (the "table header")
- build the records (the "table contents")
- combine the header and the records
- Spark SQL provides the StructType(fields: Seq[StructField]) class to represent a table's schema
- StructField(name, dataType, nullable): name is the field name, dataType is the field type, and nullable indicates whether the field may contain nulls
- when building the records, each record is wrapped in a Row object, and all of the Row objects are collected into an RDD
- the header and the records are combined with spark.createDataFrame(), which produces a DataFrame
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
// define the fields
scala> val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,IntegerType,true))
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
// as the output above shows, schema describes the structure, which contains the two fields name and age
// the schema is the "table header"
// next, load the file into an RDD
scala> val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:26
// parse every line of the peopleRDD RDD
scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim.toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:29
// the rowRDD obtained above holds the "table records"
// now combine the "table header" and the "table records"
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
// the DataFrame must be registered as a temporary view before the query below can use it
scala> peopleDF.createOrReplaceTempView("people")
scala> val results = spark.sql("SELECT name,age FROM people")
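The query result is itself a DataFrame; it can be displayed directly (the output depends on the contents of people.txt):
scala> results.show()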
6. Reading and Writing Databases
6.1 JDBC
scala> val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
scala> jdbcDF.show()
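The write call below refers to a studentDF DataFrame and a prop connection-properties object that are not defined above. A minimal sketch of how they could be set up, assuming the same MySQL database and credentials as the read example and a student(id, name, gender, age) table:
scala> import java.util.Properties
scala> import spark.implicits._  // enables toDF on a local Seq
scala> val prop = new Properties()
scala> prop.put("user", "root")             // same credentials as the read example
scala> prop.put("password", "hadoop")
scala> prop.put("driver", "com.mysql.jdbc.Driver")
scala> val studentDF = Seq((3, "Rongcheng", "M", 26), (4, "Guanhua", "M", 27)).toDF("id", "name", "gender", "age")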
scala> studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
6.2 Hive
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.SparkSession
scala> case class Record(key: Int, value: String)
scala> val warehouseLocation = "spark-warehouse" // the configured Hive warehouse directory
scala> val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
scala> import spark.implicits._
scala> import spark.sql
// the query below shows the current contents of the table (output omitted here)
scala> sql("SELECT * FROM sparktest.student").show()
scala> import java.util.Properties
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
// create two records representing two students
scala> val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
// define the schema
scala> val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
// create the Row objects; each Row object becomes one row of rowRDD
scala> val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
// attach the schema to the Row objects, i.e., associate the data with the schema
scala> val studentDF = spark.createDataFrame(rowRDD, schema)
// inspect studentDF
scala> studentDF.show()
+---+---------+------+---+
| id| name|gender|age|
+---+---------+------+---+
| 3|Rongcheng| M| 26|
| 4| Guanhua| M| 27|
+---+---------+------+---+
// register a temporary view (registerTempTable is deprecated since Spark 2.0)
scala> studentDF.createOrReplaceTempView("tempTable")
// insert the records into the Hive table
scala> sql("insert into sparktest.student select * from tempTable")