SparkSQL(四)

作者: 八爪鱼下水 | 来源:发表于2021-04-04 13:21 被阅读0次

    什么是sparksql

    • SparkCore撰写代码非常复杂,引入SparkSQL处理结构化数据
    • SparkSQL基于Hive之上做了改进

    sparksql数据结构

    • SparkCore数据结构:RDD
    • SparkSQL数据结构:DataFrame和DataSet
    DataFrame(1.3之后)

    DataFrame Operators

    • 常用操作
    map, flatMap 
    sample, filter 
    sort 
    pipe 
    groupBy, groupByKey, cogroup 
    reduce, reduceByKey, fold 
    partitionBy ▫ zip, union 
    join, crossJoin, leftOuterJoin, rightOuterJoin 
    count, save 
    first, take
    
    • 基本操作
    1、 cache()同步数据的内存
    2、 columns 返回一个string类型的数组,返回值是所有列的名字
    3、 dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型
    4、 explan()打印执行计划  物理的
    5、 explain(n:Boolean) 输入值为 false 或者true ,返回值是unit  默认是false ,如果输入true 将会打印 逻辑的和物理的
    6、 isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false
    7、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型
    8、 printSchema() 打印出字段名称和类型 按照树状结构来打印
    9、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这  个表随着对象的删除而删除了
    10、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回
    11、 toDF()返回一个新的dataframe类型的
    12、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,
    13、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据
    14、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD
    
    • 聚合查询类操作
    1、 agg(expers:column*) 返回dataframe类型 ,同数学计算求值
          df.agg(max("age"), avg("salary"))
          df.groupBy().agg(max("age"), avg("salary"))
    2、 agg(exprs: Map[String, String])  返回dataframe类型 ,同数学计算求值 map类型的
        df.agg(Map("age" -> "max", "salary" -> "avg"))
        df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    3、 agg(aggExpr: (String, String), aggExprs: (String, String)*)  返回dataframe类型 ,同数学计算求值
        df.agg(Map("age" -> "max", "salary" -> "avg"))
        df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    4、 apply(colName: String) 返回column类型,捕获输入进去列的对象
    5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名
    6、 col(colName: String)  返回column类型,捕获输入进去列的对象
    7、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总
    8、 distinct 去重 返回一个dataframe类型
    9、 drop(col: Column) 删除某列 返回dataframe类型
    10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe
    11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的
    12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分
        df.explode("name","names") {name :String=> name.split(" ")}.show();
        将name字段根据空格来拆分,拆分的字段放在names里面
    13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型 df.filter("age>10").show();  df.filter(df("age")>10).show();   df.where(df("age")>10).show(); 都可以
    14、 groupBy(col1: String, cols: String*) 根据某写字段来汇总返回groupedate类型   df.groupBy("age").agg(Map("age" ->"count")).show();df.groupBy("age").avg().show();都可以
    15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素
    16、 join(right: DataFrame, joinExprs: Column, joinType: String)
      一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemi
        df.join(ds,df("name")===ds("name") and  df("age")===ds("age"),"outer").show();
    17、 limit(n: Int) 返回dataframe类型  去n 条数据出来
    18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤 df.na.drop().show(); 删除为空的行
    19、 orderBy(sortExprs: Column*) 做alise排序
    20、 select(cols:string*) dataframe 做字段的刷选 df.select($"colA", $"colB" + 1)
    21、 selectExpr(exprs: String*) 做字段的刷选 df.selectExpr("name","name as names","upper(name)","age+1").show();
    22、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默认是asc
    23、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();
    24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
    25、 withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();
    
    • 行动操作
    1、 collect() ,返回值是一个数组,返回dataframe集合所有的行
    2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
    3、 count() 返回一个number类型的,返回dataframe集合的行数
    4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。例如df.describe("age", "height").show()
    5、 first() 返回第一行 ,类型是row类型
    6、 head() 返回第一行 ,类型是row类型
    7、 head(n:Int)返回n行  ,类型是row 类型
    8、 show()返回dataframe集合的值 默认是20行,返回类型是unit
    9、 show(n:Int)返回n行,,返回值类型是unit
    10、 table(n:Int) 返回n行  ,类型是row 类型
    
    DataSet(1.6之后)
    如何创建DataSet
    #Ds 有 flatMap方法
     value.flatMap(_.split("\\s+"))
    
    • 通过一个已经存在scala集合去构建
      val ds1=spark.createDataset(List(1,2,3,4,5))
      val ds2=List(1,2,3,4,5).toDS

    • 通过一个已经存在的rdd去构建
      val ds=spark.createDataset(sc.textFile("/person.txt"))

    • dataFrame转换成dataSet
      val ds=df.as[强类型]

    • 通过dataSet中的方法生成一个新的dataSet

    Spark2.0中两者统一

    • DataSet包含了DataFrame的功能
    • DataFrame其实就是Dateset[Row]

    Dateset ==> DataFrame + 泛型
    Dateset ==> RDD + Schema + 方便的SQL操作 + 优化

    • Dateset是特殊的DataFrame
    • DataFrame是特殊的RDD
    • Dateset是一个分布式的表

    类型转化

    RDD - > DataFrame
    样例类

     val spark: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .getOrCreate()
     val sparkContext: SparkContext = spark.sparkContext
     val value: RDD[String] = sparkContext.textFile("data/words.txt")
    #第一种方法 直接在泛型上加上自定义类型
    case class People(id: Int, name: String, age: Int)
    val fileRDD: RDD[String] = sc.textFile("data/baseinput/sql/people1.txt")
    #泛型上增加
    val peopleRDD: RDD[People] = fileRDD.map(_.split("\\s+")).map(x => People(x(0).toInt, x(1), x(2).toInt))
    #已经转化了
    peopleRDD.toDF()
    
    

    动态增加字段,可以使用strucedType的方式

    val fileRDD: RDD[String] = sc.textFile("data/baseinput/sql/people1.txt")
    #一个row对象就是一行数据
    val peopleRDD: RDD[Row] = fileRDD.map(_.split("\\s+")).map(x => Row(x(0).toInt, x(1), x(2).toInt))
    #需要引入structedFiled
        val schema: StructType = new StructType()
          .add("id", DataTypes.IntegerType, true)
          .add("name", "string", true)
          .add("age", "int", true)
    #spark 创建 createDataFrame 一个DataFrame 
     val peopleDF: DataFrame = spark.createDataFrame(peopleRDD, schema)
    
     #引入隐式反馈转化为df
        val peopleDF: DataFrame = peopleRDD.toDF("id","name","age")
    

    Shuffle分区数目

    • 运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
        // 构建SparkSession实例对象
            val spark: SparkSession = SparkSession.builder()
                .master("local[4]")
                .appName(this.getClass.getSimpleName.stripSuffix("$"))
                // TODO: 设置shuffle时分区数目
                .config("spark.sql.shuffle.partitions", "4")
                .getOrCreate()
            // 导入隐式转换
            import spark.implicits._
    
    图片1.png

    Spark自定义函数

    1.UDF函数 : 一对一关系 常用
    2.UDAF聚合函数 : 多对一关系
    3.UDTF聚合函数 : 一对多关系
    1.UDF函数 : 一对一关系 常用
    
     //DSL方式register一个UDF函数
     spark.udf.register("wordToBigger",(line:String)=>{
          line.toUpperCase()
        })
     //SQL方式使用 自定义函数
    wordsDF.createOrReplaceTempView("word_view")
      val result1: DataFrame = spark.sql("select line,wordToBigger(line) as bigger from   
      word_view")
      result1.show()
     //DSL方式使用自定义函数
    val result2: DataFrame = wordsDF.select('line,
          callUDF("wordToBigger", 'line).as("bigger"))
    result2.show()
    
    

    SparkSql执行解析成RDD

    1)解析逻辑计划
    2)分析逻辑计划
    3)优化逻辑计划
    4)优化物理计划生成RDD

    • Catalyst 优化器引擎
      SparkSQL 和 RDD 主要就是有Schema

    简单来说:
    Catalyst 是sparkSQL的核心优化器,替代了hive的优化器.

    • Spark 会通过一些 API 接受 SQL 语句
    • Catalyst 负责解析 SQL, 生成执行计划等,Catalyst 输出RDD的执行计划.交给集群执行.

    具体流程:
    Step 1 : 解析 SQL, 并且生成 AST (抽象语法树)

    Step 2 : 在 AST 中加入元数据信息, 做这一步主要是为了一些优化

    Step 3 : 对已经加入元数据的 AST, 输入优化器, 进行优化, 从两种常见的优化开始, 例如: 谓词下推, 列值裁剪,还有其他二百多种优化.

    Step 4 : 上面的过程生成的 AST 其实最终还没办法直接运行, 这个 AST 叫做逻辑计划, 结束后, 需要生成 物理计划, 从而生成 RDD 来运行

    • 可以使用 queryExecution 方法查看逻辑执行计划, 使用 explain 方法查看物理执行计划

    Catalyst 的主要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析,
    生成逻辑计划,
    后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD 的形式运行

    RBO是基于规则,得到多个可选物理计划.Rule Based Optimizer
    CBO是基于多个物理计划选择最优物理计划,Cost Based Optimizer
    最后通过CodeGeneration代码生成器转化为RDD


    图片1.png
    图片2.png
    图片3.png

    相关文章

      网友评论

        本文标题:SparkSQL(四)

        本文链接:https://www.haomeiwen.com/subject/fyqlkltx.html