一、SparkSQL概述
1、概念
官网:http://spark.apache.org/sql/
Spark SQK是Spark用来处理结构化数据(结构化数据可以来自外部结构化数据源也可以通过RDD获取)的一个模块
外部的结构化数据源包括 Json,parquet(默认),rmdbs,hive等
2、Spark SQL的优点
mapreduce hive(sql框架)减少代码编写
sparkcore sparksql(sql框架)
hive将sql转换成mapreduce,然后提交到集群上执行,大大简化了mapreduce的程序的复杂性,由于mapreduce这种计算模型。因此spark sql就应运而生了
优点:
- 容易整合
- 同一的数据访问方式
- 兼容hive
- 标准的数据连接
3、Spark SQL版本迭代
1)sparkSQL的前身是shark
2)spark-1.1(2014-9-11)开始的引入sparksql,对hive进行无缝的兼容
3)spark-1.3:增加了DataFrame的API
4)spark-1.4:增加了窗口分析函数
5)spark-1.5:增加了UDF函数
6)spark-1.6:引入DataSet SparkSession
7)spark-2.x:SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入SparkSession 统一了 RDD,DataFrame,DataSet 的编程入口
二、SparkSession
1、介绍
SparkSession实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样可以使用的。SparkSession内部封装了SparkContext,所有计算实际上是由SparkContext完成的。
2、特点
1)为用户提供了同一的切入点使用spark各项功能
2)允许用户通过它调用DataFrame和DataSet相关API来编写程序
3)减少用户需要了解的一些概念,可以很容易的与Spark进行交互
4)与spark交互之时不需要显示创建sparkconf,sparkcontext以及sparksql,这些对象已经封闭在sparksession中
5)sparksession提供对hive特征的内部支持,用hive sql写sql语句访问hive udfs,从hive表中读取数据
在创建对象的时候加上: enableHiveSupport();
hive url/metstore:元数据库在哪里;
hive warehouse:真实数据在哪里
3、SparkSession的创建
1)如果在spark-shell中
[qyl@qyl02 ~]$ ~/apps/spark-2.3.1-bin-hadoop2.7/bin/spark-shell \
--master spark://qyl02:7077
--executor-memory 512m
--total-executor-cores 1
2)在idea中创建SparkSession
val spark=SparkSession.builder()
.appName("sparksqlexample")
.master("local")
.getOrCreate()
三、RDD/DataFrame/DataSet
1、RDD的局限性
RDD仅表示数据集,RDD没有元数据,也就是说没有字段语义定义
2、什么是DataFrame?
是按列名的方式去组织的一个分布式数据集(RDD)
由于RDD的局限性,Spark产生了DataFrame
DataFrame=RDD+Schema=schemaRDD
其中Schenam就是元数据,是语义描述
特点:
内部数据无类型,统一为Row
DataFrame是一种特殊类型的DataSet,DataSet[Row]=DataFrame
DataFrame自带优化器Catalyst,可以自动优化程序
DataFrame提供了一整套的Data Source API
3、DataSet的产生
Row 运行时类型检查,比如salary是字符串类型,下面的语句也只有运行时才进行类型检查
dataframe.filter("salary>1000").show()
由于DataFrame的数据类型统一是Row,所以DataFrame也是有缺点的。
明显缺点:
1、Row不能直接操作domain对象
2、函数风格编程,没有面向对象风格的API
所以,Spark SQL引入了DataSet,扩展了DataFrmae的API,提供了编译时类型检查,面向对象风格的API
在这里向大家推荐一个学习资料分享群:894951460
4、Spark SQL程序基本编写步骤
1)创建SparkSession对象
val sparksession:SparkSession=SparkSession.builder()
.appName("mysparkSession")
.master("local")
.getOrCreate()
2)创建DataFrame或者DataSet
3)在DataFrame或者DataSet之上进行转换和Action
4)返回结果
5、创建DataFrame
创建DataFrame有三种方式: val studentRDD
1、导入隐式转换
import spark.implicits._
val studentDF=studentRDD.toDF
2、使用spark.sqlcontext.createDataFrame(studentRDD,schema)方法创建
valstudentRowRDD=studentRDD.map(student=>Row(student.getName,student.getAge,student.
getClass))
val schema=StructType(fields=List(
StructField("name",DataTypes.StringType,false),
StructField("age",DataTypes.IntegerType,false),
StructField("class",DataTypes.StringTyps,false)
))
val studentDF=spark.createDataFrame(studentRowRDD,schema)
四、Spark SQL 的wordcount
1、准备数据 hello.txt
hello word hello test
hello you hello me
you is beautiful and is pure
2、编写代码
package com.qyl
import org.apache.spark.sql.SparkSession
object WordCount {
def main(args: Array[String]): Unit = {
/*
* 1.创建编程入口
* */
val spark = SparkSession.builder()
.master("local[2]")
.appName("WordCount")
.getOrCreate()
/*
* 2.读取文件,转成DataFrame
* */
import spark.implicits._
val helloDF = spark.read.text("data/hello.txt").toDF("line")
/*
* 3.创建临时视图
* */
helloDF.createOrReplaceTempView("hellotable")
println("--------------------split拆分--------------")
var sql=
"""
|select
|split(line," ")
|from hellotable
""".stripMargin
println("-----------explode压平----------------------")
sql=
"""
|select
|explode(split(line," ")) as word
|from hellotable
""".stripMargin
println("------------统计结果------------------")
sql="""
|select
| tmp.word,
| count(tmp.word) as count
|from (
| select
| explode(split(line, '\\s+')) as word
| from hellotable
|) tmp
|group by tmp.word
|order by count desc
""".stripMargin
spark.sql(sql).show()
spark.stop()
}
}
3、结果
| word|count|
+---------+-----+
| hello| 4|
| is| 2|
| you| 2|
| me| 1|
| pure| 1|
| word| 1|
| test| 1|
| and| 1|
|beautiful| 1|
五、Spark SQL高级用法
1、SparkSQL自定义普通函数
/**
* SparkSQL自定义UDF操作:
*
* 1、编写一个UDF函数,输入输出参数
* 2、向SQLContext进行注册,该UDF
* 3、就直接使用
*
* 案例:通过计算字符串长度的函数,来学习如何自定义UDF
*/
object _02SparkSQLUDFOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
val spark = SparkSession.builder()
.master("local[2]")
.appName("_02SparkSQLUDFOps")
// .enableHiveSupport()
.getOrCreate()
//第二步:向SQLContext进行注册,该UDF
spark.udf.register[Int, String]("strLen", str => strLen(str))
val topnDF = spark.read.json("data/sql/people.json")
topnDF.createOrReplaceTempView("people")
//自定义字符串长度函数,来就去表中name的长度
//第三步:就直接使用
val sql =
"""
|select
| name,
| strLen(name) nameLen
|from people
""".stripMargin
spark.sql(sql).show()
spark.stop()
}
//第一步:编写一个UDF函数,输入输出参数
def strLen(str:String):Int = str.length
}
2、自定义聚集(UDAF)函数
/**
* 自定义UDAF操作:
* 1、编写一个UDAF类,extends UserDefinedAggregateFunction
* 复写其中的若干方法
* 2、和UDF一样向SQLContext进行注册,该UDAF
* 3、就直接使用
* 模拟count函数
* 可以参考在sparkCore中学习的combineByKey或者aggregateByKey
*/
object _03SparkSQlUDAFOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
val spark = SparkSession.builder()
.master("local[2]")
.appName("_02SparkSQLUDFOps")
// .enableHiveSupport()
.getOrCreate()
//2、和UDF一样向SQLContext进行注册,该UDAF
spark.udf.register("myCount", new MyCountUDAF())
val topnDF = spark.read.json("data/sql/people.json")
topnDF.createOrReplaceTempView("people")
topnDF.show()
println("------------------------------")
//3、就直接使用
val sql =
"""
|select
| age,
| myCount(age) countz
|from people
|group by age
""".stripMargin
spark.sql(sql).show()
spark.stop()
}
}
/**
* 自定义UDAF
*/
class MyCountUDAF extends UserDefinedAggregateFunction {
//该udaf的输入的数据类型
override def inputSchema: StructType = {
StructType(List(
StructField("age", DataTypes.IntegerType, false)
))
}
/**
* 在该udaf聚合过程中的数据的类型Schema
*/
override def bufferSchema: StructType = {
StructType(List(
StructField("age", DataTypes.IntegerType, false)
))
}
//该udaf的输出的数据类型
override def dataType: DataType = DataTypes.IntegerType
//确定性判断,通常特定输入和输出的类型一致
override def deterministic: Boolean = true
/**
初始化的操作
var sum = 1
for(i <- 0 to 9) {
sum += i
}
row.get(0)
@param buffer 就是我们计算过程中的临时的存储了聚合结果的Buffer(extends Row)
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, 0)//更新当前buffer数组中的第1列(索引为0)的值为0
}
/**
* 分区内的数据聚合合并
* @param buffer 就是我们在initialize方法中声明初始化的临时缓冲区
* @param input 聚合操作新传入的值
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val oldValue = buffer.getInt(0) //row.get(0)
buffer.update(0, oldValue + 1)
}
/**
* 分区间的数据聚合合并
* 聚合之后将结果传递给分区一
* @param buffer1 分区一聚合的临时结果
* @param buffer2 分区二聚合的临时结果
* reduce(v1, v2)
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val pav1 = buffer1.getInt(0)
val pav2 = buffer2.getInt(0)
buffer1.update(0, pav1 + pav2)
}
/**
* 该聚合函数最终要返回的值
* @param buffer 数据就被存储在该buffer中
*/
override def evaluate(buffer: Row): Any = {
buffer.getInt(0)
}
}
3、SparkSQL开窗函数的使用
* SparkSQL中的开窗函数的使用:
* row_number() --->分组topN(必须掌握)
* sum() over() --->分组累加
* avg/min/max() over() --->分组求最大
*
*/
object _05SparkSQLWindowFuncOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
val spark = SparkSession.builder()
.master("local[2]")
.appName("_04SparkSQLUDTFOps")
// .enableHiveSupport()
.getOrCreate()
val topnDF = spark.read.json("data/sql/topn.json")
topnDF.createOrReplaceTempView("stu_score")
println("==================原始数据=========================")
topnDF.show()
println("===========计算各个科目学员成绩降序==================")
val sql =
"""
|select
| course,
| name,
| score,
| row_number() over(partition by course order by score desc) rank
|from stu_score
""".stripMargin
spark.sql(sql).show()
println("=========计算各个科目学员成绩降序Top3================")
// val topnSQL =
// """
// |select
// | course,
// | name,
// | score,
// | row_number() over(partition by course order by score desc) rank
// |from stu_score
// |having rank < 4
// """.stripMargin
val topnSQL =
"""
|select
| tmp.*
|from (
| select
| course,
| name,
| score,
| row_number() over(partition by course order by score desc) rank
| from stu_score
|)tmp
|where tmp.rank < 4
""".stripMargin
spark.sql(topnSQL).show
spark.stop()
}
}
所用pom.xml文件的配置:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qyl</groupId>
<artifactId>sparkSQL</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<properties>
<project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.3.2</spark.version>
<hadoop.version>2.7.6</hadoop.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- sparkStreaming -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<!--<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>-->
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<!--<manifest>
<mainClass></mainClass>
</manifest>-->
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.10</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
<sources>
<source>src/main/java</source>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
如何学习大数据?学习没有资料?
想学习大数据开发技术,Hadoop,spark,云计算,数据分析、爬虫等技术,在这里向大家推荐一个学习资料分享群:894951460,里面有大牛已经整理好的相关学习资料,希望对你们有所帮助。
网友评论