什么是SparkSql?
- SparkSql作用
主要用于用于处理结构化数据,底层就是将SQL
语句转成RDD
执行 - SparkSql的数据抽象
1.DataFrame
2.DataSet
SparkSession
在老的版本中,SparkSQL提供两种SQL查询起始点:
- 一个叫SQLContext,用于Spark自己提供的SQL查询;
- 一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的
。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。
引入依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
创建SparkSession
导包
import org.apache.spark.sql.SparkSession
SparkSession 构造器
@Stable
class SparkSession private(
@transient val sparkContext: SparkContext,
@transient private val existingSharedState: Option[SharedState],
@transient private val parentSessionState: Option[SessionState],
@transient private[sql] val extensions: SparkSessionExtensions)
extends Serializable with Closeable with Logging {...}
SparkSession
主构造器已被私有化,无法通过常规的new
创建对象。在SparkSession
伴生对象中,有个Builder
类及builder
方法
第一种方式:
创建Builder
对象获取SparkSession
实例
// 创建Builder实例
val builder = new spark.sql.SparkSession.Builder
// 调用getOrCreate获取 SparkSession 实例
val session: SparkSession = builder.getOrCreate()
第二种方式:
通过SparkSession
调用builder()
函数获取Builder
的实例
// 通过调用 builder() 获取 Builder实例
val builder: SparkSession.Builder = SparkSession.builder()
// 调用getOrCreate获取 SparkSession 实例
val session: SparkSession = builder.getOrCreate()
在使用SparkContext
时 可以在SparkConf
指定master
及appName
如:
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
在Builder
也是可以
val builder: SparkSession.Builder = SparkSession.builder()
builder.master("local[4]")
builder.appName("test")
创建好SparkSession
就可以开始下面的工作了。
spark sql 编程有两种方式
- 声明式:SQL
- 命令式:DSL
声明式:SQL
使用声明式,需要注册成表注册成表的四种方式
-
createOrReplaceTempView:创建临时视图,如果视图已经存在则覆盖[只能在当前sparksession中使用] 【重点】
-
createTempView: 创建临时视图,如果视图已经存在则报错[只能在当前sparksession中使用]
示例:
注册成表;viewName
指定表名
df.createGlobalTempView(viewName="表名")
编写sql
sparksession.sql("sql语句")
案例:
@Test
def sparkSqlBySql(): Unit ={
val female=List(
Student(2,"绣花",16,"女",1),
Student(5,"翠花",19,"女",2),
Student(9,"王菲菲",20,"女",1),
Student(11,"小惠",23,"女",1),
Student(12,"梦雅",25,"女",3)
)
val boys=List(
Student(1,"张三",18,"男",3),
Student(3,"李四",18,"男",2),
Student(4,"王五",18,"男",2),
Student(7,"张鹏",14,"男",1),
Student(8,"刘秀",13,"男",2),
Student(10,"乐乐",21,"男",1)
)
// 导入隐式转换
import sparkSession.implicits._
val femaleDf: DataFrame = female.toDF()
val boysDf: DataFrame = boys.toDF()
//合并
val unionAll=femaleDf.unionAll(boysDf)
// 注册成表
unionAll.createOrReplaceTempView(viewName = "student")
//编写sql
// 统计男女人数
sparkSession.sql(
"""
|select sex,count(*) sex_count from student
|group by sex
|""".stripMargin).show()
}
+---+---------+
|sex|sex_count|
+---+---------+
| 男| 6|
| 女| 5|
+---+---------+
也可以支持开窗
// 统计男女人数
sparkSession.sql(
"""
|select *,row_number() over(partition by sex order by age)as rn from student
|""".stripMargin).show()
+---+------+---+---+-------+---+
| id| name|age|sex|classId| rn|
+---+------+---+---+-------+---+
| 8| 刘秀| 13| 男| 2| 1|
| 7| 张鹏| 14| 男| 1| 2|
| 1| 张三| 18| 男| 3| 3|
| 3| 李四| 18| 男| 2| 4|
| 4| 王五| 18| 男| 2| 5|
| 10| 乐乐| 21| 男| 1| 6|
| 2| 绣花| 16| 女| 1| 1|
| 5| 翠花| 19| 女| 2| 2|
| 9|王菲菲| 20| 女| 1| 3|
| 11| 小惠| 23| 女| 1| 4|
| 12| 梦雅| 25| 女| 3| 5|
+---+------+---+---+-------+---+
-
createOrReplaceGlobalTempView: 创建全局视图,如果视图已经存在则覆盖[能够在多个sparksession中使用]
-
createGlobalTempView: 创建全局视图,如果视图已经存在则报错[能够在多个sparksession中使用]
注意:
使用createOrReplaceGlobalTempView
、createGlobalTempView
创建的表后续查询的时候必须通过 global_temp.表名
方式使用
// 统计男女人数
sparkSession.sql(
"""
|select *,row_number() over(partition by sex order by age)as rn from global_temp.student
|""".stripMargin).show()
// 获取一个新的sparkSession
val sparkSession2: SparkSession = sparkSession.newSession()
sparkSession2.sql(
"""
|select *,row_number() over(partition by sex order by age)as rn from global_temp.student
|""".stripMargin).show()
结果都是一样,略...
命令式:DSL
通过算子操作数据
参考:https://blog.csdn.net/dabokele/article/details/52802150
DataFrame对象上Action操作
- show:展示数据
- collect:获取所有数据到数组
- collectAsList:获取所有数据到List
- describe(cols: String*):获取指定字段的统计信息
- first, head, take, takeAsList:获取若干行记录
DataFrame对象上的条件查询和join等操作
- where条件相关
1.where(conditionExpr: String):SQL语言中where关键字后的条件
2.filter:根据字段进行筛选 - 查询指定字段
1.select:获取指定字段值
2.electExpr:可以对指定字段进行特殊处理
3.col:获取指定字段
4.apply:获取指定字段
5.drop:去除指定字段,保留其他字段 - limit
limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作。 - order by
1.orderBy和sort:按指定字段排序,默认为升序
2.sortWithinPartitions
和上面的sort方法功能类似,区别在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame对象。 - group by
1.groupBy:根据字段进行group by操作
2.cube和rollup:group by的扩展
3.GroupedData对象
该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,
max(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
min(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
mean(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段
sum(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
count()
方法,获取分组中的元素个数 - distinct
1.distinct:返回一个不包含重复记录的DataFrame
2.dropDuplicates:根据指定字段去重 - 聚合
1.聚合操作调用的是agg方法,该方法有多种调用方式。一般与groupBy方法配合使用。 - union
1.unionAll方法:对两个DataFrame进行组合 - join
1.笛卡尔积
2.using一个字段形式
3.using多个字段形式
4.指定join类型
5.使用Column类型来join
6.在指定join字段同时指定join类型 - 获取指定字段统计信息
1.stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。 - 获取两个DataFrame中共有的记录
1.intersect方法可以计算出两个DataFrame中相同的记录, - 获取一个DataFrame中有另一个DataFrame中没有的记录
1.使用 except - 操作字段名
1.withColumnRenamed:重命名DataFrame中的指定字段名
如果指定的字段名不存在,不进行任何操作
2.withColumn:往当前DataFrame中新增一列
whtiColumn(colName: String , col: Column)方法根据指定colName往DataFrame中新增一列,如果colName已存在,则会覆盖当前列。 - 行转列
1.有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法 - 其他操作
API中还有na, randomSplit, repartition, alias, as方法。
网友评论