美文网首页
Spark学习(四)——Spark SQL

Spark学习(四)——Spark SQL

作者: 大数据阶梯之路 | 来源:发表于2021-03-24 15:36 被阅读0次

在知乎看到文章说,学习spark分为两个方向,一个是数据平台开发,一个是数据处理开发,如果用spark的话做数据处理,日常用得最多的就是sparkSQL,需要懂得什么是数据倾斜,原理,如何解决,还有sparkSQL的语句的灵活编写,了解spark的基本原理和特性方便定位问题。平时写习惯了hiveQL处理离线数仓数据的小江开始入门sparkSQL。

一、SparkSQL学习点

学习sparkSQL时可以与hiveQL对比,本质两者是同类的,比如hiveQL是经过hive解析器和优化最后转换成MapReduce任务然后提交到集群去运行,sparkSQL也是经过转换为RDD然后提交到集群去运行的,相比MapReduce这种计算模型,RDD就体现出效率之快了,毕竟计算的中间结果是在内存中,同时spark也支持从hive读取数据。

类比图.png

1、DataFrame API

在spark中,dataframe是一种基于RDD的分布式数据集,从spark1.3加入的数据抽象,与RDD的主要区别在于dataframe是具有schema元信息的,类似于二维表结构化数据,RDD是没有这些二维行列信息的,使得用户可以洞察更多的结构数据且做针对性的查询优化,达到提升运行时效率。比如可以有DataFrame[Person]

2、DataSet API

dataset也是一种分布式数据集,是dataframe的一种扩展,在spark1.6后加入的新的数据抽象,与RDD相比,多了强类型检查和兼顾了dataframe的查询优化特性,在编译期就做类型检查,使得用户可以更好地感知结构化数据。比如可以有DataSet[Person]

三者关系.png

3、 RDD、DataFrame、DataSet三者相互转换

  • RDD可通过样例类转换成DataFrame、DataSet:样例类.toDF ,样例类.toDS
  • DataFrame、DataSet也可以直接转换为RDD:ds.rdd
  • DataFrame转换为DataSet,借助定义的case class 样例类:ds.as[Person]
  • DataSet转换为DataFrame:ds.toDF

借助以下图来加深印象了解:


image.png

4、UDF用户自定义函数、UDAF用户自定义聚合函数

//如何自定义一个UDF
spark.udf.register("funcName",(,,)=>())
//如何自定义一个UDAF
自定义udaf,首先要继承UserDefinedAggregateFunction类,然后重写里面的8个方法(4个格式定义+4个计算)

5、Spark读写数据 API

//读取数据文件,load加载数据,创建dataframe
spark.read.json/text/csv (path)
spark.read.format("json/text.csv").load(path)

spark.read.jdbc(url,table,username,password)
spark.read.format("jdbc").option(url).option(table).option(username).option(password).load()

//保存数据文件,save保存数据
df.write.json(path)    //默认模式SaveMode,即当存在了就报错的模式ErrorIfExists
模式可以有多种,比如追加append、忽略ignore、覆盖overwrite
df.write.mode("overwrite").json("path")    //保存到指定路径,当存在文件名就覆盖
df.write.format("json").save(path)

df.write.jdbc(url,table,username,password)
df.write.format("jdbc").option(url).option(table).option(username).option(password).save()

6、SparkSQL例子

package com.meizu.xiaojiang

import org.apache.spark.sql.{DataFrame, SparkSession}

case class Language(name: String,job: String)

object Spark_SQL {
    def main(args: Array[String]): Unit = {

        //1、创建SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("Spark_SQL")
            .getOrCreate()

        //2、导入隐式转换
        import spark.implicits._

        //3、读取文件创建DF
        val df: DataFrame = spark.read.text("hdfs://master:9000/sparkSQL.json")

        //4、DSL风格
        df.select("name").show()

        println("打印分隔符==========================")

        //5、SQL风格查询
        df.createTempView("language")
        spark.sql("select * from language").show()

        //6、把dataframe转换为rdd
        df.rdd

        //7、把dataframe转换为dataset
        df.as[Language]

        //8、关闭连接
        spark.stop()

    }
}

相关文章

网友评论

      本文标题:Spark学习(四)——Spark SQL

      本文链接:https://www.haomeiwen.com/subject/nwrldktx.html