美文网首页
Spark学习(四)——Spark SQL

Spark学习(四)——Spark SQL

作者: 大数据阶梯之路 | 来源:发表于2021-03-24 15:36 被阅读0次

    在知乎看到文章说,学习spark分为两个方向,一个是数据平台开发,一个是数据处理开发,如果用spark的话做数据处理,日常用得最多的就是sparkSQL,需要懂得什么是数据倾斜,原理,如何解决,还有sparkSQL的语句的灵活编写,了解spark的基本原理和特性方便定位问题。平时写习惯了hiveQL处理离线数仓数据的小江开始入门sparkSQL。

    一、SparkSQL学习点

    学习sparkSQL时可以与hiveQL对比,本质两者是同类的,比如hiveQL是经过hive解析器和优化最后转换成MapReduce任务然后提交到集群去运行,sparkSQL也是经过转换为RDD然后提交到集群去运行的,相比MapReduce这种计算模型,RDD就体现出效率之快了,毕竟计算的中间结果是在内存中,同时spark也支持从hive读取数据。

    类比图.png

    1、DataFrame API

    在spark中,dataframe是一种基于RDD的分布式数据集,从spark1.3加入的数据抽象,与RDD的主要区别在于dataframe是具有schema元信息的,类似于二维表结构化数据,RDD是没有这些二维行列信息的,使得用户可以洞察更多的结构数据且做针对性的查询优化,达到提升运行时效率。比如可以有DataFrame[Person]

    2、DataSet API

    dataset也是一种分布式数据集,是dataframe的一种扩展,在spark1.6后加入的新的数据抽象,与RDD相比,多了强类型检查和兼顾了dataframe的查询优化特性,在编译期就做类型检查,使得用户可以更好地感知结构化数据。比如可以有DataSet[Person]

    三者关系.png

    3、 RDD、DataFrame、DataSet三者相互转换

    • RDD可通过样例类转换成DataFrame、DataSet:样例类.toDF ,样例类.toDS
    • DataFrame、DataSet也可以直接转换为RDD:ds.rdd
    • DataFrame转换为DataSet,借助定义的case class 样例类:ds.as[Person]
    • DataSet转换为DataFrame:ds.toDF

    借助以下图来加深印象了解:


    image.png

    4、UDF用户自定义函数、UDAF用户自定义聚合函数

    //如何自定义一个UDF
    spark.udf.register("funcName",(,,)=>())
    //如何自定义一个UDAF
    自定义udaf,首先要继承UserDefinedAggregateFunction类,然后重写里面的8个方法(4个格式定义+4个计算)
    

    5、Spark读写数据 API

    //读取数据文件,load加载数据,创建dataframe
    spark.read.json/text/csv (path)
    spark.read.format("json/text.csv").load(path)
    
    spark.read.jdbc(url,table,username,password)
    spark.read.format("jdbc").option(url).option(table).option(username).option(password).load()
    
    //保存数据文件,save保存数据
    df.write.json(path)    //默认模式SaveMode,即当存在了就报错的模式ErrorIfExists
    模式可以有多种,比如追加append、忽略ignore、覆盖overwrite
    df.write.mode("overwrite").json("path")    //保存到指定路径,当存在文件名就覆盖
    df.write.format("json").save(path)
    
    df.write.jdbc(url,table,username,password)
    df.write.format("jdbc").option(url).option(table).option(username).option(password).save()
    

    6、SparkSQL例子

    package com.meizu.xiaojiang
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    case class Language(name: String,job: String)
    
    object Spark_SQL {
        def main(args: Array[String]): Unit = {
    
            //1、创建SparkSession
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("Spark_SQL")
                .getOrCreate()
    
            //2、导入隐式转换
            import spark.implicits._
    
            //3、读取文件创建DF
            val df: DataFrame = spark.read.text("hdfs://master:9000/sparkSQL.json")
    
            //4、DSL风格
            df.select("name").show()
    
            println("打印分隔符==========================")
    
            //5、SQL风格查询
            df.createTempView("language")
            spark.sql("select * from language").show()
    
            //6、把dataframe转换为rdd
            df.rdd
    
            //7、把dataframe转换为dataset
            df.as[Language]
    
            //8、关闭连接
            spark.stop()
    
        }
    }
    

    相关文章

      网友评论

          本文标题:Spark学习(四)——Spark SQL

          本文链接:https://www.haomeiwen.com/subject/nwrldktx.html