Spark SQL Overview

Author: 盗梦者_56f2 | Published 2018-08-06 17:15

    Introduction

    Spark SQL is the Spark module for processing structured data. It provides interfaces that give Spark more information about the structure of the data and of the computations being performed. A Dataset is a distributed collection of data; the Dataset API is available in Scala and Java, but not in Python. A DataFrame is a Dataset organized into named columns, conceptually equivalent to a table in a relational database or a data frame in R/Python.

    Creating a SparkSession

    The entry point to all functionality in Spark SQL is the SparkSession class.

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder()
      .appName("name")
      .config("key", "value")
      .getOrCreate()
    // With Hive support
    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", "path")
      .enableHiveSupport()
      .getOrCreate()
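    // The implicit conversions used in the later examples (toDF, toDS, the $"col" syntax)
    // come from the created session itself:
    import spark.implicits._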
    

    Creating DataFrames

    // Read from JSON and Parquet sources (load() uses Parquet as the default format)
    val df = spark.read.json("people.json")
    val usersDF = spark.read.load("users.parquet")
    val peopleDF = spark.read.format("json").load("people.json")
    val parquetFileDF = spark.read.parquet("people.parquet")
    // Read from a JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()
    // Write the data back to a JDBC table
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()
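
    Once a DataFrame exists, it can be queried with the untyped DataFrame API or with plain SQL after registering it as a temporary view. A minimal sketch, assuming the people.json file above has name and age columns:

    import spark.implicits._

    df.printSchema()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy("age").count().show()

    // Register the DataFrame as a temporary view and query it with SQL
    df.createOrReplaceTempView("people")
    spark.sql("SELECT name, age FROM people WHERE age > 21").show()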
    

    Creating Datasets

    import spark.implicits._
    case class Person(name: String, age: Long)
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    val peopleDS = spark.read.json(path).as[Person]  // convert a DataFrame to a Dataset
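    // Typed operations on the Dataset (a minimal sketch using the case class above)
    caseClassDS.filter(_.age > 21).show()
    caseClassDS.map(_.name).show()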
    

    Converting RDDs to DataFrames

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import spark.implicits._

    // Approach 1: infer the schema by reflection on the Person case class
    val peopleDF = spark.sparkContext
      .textFile("people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Approach 2: specify the schema programmatically
    val rowRDD = spark.sparkContext
      .textFile("people.txt")
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))
    val schema = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("age", StringType, nullable = true)
    ))
    val peopleDF = spark.createDataFrame(rowRDD, schema)
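    // Quick check of the result (assumes people.txt contains lines like "Michael, 29")
    peopleDF.printSchema()
    peopleDF.show()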
    

    Aggregations

    // Approach 1: extend the UserDefinedAggregateFunction abstract class (untyped UDAF)
    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    
    object MyAverage extends UserDefinedAggregateFunction {
      // Data types of input arguments of this aggregate function
      def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
      // Data types of values in the aggregation buffer
      def bufferSchema: StructType = {
        StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
      }
      // The data type of the returned value
      def dataType: DataType = DoubleType
      // Whether this function always returns the same output on the identical input
      def deterministic: Boolean = true
      // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
      // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
      // the opportunity to update its values. Note that arrays and maps inside the buffer are still
      // immutable.
      def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
      }
      // Updates the given aggregation buffer `buffer` with new input data from `input`
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getLong(0) + input.getLong(0)
          buffer(1) = buffer.getLong(1) + 1
        }
      }
      // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
      // Calculates the final result
      def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
    }
    spark.udf.register("myAverage", MyAverage)
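    // Example use of the registered UDAF (hypothetical employees.json with name and salary columns)
    val employeesDF = spark.read.json("employees.json")
    employeesDF.createOrReplaceTempView("employees")
    spark.sql("SELECT myAverage(salary) AS average_salary FROM employees").show()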
    // Approach 2: extend the Aggregator abstract class (type-safe UDAF)
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.SparkSession
    
    case class Employee(name: String, salary: Long)
    case class Average(var sum: Long, var count: Long)
    
    object MyAverage extends Aggregator[Employee, Average, Double] {
      // A zero value for this aggregation. Should satisfy the property that any b + zero = b
      def zero: Average = Average(0L, 0L)
      // Combine two values to produce a new value. For performance, the function may modify `buffer`
      // and return it instead of constructing a new object
      def reduce(buffer: Average, employee: Employee): Average = {
        buffer.sum += employee.salary
        buffer.count += 1
        buffer
      }
      // Merge two intermediate values
      def merge(b1: Average, b2: Average): Average = {
        b1.sum += b2.sum
        b1.count += b2.count
        b1
      }
      // Transform the output of the reduction
      def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
      // Specifies the Encoder for the intermediate value type
      def bufferEncoder: Encoder[Average] = Encoders.product
      // Specifies the Encoder for the final output value type
      def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }
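    // Example use of the type-safe aggregator (hypothetical employees.json as above)
    import spark.implicits._
    val ds = spark.read.json("employees.json").as[Employee]
    val averageSalary = MyAverage.toColumn.name("average_salary")
    ds.select(averageSalary).show()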
    
