2020-12-13-Spark-21(Spark-SQL)

Author: 冰菓_ | Published 2020-12-13 08:12

    The structure of Spark SQL
    Simple usage of UDF and UDAF

    1. Learning the structure


    Question: what is the relationship between RDD, DataFrame, and DataSet? Roughly speaking, a DataFrame is just a Dataset[Row], and all three can be converted into one another, as the code below shows.

    Conversion operations:

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    object Test1 {
      def main(args: Array[String]): Unit = {
        //TODO environment setup
        val conf = new SparkConf().setAppName("SQL1").setMaster("local[*]")
        val session = SparkSession.builder().config(conf).getOrCreate()
        import session.implicits._
        //TODO working with a DataFrame
        //1. DataFrame => SQL
        val dsql = session.read.json("src/main/resources/1.json")
        //dsql.show()
        //dsql.createOrReplaceTempView("user")
        //session.sql("select * from user").show()
        //2. DataFrame => DSL
        //dsql.select("name").show()
        //dsql.select($"name").show()
        //dsql.select('age + 1, 'age).show()
        //TODO working with a DataSet
        //val seq = Seq(1, 2, 3, 4, 5)
        //val ds = seq.toDS()
        //ds.show()
        //TODO RDD => DataFrame
        val rdd = session.sparkContext.makeRDD(List(("小明", 12, 2000), ("小华", 14, 9000)))
        val frame: DataFrame = rdd.toDF("name", "age", "money")
        //DataFrame => RDD
        //val rdd1: RDD[Row] = frame.rdd
        //frame.show()
        //rdd1.foreach(println)
        //TODO DataFrame => DataSet (and back)
        //val value: Dataset[User] = frame.as[User]
        //value.show()
        //val frame1: DataFrame = value.toDF()
        //TODO RDD => DataSet
        val value1: Dataset[User] = rdd.map {
          case (name, age, money) => User(name, age, money)
        }.toDS()
        //value1.show()
        //DataSet => RDD
        //val rdd2: RDD[User] = value1.rdd
        session.stop()
      }
    }

    case class User(name: String, age: Int, money: Double)
    
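    The examples in this post read src/main/resources/1.json, whose contents are not shown in the original. As an assumption, it is treated here as a JSON Lines file with "name" and "age" fields (that is what the UDF and UDAF examples below query). A minimal sketch that writes such a sample file, using a hypothetical helper object not from the original post:

    import java.nio.file.{Files, Paths}

    //hypothetical helper: writes an assumed sample 1.json (one JSON object per line)
    object MakeSampleJson {
      def main(args: Array[String]): Unit = {
        val sample =
          """{"name": "zhangsan", "age": 20}
            |{"name": "lisi", "age": 30}
            |{"name": "wangwu", "age": 40}""".stripMargin
        Files.createDirectories(Paths.get("src/main/resources"))
        Files.write(Paths.get("src/main/resources/1.json"), sample.getBytes("UTF-8"))
      }
    }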

    2. UDF functions

    //A simple UDF: prepend a prefix to the name column
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object Test2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val framejson = spark.read.json("src/main/resources/1.json")
        framejson.createOrReplaceTempView("user")
        //register the function under the name "udfname" so it can be called from SQL
        spark.udf.register("udfname", (name: String) => {
          "Name:" + name
        })
        spark.sql("select udfname(name), age from user").show()
        spark.stop()
      }
    }
    
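    The same logic can also be expressed in the DataFrame DSL instead of SQL, via org.apache.spark.sql.functions.udf. A minimal sketch, assuming the spark session and framejson DataFrame from the example above:

    //DSL-style equivalent of the SQL query above (sketch; reuses spark and framejson)
    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    val prefixName = udf((name: String) => "Name:" + name)
    framejson.select(prefixName($"name").as("name"), $"age").show()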

    3. UDAF

    Weakly-typed implementation

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}

    object Spark_Udaf1 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val frame = spark.read.json("src/main/resources/1.json")
        frame.createOrReplaceTempView("user")
        spark.udf.register("avgAge", new MyAvg)
        val sql: String = "select avgAge(age) from user"
        spark.sql(sql).show()
        //Thread.sleep(Int.MaxValue)  // uncomment to keep the Spark UI alive for inspection
        spark.stop()
      }
    }

    //Sum the ages and count the rows, then divide to get the average
    class MyAvg extends UserDefinedAggregateFunction {
      //type of the input data
      override def inputSchema: StructType = {
        StructType(Array(StructField("age", LongType)))
      }

      //type of the aggregation buffer
      override def bufferSchema: StructType = {
        //first field: sum of the ages; second field: number of rows
        StructType(Array(StructField("total", LongType), StructField("count", LongType)))
      }

      //type of the output data
      override def dataType: DataType = LongType

      //whether the function is deterministic (the same input always produces the same output)
      override def deterministic: Boolean = true

      //initial values of the buffer
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
      }

      //update the buffer with one input row
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getLong(0) + input.getLong(0)
        buffer.update(1, buffer.getLong(1) + 1)
        //the two assignment forms are equivalent: buffer(i) = x calls update(i, x) under the hood
        println(buffer.getLong(1))
      }

      //merge two buffers (partial aggregates from different partitions)
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        //buffer1 holds the accumulated result; buffer2 is merged into it
        buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
        buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
      }

      //compute the final result from the buffer
      override def evaluate(buffer: Row): Any = {
        println(buffer.getLong(0))
        println(buffer.getLong(1))
        //note: Long / Long is integer division, so the average is truncated
        buffer.getLong(0) / buffer.getLong(1)
      }
    }
    
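    Note: the UserDefinedAggregateFunction API used above is deprecated since Spark 3.0; the strongly-typed Aggregator registered through functions.udaf, shown next, is the recommended replacement.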

    Strongly-typed implementation

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
    import org.apache.spark.sql.expressions.Aggregator

    object Spark_Udaf2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val frame = spark.read.json("src/main/resources/1.json")
        frame.createOrReplaceTempView("user")
        //functions.udaf (Spark 3.0+) wraps a strongly-typed Aggregator so it can be called from SQL
        spark.udf.register("myavg", functions.udaf(new MyAvg1))
        spark.sql("select myavg(age) from user").show()
        spark.stop()
      }
    }

    //aggregation buffer: running sum of the ages and count of the rows
    case class Buff(var total: Long, var count: Long)

    class MyAvg1 extends Aggregator[Long, Buff, Long] {
      //initial (zero) value of the buffer
      override def zero: Buff = {
        Buff(0L, 0L)
      }
      //update the buffer with one input value
      override def reduce(buff: Buff, input: Long): Buff = {
        buff.total = buff.total + input
        buff.count = buff.count + 1
        buff
      }
      //merge two buffers (partial aggregates)
      override def merge(buff1: Buff, buff2: Buff): Buff = {
        buff1.total = buff1.total + buff2.total
        buff1.count = buff1.count + buff2.count
        buff1
      }
      //compute the final result
      override def finish(reduction: Buff): Long = {
        reduction.total / reduction.count
      }
      //the two members below provide the encoders (serialization) for the buffer and the output
      override def bufferEncoder: Encoder[Buff] = Encoders.product

      override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }
    
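    Besides registering it for SQL, the same Aggregator can also be applied directly to a typed Dataset through its toColumn method. A minimal sketch, assuming the spark session from above and the same 1.json file with a numeric age field:

    import org.apache.spark.sql.Dataset
    import spark.implicits._

    //build a Dataset[Long] of ages and aggregate it with the typed Aggregator
    val ages: Dataset[Long] = spark.read.json("src/main/resources/1.json")
      .select($"age").as[Long]
    val avgAgeCol = new MyAvg1().toColumn.name("avg_age")
    ages.select(avgAgeCol).show()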
