Introduction
Spark SQL is a Spark module for structured data processing. The interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed. A Dataset is a distributed collection of data; the Dataset API is available in Scala and Java, but not in Python. A DataFrame is a Dataset organized into named columns, conceptually equivalent to a table in a relational database or a data frame in R/Python.
Creating a SparkSession
The entry point to all functionality in Spark SQL is the SparkSession class.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("name")
  .config("key", "value")
  .getOrCreate()

// With Hive support
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", "path")
  .enableHiveSupport()
  .getOrCreate()
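Once a session exists, a typical first step is to load data and query it with SQL. Below is a minimal sketch assuming a people.json file with name and age fields; the file name, view name, and query are illustrative.
import spark.implicits._ // enables implicit conversions such as toDF()/toDS() and the $"col" syntax

// Register a JSON file as a temporary view and query it with SQL
spark.read.json("people.json").createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 20").show()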
Creating a DataFrame
// JSON
val df = spark.read.json("people.json")
// Generic load (Parquet is the default format)
val usersDF = spark.read.load("users.parquet")
// Generic load with an explicit format
val peopleDF = spark.read.format("json").load("people.json")
// Parquet
val parquetFileDF = spark.read.parquet("people.parquet")
// JDBC
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

// Write data back via JDBC
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()
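The DataFrames above support untyped transformations. A hedged sketch of common operations and a generic save follows, assuming the name/age columns of the standard people.json example, that import spark.implicits._ is in scope for the $ syntax, and that the output path is illustrative.
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show() // select with a column expression
df.filter($"age" > 21).show()
df.groupBy("age").count().show()

// Generic save: explicit format plus a save mode (output path is illustrative)
df.write.format("parquet").mode("overwrite").save("people_out.parquet")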
Creating a Dataset
case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("Andy", 32)).toDS()
val peopleDS = spark.read.json(path).as[Person] // convert a DataFrame to a Dataset
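A short sketch of typed Dataset operations, assuming import spark.implicits._ is in scope (it supplies encoders for common Scala types); the values are illustrative.
// Encoders for common types come from spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Array(2, 3, 4)

// Typed transformations keep the Person type
caseClassDS.filter(p => p.age > 30).show()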
Converting an RDD to a DataFrame
import spark.implicits._
// Approach 1: infer the schema via reflection using the Person case class
val peopleDF = spark.sparkContext
  .textFile("people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// Approach 2: specify the schema programmatically
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val rowRDD = spark.sparkContext
  .textFile("people.txt")
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))
val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true)
))
val peopleDF = spark.createDataFrame(rowRDD, schema)
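Either way, the resulting DataFrame can be registered as a view and queried. A minimal follow-up sketch (view name and query are illustrative):
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name FROM people")
// Row fields can be accessed by ordinal or by field name
results.map(attributes => "Name: " + attributes(0)).show()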
Aggregation
// Approach 1: extend the UserDefinedAggregateFunction abstract class (untyped UDAF)
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
spark.udf.register("myAverage", MyAverage)
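Once registered, the UDAF can be invoked from SQL. A hedged usage sketch, assuming an employees.json file with name and salary fields (as in the standard Spark example data):
val employeesDF = spark.read.json("employees.json")
employeesDF.createOrReplaceTempView("employees")
spark.sql("SELECT myAverage(salary) AS average_salary FROM employees").show()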
// Approach 2: extend the Aggregator abstract class (type-safe aggregation over Datasets)
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
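A hedged usage sketch for the typed Aggregator: it is converted to a TypedColumn and applied to a Dataset[Employee]; the employees.json file is again an assumption borrowed from the standard Spark example data.
val ds = spark.read.json("employees.json").as[Employee]
// Convert the Aggregator to a TypedColumn and give the result column a name
val averageSalary = MyAverage.toColumn.name("average_salary")
ds.select(averageSalary).show()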