Spark SQL (Part 2): DataFrame and DataSet

Author: Sx_Ren | Published 2018-03-18 09:31
    • DataSet:
      A Dataset is a distributed collection of data (available since Spark 1.6).
    • DataFrame:
      A DataFrame is a Dataset organized into named columns: a distributed collection of data arranged by column, where each column has a name, a type, and values. It is conceptually equivalent to a table in a relational database or to a data frame in R/Python.

    The relationship between DataFrame and Dataset is: DataFrame = Dataset[Row].
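    In the Scala API this relationship is literal: the package object org.apache.spark.sql defines the alias type DataFrame = Dataset[Row]. A minimal sketch, assuming a SparkSession named spark (created as shown further below) and the sample people.json file from the Spark distribution:

    import org.apache.spark.sql.{DataFrame, Dataset, Row}
    import spark.implicits._

    case class Person(name: String, age: Long)

    val df: DataFrame = spark.read.json("examples/src/main/resources/people.json")
    val rows: Dataset[Row] = df                  // no conversion needed: a DataFrame already is a Dataset[Row]
    val people: Dataset[Person] = df.as[Person]  // typed view over the same data, mapped by column name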

    The DataFrame concept did not originate with Spark SQL; it existed earlier in R and in pandas.

    How do you get a DataFrame? In Spark 1.x, SQLContext was the entry point:

     import org.apache.spark.sql.SQLContext

     val sqlContext = new SQLContext(sc) // sc is an existing SparkContext
     val people = sqlContext.read.format("json").load(path) // people is a DataFrame
    

    Starting with Spark 2.0, SparkSession replaces SQLContext as the entry point:

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._
    
    val df = spark.read.json("examples/src/main/resources/people.json") // df is a DataFrame
    df.show()
    

    Common DataFrame operations include:

    df.printSchema() // Print the schema in a tree format
    df.select("name").show() // Select only the "name" column
    df.select($"name", $"age" + 1).show() // Select everybody, but increment the age by 1
    df.filter($"age" > 21).show() // Select people older than 21
    df.groupBy("age").count().show() // Count people by age
    

    You can also register a DataFrame as a temporary view, which lets you operate on the file with SQL statements:

    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    

    Temporary views like the one above are session-scoped: they disappear when the session that created them ends. If you want a view that is shared across all sessions and stays alive until the Spark application terminates, create a global temporary view:

    //Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")
    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
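    A global temporary view lives in the system-preserved database global_temp, so the qualified name is required even in the session that created it:

    // The qualified name global_temp.people works in the current session as well
    spark.sql("SELECT * FROM global_temp.people").show()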
    

    How do you get a Dataset? For example:

    // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
    // you can use custom classes that implement the Product interface
    case class Person(name: String, age: Long)
    
    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    
    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    
    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
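    Because a Dataset is typed, transformations can be written as plain Scala functions over Person and are checked at compile time, unlike the column-name-based DataFrame operations shown earlier. A short sketch using the caseClassDS defined above:

    // Typed operations: the compiler knows every element is a Person
    val adults = caseClassDS.filter(_.age > 21)   // Dataset[Person]
    val names  = caseClassDS.map(_.name)          // Dataset[String]
    adults.show()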
    

    There are two ways for DataFrames and RDDs to interoperate:

    1. Inferring the Schema Using Reflection: use a case class; the prerequisite is that you already know the fields and their types.
    2. Programmatically Specifying the Schema: build Row objects and a schema by hand; the code is more verbose, and it is the fallback when the first approach cannot meet your needs (the columns are not known in advance).
      When choosing between them, prefer the first.
    • Inferring the Schema Using Reflection
    case class Person(name: String, age: Long)
    
    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._
    
    // Create an RDD of Person objects from a text file, convert it to a Dataframe
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
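    Once the view is registered, the data can be queried with SQL and the result rows mapped back to Scala values, as the official example continues:

    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    // The columns of a row in the result can be accessed by field index
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // or by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()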
    
    • Programmatically Specifying the Schema
      This approach requires more code and follows three steps:
      1. Create an RDD of Rows from the original RDD;
      2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1
      3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    // Create an RDD
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
    // The schema is encoded in a string
    val schemaString = "name age"
    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // Convert records of the RDD (people) to Rows
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))
    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)
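    The DataFrame created this way is used like any other, for example by registering it as a temporary view and querying it:

    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")
    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")
    // The results of SQL queries are DataFrames; columns can be accessed by index
    results.map(attributes => "Name: " + attributes(0)).show()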
    

    Besides the official example above, here is one more:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Each line of info.txt has the form "id,name,age"
    val rdd = spark.sparkContext.textFile("E:/ATempFile/info.txt")
    val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val infoDF = spark.createDataFrame(infoRDD, structType)
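    A quick way to verify the result is to print the schema and run a query through a temporary view; a small follow-up, assuming info.txt contains id,name,age lines as the parsing above implies:

    infoDF.printSchema()
    infoDF.show()
    infoDF.createOrReplaceTempView("info")
    spark.sql("SELECT id, name FROM info WHERE age > 30").show()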
    

    Clearly, the first (reflection-based) approach gives more concise and convenient code.
