最全的SparkSql核心技术分享

作者: 大数据首席数据师 | 来源:发表于2019-01-10 15:51 被阅读26次

    一、SparkSQL概述

    1、概念
    官网:http://spark.apache.org/sql/
    Spark SQK是Spark用来处理结构化数据(结构化数据可以来自外部结构化数据源也可以通过RDD获取)的一个模块

    外部的结构化数据源包括 Json,parquet(默认),rmdbs,hive等

    2、Spark SQL的优点
    mapreduce hive(sql框架)减少代码编写

    sparkcore sparksql(sql框架)

    hive将sql转换成mapreduce,然后提交到集群上执行,大大简化了mapreduce的程序的复杂性,由于mapreduce这种计算模型。因此spark sql就应运而生了

    优点:

    • 容易整合
    • 同一的数据访问方式
    • 兼容hive
    • 标准的数据连接

    3、Spark SQL版本迭代
    1)sparkSQL的前身是shark

    2)spark-1.1(2014-9-11)开始的引入sparksql,对hive进行无缝的兼容

    3)spark-1.3:增加了DataFrame的API

    4)spark-1.4:增加了窗口分析函数

    5)spark-1.5:增加了UDF函数

    6)spark-1.6:引入DataSet SparkSession

    7)spark-2.x:SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入SparkSession 统一了 RDD,DataFrame,DataSet 的编程入口

    二、SparkSession

    1、介绍
    SparkSession实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样可以使用的。SparkSession内部封装了SparkContext,所有计算实际上是由SparkContext完成的。

    2、特点
    1)为用户提供了同一的切入点使用spark各项功能

    2)允许用户通过它调用DataFrame和DataSet相关API来编写程序

    3)减少用户需要了解的一些概念,可以很容易的与Spark进行交互

    4)与spark交互之时不需要显示创建sparkconf,sparkcontext以及sparksql,这些对象已经封闭在sparksession中

    5)sparksession提供对hive特征的内部支持,用hive sql写sql语句访问hive udfs,从hive表中读取数据

                      在创建对象的时候加上: enableHiveSupport();
    
                         hive url/metstore:元数据库在哪里;
    
                        hive  warehouse:真实数据在哪里
    

    3、SparkSession的创建
    1)如果在spark-shell中

    [qyl@qyl02 ~]$ ~/apps/spark-2.3.1-bin-hadoop2.7/bin/spark-shell \

    --master spark://qyl02:7077
    --executor-memory 512m
    --total-executor-cores 1
    2)在idea中创建SparkSession

    val spark=SparkSession.builder()
                          .appName("sparksqlexample")
                          .master("local")
                          .getOrCreate()
    

    三、RDD/DataFrame/DataSet

    1、RDD的局限性
    RDD仅表示数据集,RDD没有元数据,也就是说没有字段语义定义

    2、什么是DataFrame?
    是按列名的方式去组织的一个分布式数据集(RDD)

          由于RDD的局限性,Spark产生了DataFrame
    
         DataFrame=RDD+Schema=schemaRDD
    
               其中Schenam就是元数据,是语义描述
    

    特点:

      内部数据无类型,统一为Row
    
      DataFrame是一种特殊类型的DataSet,DataSet[Row]=DataFrame
    
      DataFrame自带优化器Catalyst,可以自动优化程序
    
      DataFrame提供了一整套的Data Source API
    

    3、DataSet的产生
    Row 运行时类型检查,比如salary是字符串类型,下面的语句也只有运行时才进行类型检查

    dataframe.filter("salary>1000").show()
    由于DataFrame的数据类型统一是Row,所以DataFrame也是有缺点的。

    明显缺点:

    1、Row不能直接操作domain对象

    2、函数风格编程,没有面向对象风格的API

    所以,Spark SQL引入了DataSet,扩展了DataFrmae的API,提供了编译时类型检查,面向对象风格的API
    在这里向大家推荐一个学习资料分享群:894951460
    4、Spark SQL程序基本编写步骤
    1)创建SparkSession对象

    val sparksession:SparkSession=SparkSession.builder()
                                              .appName("mysparkSession")
                                              .master("local")
                                              .getOrCreate()
    

    2)创建DataFrame或者DataSet

    3)在DataFrame或者DataSet之上进行转换和Action

    4)返回结果

    5、创建DataFrame
    创建DataFrame有三种方式: val studentRDD

    1、导入隐式转换

    import spark.implicits._
      val studentDF=studentRDD.toDF
    

    2、使用spark.sqlcontext.createDataFrame(studentRDD,schema)方法创建

    valstudentRowRDD=studentRDD.map(student=>Row(student.getName,student.getAge,student.
     getClass))
     
    val schema=StructType(fields=List(
            StructField("name",DataTypes.StringType,false),
            StructField("age",DataTypes.IntegerType,false),
            StructField("class",DataTypes.StringTyps,false)
    ))
     
    val studentDF=spark.createDataFrame(studentRowRDD,schema)
    

    四、Spark SQL 的wordcount

    1、准备数据 hello.txt
    hello word hello test
    hello you hello me
    you is beautiful and is pure
    2、编写代码

    package com.qyl
     
    import org.apache.spark.sql.SparkSession
     
    object WordCount {
      def main(args: Array[String]): Unit = {
     
        /*
        * 1.创建编程入口
        * */
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("WordCount")
          .getOrCreate()
        /*
        * 2.读取文件,转成DataFrame
        * */
        import spark.implicits._
        val helloDF = spark.read.text("data/hello.txt").toDF("line")
        /*
        * 3.创建临时视图
        * */
        helloDF.createOrReplaceTempView("hellotable")
        println("--------------------split拆分--------------")
        var sql=
          """
            |select
            |split(line," ")
            |from hellotable
          """.stripMargin
        println("-----------explode压平----------------------")
        sql=
          """
            |select
            |explode(split(line," ")) as word
            |from hellotable
          """.stripMargin
        println("------------统计结果------------------")
        sql="""
          |select
          | tmp.word,
          | count(tmp.word) as count
          |from (
          |  select
          |     explode(split(line, '\\s+')) as word
          |  from hellotable
          |) tmp
          |group by tmp.word
          |order by count desc
        """.stripMargin
     
        spark.sql(sql).show()
        spark.stop()
      }
    }
    

    3、结果

    | word|count|
    +---------+-----+
    | hello| 4|
    | is| 2|
    | you| 2|
    | me| 1|
    | pure| 1|
    | word| 1|
    | test| 1|
    | and| 1|
    |beautiful| 1|

    五、Spark SQL高级用法
    1、SparkSQL自定义普通函数

    /**
      * SparkSQL自定义UDF操作:
      *
      *  1、编写一个UDF函数,输入输出参数
      *  2、向SQLContext进行注册,该UDF
      *  3、就直接使用
      *
      *  案例:通过计算字符串长度的函数,来学习如何自定义UDF
      */
    object _02SparkSQLUDFOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
            Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
            Logger.getLogger("org.project-spark").setLevel(Level.WARN)
     
            val spark = SparkSession.builder()
                .master("local[2]")
                .appName("_02SparkSQLUDFOps")
                //            .enableHiveSupport()
                .getOrCreate()
            //第二步:向SQLContext进行注册,该UDF
            spark.udf.register[Int, String]("strLen", str => strLen(str))
     
            val topnDF = spark.read.json("data/sql/people.json")
            topnDF.createOrReplaceTempView("people")
            //自定义字符串长度函数,来就去表中name的长度
            //第三步:就直接使用
            val sql =
                """
                  |select
                  | name,
                  | strLen(name) nameLen
                  |from people
                """.stripMargin
     
            spark.sql(sql).show()
            spark.stop()
        }
     
        //第一步:编写一个UDF函数,输入输出参数
        def strLen(str:String):Int = str.length
    }
    2、自定义聚集(UDAF)函数
    /**
      * 自定义UDAF操作:
      *  1、编写一个UDAF类,extends UserDefinedAggregateFunction
      *     复写其中的若干方法
      *  2、和UDF一样向SQLContext进行注册,该UDAF
      *  3、就直接使用
      *  模拟count函数
      *  可以参考在sparkCore中学习的combineByKey或者aggregateByKey
      */
    object _03SparkSQlUDAFOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
            Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
            Logger.getLogger("org.project-spark").setLevel(Level.WARN)
     
            val spark = SparkSession.builder()
                .master("local[2]")
                .appName("_02SparkSQLUDFOps")
                //            .enableHiveSupport()
                .getOrCreate()
            //2、和UDF一样向SQLContext进行注册,该UDAF
            spark.udf.register("myCount", new MyCountUDAF())
     
            val topnDF = spark.read.json("data/sql/people.json")
            topnDF.createOrReplaceTempView("people")
            topnDF.show()
     
            println("------------------------------")
            //3、就直接使用
            val sql =
                """
                  |select
                  |  age,
                  |  myCount(age) countz
                  |from people
                  |group by age
                """.stripMargin
            spark.sql(sql).show()
            spark.stop()
        }
    }
    /**
      * 自定义UDAF
      */
    class MyCountUDAF extends UserDefinedAggregateFunction {
        //该udaf的输入的数据类型
        override def inputSchema: StructType = {
            StructType(List(
                StructField("age", DataTypes.IntegerType, false)
            ))
        }
        /**
          * 在该udaf聚合过程中的数据的类型Schema
          */
        override def bufferSchema: StructType = {
            StructType(List(
                StructField("age", DataTypes.IntegerType, false)
            ))
        }
     
        //该udaf的输出的数据类型
        override def dataType: DataType = DataTypes.IntegerType
     
        //确定性判断,通常特定输入和输出的类型一致
        override def deterministic: Boolean = true
        /**
            初始化的操作
          var sum = 1
          for(i <- 0 to 9) {
            sum += i
          }
          row.get(0)
          @param buffer 就是我们计算过程中的临时的存储了聚合结果的Buffer(extends Row)
         */
        override def initialize(buffer: MutableAggregationBuffer): Unit = {
            buffer.update(0, 0)//更新当前buffer数组中的第1列(索引为0)的值为0
        }
        /**
          * 分区内的数据聚合合并
          * @param buffer 就是我们在initialize方法中声明初始化的临时缓冲区
          * @param input  聚合操作新传入的值
          */
        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
            val oldValue = buffer.getInt(0) //row.get(0)
            buffer.update(0,  oldValue + 1)
        }
        /**
          * 分区间的数据聚合合并
          * 聚合之后将结果传递给分区一
          * @param buffer1 分区一聚合的临时结果
          * @param buffer2 分区二聚合的临时结果
          *                reduce(v1, v2)
          */
        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
            val pav1 = buffer1.getInt(0)
            val pav2 = buffer2.getInt(0)
            buffer1.update(0, pav1 + pav2)
        }
        /**
          * 该聚合函数最终要返回的值
          * @param buffer 数据就被存储在该buffer中
          */
        override def evaluate(buffer: Row): Any = {
            buffer.getInt(0)
        }
    }
    3、SparkSQL开窗函数的使用
     * SparkSQL中的开窗函数的使用:
      *     row_number()         --->分组topN(必须掌握)
      *     sum() over()         --->分组累加
      *     avg/min/max() over() --->分组求最大
      *
      */
    object _05SparkSQLWindowFuncOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
            Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
            Logger.getLogger("org.project-spark").setLevel(Level.WARN)
            val spark = SparkSession.builder()
                .master("local[2]")
                .appName("_04SparkSQLUDTFOps")
    //            .enableHiveSupport()
                .getOrCreate()
            val topnDF = spark.read.json("data/sql/topn.json")
            topnDF.createOrReplaceTempView("stu_score")
            println("==================原始数据=========================")
            topnDF.show()
            println("===========计算各个科目学员成绩降序==================")
            val sql =
                """
                  |select
                  |    course,
                  |    name,
                  |    score,
                  |    row_number() over(partition by course order by score desc) rank
                  |from stu_score
                """.stripMargin
            spark.sql(sql).show()
            println("=========计算各个科目学员成绩降序Top3================")
    //        val topnSQL =
    //            """
    //              |select
    //              |    course,
    //              |    name,
    //              |    score,
    //              |    row_number() over(partition by course order by score desc) rank
    //              |from stu_score
    //              |having rank < 4
    //            """.stripMargin
            val topnSQL =
                """
                  |select
                  | tmp.*
                  |from (
                  |   select
                  |      course,
                  |      name,
                  |      score,
                  |      row_number() over(partition by course order by score desc) rank
                  |   from stu_score
                  |)tmp
                  |where tmp.rank < 4
                """.stripMargin
            spark.sql(topnSQL).show
            spark.stop()
        }
    }
    

    所用pom.xml文件的配置:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.qyl</groupId>
      <artifactId>sparkSQL</artifactId>
      <version>1.0-SNAPSHOT</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.2</spark.version>
        <hadoop.version>2.7.6</hadoop.version>
        <scala.compat.version>2.11</scala.compat.version>
      </properties>
     
      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
      </repositories>
     
      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>
     
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.4</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.specs</groupId>
          <artifactId>specs</artifactId>
          <version>1.2.5</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <!-- sparkStreaming -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
        <dependency>
          <groupId>org.dom4j</groupId>
          <artifactId>dom4j</artifactId>
          <version>2.0.0</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>compile</scope>
        </dependency>
      </dependencies>
     
      <build>
        <!--<sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>-->
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.0</version>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
                <arg>-target:jvm-1.5</arg>
              </args>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <configuration>
              <downloadSources>true</downloadSources>
              <buildcommands>
                <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
              </buildcommands>
              <additionalProjectnatures>
                <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
              </additionalProjectnatures>
              <classpathContainers>
                <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
              </classpathContainers>
            </configuration>
          </plugin>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <!--<manifest>
                  <mainClass></mainClass>
                </manifest>-->
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.10</version>
            <executions>
              <execution>
                <id>add-source</id>
                <phase>generate-sources</phase>
                <goals>
                  <goal>add-source</goal>
                </goals>
                <configuration>
                  <!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
                  <sources>
                    <source>src/main/java</source>
                    <source>src/main/scala</source>
                  </sources>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    </project>
    

    如何学习大数据?学习没有资料?

    想学习大数据开发技术,Hadoop,spark,云计算,数据分析、爬虫等技术,在这里向大家推荐一个学习资料分享群:894951460,里面有大牛已经整理好的相关学习资料,希望对你们有所帮助。

    相关文章

      网友评论

        本文标题:最全的SparkSql核心技术分享

        本文链接:https://www.haomeiwen.com/subject/hkearqtx.html