Spark SQL (2): DataFrame and DataSet

Author: Sx_Ren | Published: 2018-03-18 09:31
  • DataSet:
    A Dataset is a distributed collection of data (available since Spark 1.6).
  • DataFrame:
    A DataFrame is a Dataset organized into named columns: a distributed dataset made up of columns, each with a name, a type, and values. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.

The relationship between DataFrame and DataSet is: DataFrame = Dataset[Row]. In the Scala API, DataFrame is a type alias for Dataset[Row].
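
A minimal sketch of what the alias means in practice. The session settings (local master, app name) and the in-memory rows are assumptions made for illustration, not part of the original article:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Assumption: a local session for experimentation; in spark-shell, getOrCreate() reuses the existing one
val spark = SparkSession.builder()
  .appName("DataFrameAliasSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical in-memory data
val df: DataFrame = Seq(("Andy", 32L), ("Justin", 19L)).toDF("name", "age")

// This assignment compiles because DataFrame and Dataset[Row] are the same type
val rows: Dataset[Row] = df

// Each element is a generic Row; fields are looked up by name (or position) at runtime
val names = rows.collect().map(row => row.getAs[String]("name"))
println(names.mkString(", "))
```

Because a Row is untyped, a misspelled field name in getAs only fails at runtime, which is the main practical difference from a typed Dataset such as Dataset[Person].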

The DataFrame concept was not invented by Spark SQL; it existed earlier in R and in Python's pandas library.

How do you get a DataFrame? In Spark 1.x, SQLContext is the entry point:

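 import org.apache.spark.sql.SQLContext

 val sqlContext = new SQLContext(sc) // sc is an existing SparkContext
 val people = sqlContext.read.format("json").load(path) // people is a DataFrame; path is the location of a JSON file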

Since Spark 2.0, SparkSession replaces SQLContext as the entry point:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

val df = spark.read.json("examples/src/main/resources/people.json") // df is a DataFrame
df.show()

Common DataFrame operations include:

df.printSchema() // Print the schema in a tree format
df.select("name").show() // Select only the "name" column
df.select($"name", $"age" + 1).show() // Select everybody, but increment the age by 1
df.filter($"age" > 21).show() // Select people older than 21
df.groupBy("age").count().show() // Count people by age

You can also register a DataFrame as a temporary view, which lets you query the underlying file with SQL statements:

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
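
As a sketch of how the two styles relate, the same question can be asked through the DataFrame API and through SQL on a temporary view. The in-memory rows below are hypothetical stand-ins for people.json, and the local session settings are assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation
val spark = SparkSession.builder()
  .appName("TempViewSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical data standing in for people.json
val df = Seq(("Michael", 29L), ("Andy", 30L), ("Justin", 19L)).toDF("name", "age")

// DataFrame API: names of people older than 21
val viaApi = df.filter($"age" > 21).select("name").as[String].collect().sorted

// SQL on a temporary view: the same query expressed as a statement
df.createOrReplaceTempView("people")
val viaSql = spark.sql("SELECT name FROM people WHERE age > 21").as[String].collect().sorted

println(viaApi.sameElements(viaSql))
```

Both forms go through the same query planner, so the choice between them is mostly a matter of readability.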

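Temporary views like the one above are session-scoped: once the session that created them ends, the view no longer exists. If you want a view that is shared across sessions and stays alive until the Spark application terminates, create a global temporary view. Global temporary views are tied to the system-preserved database global_temp, so they must be referenced by the qualified name, e.g. global_temp.people: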

//Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
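
The scoping difference can be checked directly. In this sketch the view names people_local and people_shared are hypothetical, the local session settings are assumptions, and createOrReplaceGlobalTempView is used instead of createGlobalTempView so the snippet can be rerun without an "already exists" error:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation
val spark = SparkSession.builder()
  .appName("GlobalTempViewSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical one-row DataFrame
val df = Seq(("Andy", 32L)).toDF("name", "age")

df.createOrReplaceTempView("people_local")         // visible only in this session
df.createOrReplaceGlobalTempView("people_shared")  // visible to every session of this application

val otherSession = spark.newSession()

// The session-scoped view is not registered in the new session
val visibleInOtherSession = otherSession.catalog.tableExists("people_local")

// The global view is reachable through the global_temp database
val sharedRowCount = otherSession.sql("SELECT * FROM global_temp.people_shared").count()

println(s"local view visible elsewhere: $visibleInOtherSession, shared rows: $sharedRowCount")
```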

How do you get a DataSet? As follows:

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
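
A short sketch of what the typed Dataset buys you once it exists: transformations take ordinary Scala functions over Person, so field access is checked by the compiler. The in-memory rows and local session settings are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Long)

// Assumption: a local session for experimentation
val spark = SparkSession.builder()
  .appName("TypedDatasetSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical data; as[Person] maps columns to case class fields by name
val typedPeople = Seq(("Andy", 32L), ("Justin", 19L)).toDF("name", "age").as[Person]

// p.age and p.name are plain field accesses, verified at compile time
val adultNames = typedPeople.filter(p => p.age >= 21).map(p => p.name).collect()
println(adultNames.mkString(", "))
```

A typo such as p.agee would be rejected by the compiler here, whereas the equivalent column name in a DataFrame expression would only fail when the query is analyzed.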

There are two ways to interoperate between DataFrames and RDDs:

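  1. Inferring the Schema Using Reflection: the schema is inferred by reflection from a case class. Prerequisite: you must know the fields and their types in advance.
  2. Programmatically Specifying the Schema: the schema is built in code and applied to an RDD of Row objects. The code is more verbose; use this when the first approach cannot meet your needs (for example, the columns are not known in advance).
    Which to choose: prefer the first approach whenever it applies.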
  • Inferring the Schema Using Reflection
case class Person(name: String, age: Long)

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
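
Once the view is registered, it can be queried with SQL and the resulting rows read back by field name. The following is a self-contained sketch that replaces people.txt with a hypothetical in-memory RDD of lines; the local session settings are likewise assumptions:

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Long)

// Assumption: a local session for experimentation
val spark = SparkSession.builder()
  .appName("ReflectionSchemaSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical lines in the same "name, age" layout as people.txt
val lines = spark.sparkContext.parallelize(Seq("Michael, 29", "Andy, 30", "Justin, 19"))

// The column names and types come from the Person case class via reflection
val reflectedDF = lines
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toLong))
  .toDF()
reflectedDF.createOrReplaceTempView("people")

// Query with SQL, then read a column of each result Row by name
val teenagers = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
val teenagerNames = teenagers.map(row => row.getAs[String]("name")).collect()
println(teenagerNames.mkString(", "))
```
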
  • Programmatically Specifying the Schema
    This approach is more verbose and takes three steps:
    1. Create an RDD of Rows from the original RDD;
    2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1
    3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
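
The case this approach is meant for is a schema that is only known at runtime. As a sketch under assumptions, suppose the column names arrive in a header string; the header, the data lines, and the local session settings below are all hypothetical:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumption: a local session for experimentation
val spark = SparkSession.builder()
  .appName("RuntimeSchemaSketch")
  .master("local[*]")
  .getOrCreate()

// Hypothetical input whose column names are not known at compile time
val header = "id,name,city"
val dataLines = Seq("1,Andy,Paris", "2,Justin,Oslo")

// Step 2 of the recipe: build the StructType from the runtime header
val runtimeSchema = StructType(
  header.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)))

// Step 1: turn each line into a Row with one value per column
val runtimeRows = spark.sparkContext
  .parallelize(dataLines)
  .map(_.split(","))
  .map(fields => Row.fromSeq(fields.toSeq))

// Step 3: apply the schema to the RDD of Rows
val runtimeDF = spark.createDataFrame(runtimeRows, runtimeSchema)
runtimeDF.printSchema()
```

No case class could be written for this data ahead of time, which is why the reflection-based approach does not apply here.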

Besides the example above from the official documentation, here is one more:

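// Assumes each line of info.txt has the form: id,name,age
val rdd = spark.sparkContext.textFile("E:/ATempFile/info.txt")
// Split each line and convert the id and age fields to Int to match the schema below
val infoRDD = rdd.map(_.split(","))
  .map(fields => Row(fields(0).trim.toInt, fields(1), fields(2).trim.toInt))
val structType = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))

val infoDF = spark.createDataFrame(infoRDD, structType)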

Clearly, the first approach yields more concise and convenient code.
