美文网首页
SparkSql之编程方式

SparkSql之编程方式

作者: 万事万物 | 来源:发表于2021-07-19 08:30 被阅读0次

    什么是SparkSql?

    • SparkSql作用
      主要用于用于处理结构化数据,底层就是将SQL语句转成RDD执行
    • SparkSql的数据抽象
      1.DataFrame
      2.DataSet

    SparkSession

    在老的版本中,SparkSQL提供两种SQL查询起始点:

    • 一个叫SQLContext,用于Spark自己提供的SQL查询;
    • 一个叫HiveContext,用于连接Hive的查询。

    SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

    SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

    引入依赖

        <dependencies>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>compile</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
    
        </dependencies>
    

    创建SparkSession

    导包

    import org.apache.spark.sql.SparkSession
    

    SparkSession 构造器

    @Stable
    class SparkSession private(
        @transient val sparkContext: SparkContext,
        @transient private val existingSharedState: Option[SharedState],
        @transient private val parentSessionState: Option[SessionState],
        @transient private[sql] val extensions: SparkSessionExtensions)
      extends Serializable with Closeable with Logging {...}
    

    SparkSession 主构造器已被私有化,无法通过常规的new创建对象。在SparkSession伴生对象中,有个Builder类及builder方法

    第一种方式:
    创建Builder 对象获取SparkSession 实例

    // 创建Builder实例
    val builder = new spark.sql.SparkSession.Builder
    // 调用getOrCreate获取 SparkSession 实例
    val session: SparkSession = builder.getOrCreate()
    

    第二种方式:
    通过SparkSession调用builder()函数获取Builder的实例

    // 通过调用 builder() 获取 Builder实例
    val builder: SparkSession.Builder = SparkSession.builder()
    // 调用getOrCreate获取 SparkSession 实例
    val session: SparkSession = builder.getOrCreate()
    

    在使用SparkContext时 可以在SparkConf指定masterappName
    如:

    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)
    

    Builder也是可以

    val builder: SparkSession.Builder = SparkSession.builder()
    builder.master("local[4]")
    builder.appName("test")
    

    创建好SparkSession就可以开始下面的工作了。


    spark sql 编程有两种方式

    • 声明式:SQL
    • 命令式:DSL

    声明式:SQL

    使用声明式,需要注册成表注册成表的四种方式

    • createOrReplaceTempView:创建临时视图,如果视图已经存在则覆盖[只能在当前sparksession中使用] 【重点】

    • createTempView: 创建临时视图,如果视图已经存在则报错[只能在当前sparksession中使用]

    示例:
    注册成表;viewName指定表名

     df.createGlobalTempView(viewName="表名")
    

    编写sql

    sparksession.sql("sql语句")
    

    案例:

    @Test
      def sparkSqlBySql(): Unit ={
        val female=List(
          Student(2,"绣花",16,"女",1),
          Student(5,"翠花",19,"女",2),
          Student(9,"王菲菲",20,"女",1),
          Student(11,"小惠",23,"女",1),
          Student(12,"梦雅",25,"女",3)
        )
    
        val boys=List(
          Student(1,"张三",18,"男",3),
          Student(3,"李四",18,"男",2),
          Student(4,"王五",18,"男",2),
          Student(7,"张鹏",14,"男",1),
          Student(8,"刘秀",13,"男",2),
          Student(10,"乐乐",21,"男",1)
        )
    
        // 导入隐式转换
        import sparkSession.implicits._
    
        val femaleDf: DataFrame = female.toDF()
        val boysDf: DataFrame = boys.toDF()
    
        //合并
        val unionAll=femaleDf.unionAll(boysDf)
    
    
        // 注册成表
        unionAll.createOrReplaceTempView(viewName = "student")
    
    
        //编写sql
    
        // 统计男女人数
        sparkSession.sql(
          """
            |select sex,count(*) sex_count from student
            |group by sex
            |""".stripMargin).show()
    
      }
    
    +---+---------+
    |sex|sex_count|
    +---+---------+
    | 男|        6|
    | 女|        5|
    +---+---------+
    

    也可以支持开窗

        // 统计男女人数
        sparkSession.sql(
          """
            |select *,row_number() over(partition by sex order by age)as rn from student
            |""".stripMargin).show()
    
    +---+------+---+---+-------+---+
    | id|  name|age|sex|classId| rn|
    +---+------+---+---+-------+---+
    |  8|  刘秀| 13| 男|      2|  1|
    |  7|  张鹏| 14| 男|      1|  2|
    |  1|  张三| 18| 男|      3|  3|
    |  3|  李四| 18| 男|      2|  4|
    |  4|  王五| 18| 男|      2|  5|
    | 10|  乐乐| 21| 男|      1|  6|
    |  2|  绣花| 16| 女|      1|  1|
    |  5|  翠花| 19| 女|      2|  2|
    |  9|王菲菲| 20| 女|      1|  3|
    | 11|  小惠| 23| 女|      1|  4|
    | 12|  梦雅| 25| 女|      3|  5|
    +---+------+---+---+-------+---+
    

    • createOrReplaceGlobalTempView: 创建全局视图,如果视图已经存在则覆盖[能够在多个sparksession中使用]

    • createGlobalTempView: 创建全局视图,如果视图已经存在则报错[能够在多个sparksession中使用]

    注意:使用createOrReplaceGlobalTempViewcreateGlobalTempView创建的表后续查询的时候必须通过 global_temp.表名 方式使用

        // 统计男女人数
        sparkSession.sql(
          """
            |select *,row_number() over(partition by sex order by age)as rn from global_temp.student
            |""".stripMargin).show()
    
        // 获取一个新的sparkSession
        val sparkSession2: SparkSession = sparkSession.newSession()
        sparkSession2.sql(
          """
            |select *,row_number() over(partition by sex order by age)as rn from global_temp.student
            |""".stripMargin).show()
    

    结果都是一样,略...


    命令式:DSL

    通过算子操作数据
    参考:https://blog.csdn.net/dabokele/article/details/52802150

    DataFrame对象上Action操作

    1. show:展示数据
    2. collect:获取所有数据到数组
    3. collectAsList:获取所有数据到List
    4. describe(cols: String*):获取指定字段的统计信息
    5. first, head, take, takeAsList:获取若干行记录

    DataFrame对象上的条件查询和join等操作

    • where条件相关
      1.where(conditionExpr: String):SQL语言中where关键字后的条件
      2.filter:根据字段进行筛选
    • 查询指定字段
      1.select:获取指定字段值
      2.electExpr:可以对指定字段进行特殊处理
      3.col:获取指定字段
      4.apply:获取指定字段
      5.drop:去除指定字段,保留其他字段
    • limit
      limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作。
    • order by
      1.orderBy和sort:按指定字段排序,默认为升序
      2.sortWithinPartitions
        和上面的sort方法功能类似,区别在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame对象。
    • group by
      1.groupBy:根据字段进行group by操作
      2.cube和rollup:group by的扩展
      3.GroupedData对象
        该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,
        max(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
        min(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
        mean(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段
        sum(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
        count()方法,获取分组中的元素个数
    • distinct
      1.distinct:返回一个不包含重复记录的DataFrame
      2.dropDuplicates:根据指定字段去重
    • 聚合
      1.聚合操作调用的是agg方法,该方法有多种调用方式。一般与groupBy方法配合使用。
    • union
      1.unionAll方法:对两个DataFrame进行组合
    • join
      1.笛卡尔积
      2.using一个字段形式
      3.using多个字段形式
      4.指定join类型
      5.使用Column类型来join
      6.在指定join字段同时指定join类型
    • 获取指定字段统计信息
      1.stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。
    • 获取两个DataFrame中共有的记录
      1.intersect方法可以计算出两个DataFrame中相同的记录,
    • 获取一个DataFrame中有另一个DataFrame中没有的记录
      1.使用 except
    • 操作字段名
      1.withColumnRenamed:重命名DataFrame中的指定字段名
        如果指定的字段名不存在,不进行任何操作
      2.withColumn:往当前DataFrame中新增一列
        whtiColumn(colName: String , col: Column)方法根据指定colName往DataFrame中新增一列,如果colName已存在,则会覆盖当前列。
    • 行转列
      1.有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法
    • 其他操作
      API中还有na, randomSplit, repartition, alias, as方法。

    相关文章

      网友评论

          本文标题:SparkSql之编程方式

          本文链接:https://www.haomeiwen.com/subject/gnuaultx.html