转载请注明出处,谢谢合作~
该篇中的示例暂时只有 Scala 版本~
上手 Spark SQL
- 入口:SparkSession(Starting Point: SparkSession)
- 创建 DataFrame(Creating DataFrames)
- 类型无关的 DataFrame 算子(Untyped Dataset Operations (aka DataFrame Operations))
- 编程中使用 SQL 查询(Running SQL Queries Programmatically)
- 全局临时视图(Global Temporary View)
- 创建 Dataset(Creating Datasets)
- 与 RDD 交互(Interoperating with RDDs)
- 通过反射推断表结构(Inferring the Schema Using Reflection)
- 编程指定表结构(Programmatically Specifying the Schema)
- 标量函数(Scalar Functions)
- 聚合函数(Aggregate Functions)
入口:SparkSession
Spark 应用程序的编程入口是 SparkSession
类,可以通过 SparkSession.builder()
创建一个基础的 SparkSession
:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
Spark 2.0 中的 SparkSession
内置了对 Hive 的支持,包括使用 HiveSQL 编写查询语句,使用 Hive UDF,以及从 Hive 表中读取数据。这些功能需要首先安装好 Hive。
创建 DataFrame
应用程序可以使用 SparkSession
通过一个现有的 RDD(existing RDD
),通过 Hive 表,或者通过 Spark 数据源(Spark data sources)创建 DataFrame。
下面的示例通过一个 JSON 文件创建 DataFrame:
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
类型无关的 DataFrame 算子
DataFrame 针对操作结构化的数据提供了特定的算子(Scala, Java, Python 和 R)。
上文提到,对于 Spark 2.0 中的 Scala 和 Java API, DataFrame 只是 Row
类型的 Dataset。相较于强类型相关的 Dataset,这些算子是类型无关的。
这里我们展示一些使用 Dataset 进行 结构化数据处理的基础样例:
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
此类操作 Dataset 的算子的完整列表详见 API Documentation。
除了简单的列引用和表达式,Dataset 还有一个强大的函数库,包括操作字符串,日期计算,常见的数据计算等等。完整的函数列表参见 DataFrame Function Reference。
编程中使用 SQL 查询
SparkSession
的 sql
方法可以让应用程序通过编程使用 SQL 查询,返回值一个 DataFrame
。
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
全局临时视图
Spark SQL 中的临时视图在当前 SparkSession
存在范围内有效,一旦 SparkSession
结束,临时视图就消失了。如果需要在不同的应用程序之间共享临时视图,即使 SparkSession
结束依旧存在,可以使用全局临时视图。全局临时视图与一个系统保留数据库 global_temp
绑定,必须使用全限定名称来使用,比如 SELECT * FROM global_temp.view1
。
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
创建 Dataset
Dataset 跟 RDD 类似,但是不像 RDD 那样使用 Java 或者 Kryo 序列化器,在计算以及网络传输过程中 Dataset 使用一个特定的 Encoder 来序列化对象。尽管 encoder 和标准的序列化器都是用来将一个对象转换为字节,encoder 采用的是动态代码生成的,并且采用了一种特殊的格式,Spark 可以对这种格式进行像过滤、排序和哈希运行而不用将其反序列化为对象。
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
与 RDD 交互
Spark SQL 支持两种不同的方式把一个现有的 RDD 转换为 Dataset。第一种方式是通过反射推断一个定义了类型的 RDD 的表结构。这种基于反射的方式可以使代码简洁,在已知表结构的场景下工作良好。
第二种方式是通过编程的方式构建一个表结构对象,并把它赋予一个现有的 RDD。尽管这种方式相对复杂,但是能够在无法得知运行时类型的情况下创建 Dataset。
通过反射推断表结构
Spark SQL 的 Scala 接口自动支持将一个样本类类型的 RDD 转换为一个 DataFrame。样本类定义了表结构,通过反射获取类中的字段名称并将其应用为列名。样本类可以嵌套,还可以包含像 Seq
和 Array
这样的复杂类型。该 RDD 会被隐式转换为一个 DataFrame,之后可以注册成一张表,该表可以通过 SQL 进行查询。
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」
编程指定表结构
如果样本类无法被实现创建(例如,一行数据以字符串的格式编码,或者是需要被解析的文本类型的数据集,以及对于不同用户来说需要抽取的字段不同),可以分三步编程创建一个 DataFrame
。
- 通过一个 RDD 创建一个
Row
类型的 RDD; - 创建一个
StructType
类型的表结构对象,需要与第 1 步Row
中的数据相对应; - 通过
SparkSession
提供的createDataFrame
方法将第 1 步生成的 RDD 和第 2 步生成的表结构结合起来。
例如:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
标量函数
标量函数通过一行数据只返回一个单值,而不是像聚合函数那样接收多行数据返回一个单值。Spark SQL 支持许多内置标量函数(Built-in Scalar Functions),同事也支持自定义标量函数(User Defined Scalar Functions)。
聚合函数
聚合函数接收多行数据返回一个单值。内置的聚合函数(Built-in Aggregation Functions)提供了常见的聚合函数,比如 count()
, countDistinct()
, avg()
, max()
, min()
等等。用户不用受预定义聚合函数的限制,可以定义自己的聚合函数,详情参见 User Defined Aggregate Functions。
网友评论