在知乎看到文章说,学习spark分为两个方向,一个是数据平台开发,一个是数据处理开发,如果用spark的话做数据处理,日常用得最多的就是sparkSQL,需要懂得什么是数据倾斜,原理,如何解决,还有sparkSQL的语句的灵活编写,了解spark的基本原理和特性方便定位问题。平时写习惯了hiveQL处理离线数仓数据的小江开始入门sparkSQL。
一、SparkSQL学习点
学习sparkSQL时可以与hiveQL对比,本质两者是同类的,比如hiveQL是经过hive解析器和优化最后转换成MapReduce任务然后提交到集群去运行,sparkSQL也是经过转换为RDD然后提交到集群去运行的,相比MapReduce这种计算模型,RDD就体现出效率之快了,毕竟计算的中间结果是在内存中,同时spark也支持从hive读取数据。
类比图.png
1、DataFrame API
在spark中,dataframe是一种基于RDD的分布式数据集,从spark1.3加入的数据抽象,与RDD的主要区别在于dataframe是具有schema元信息的,类似于二维表结构化数据,RDD是没有这些二维行列信息的,使得用户可以洞察更多的结构数据且做针对性的查询优化,达到提升运行时效率。比如可以有DataFrame[Person]
2、DataSet API
dataset也是一种分布式数据集,是dataframe的一种扩展,在spark1.6后加入的新的数据抽象,与RDD相比,多了强类型检查和兼顾了dataframe的查询优化特性,在编译期就做类型检查,使得用户可以更好地感知结构化数据。比如可以有DataSet[Person]
3、 RDD、DataFrame、DataSet三者相互转换
- RDD可通过样例类转换成DataFrame、DataSet:
样例类.toDF
,样例类.toDS
- DataFrame、DataSet也可以直接转换为RDD:
ds.rdd
- DataFrame转换为DataSet,借助定义的case class 样例类:
ds.as[Person]
- DataSet转换为DataFrame:
ds.toDF
借助以下图来加深印象了解:
image.png
4、UDF用户自定义函数、UDAF用户自定义聚合函数
//如何自定义一个UDF
spark.udf.register("funcName",(,,)=>())
//如何自定义一个UDAF
自定义udaf,首先要继承UserDefinedAggregateFunction类,然后重写里面的8个方法(4个格式定义+4个计算)
5、Spark读写数据 API
//读取数据文件,load加载数据,创建dataframe
spark.read.json/text/csv (path)
spark.read.format("json/text.csv").load(path)
spark.read.jdbc(url,table,username,password)
spark.read.format("jdbc").option(url).option(table).option(username).option(password).load()
//保存数据文件,save保存数据
df.write.json(path) //默认模式SaveMode,即当存在了就报错的模式ErrorIfExists
模式可以有多种,比如追加append、忽略ignore、覆盖overwrite
df.write.mode("overwrite").json("path") //保存到指定路径,当存在文件名就覆盖
df.write.format("json").save(path)
df.write.jdbc(url,table,username,password)
df.write.format("jdbc").option(url).option(table).option(username).option(password).save()
6、SparkSQL例子
package com.meizu.xiaojiang
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Language(name: String,job: String)
object Spark_SQL {
def main(args: Array[String]): Unit = {
//1、创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Spark_SQL")
.getOrCreate()
//2、导入隐式转换
import spark.implicits._
//3、读取文件创建DF
val df: DataFrame = spark.read.text("hdfs://master:9000/sparkSQL.json")
//4、DSL风格
df.select("name").show()
println("打印分隔符==========================")
//5、SQL风格查询
df.createTempView("language")
spark.sql("select * from language").show()
//6、把dataframe转换为rdd
df.rdd
//7、把dataframe转换为dataset
df.as[Language]
//8、关闭连接
spark.stop()
}
}
网友评论