慕课网 Spark SQL Log Analysis - 5. DataFrame & Dataset


Author: 9c0ddf06559c | Published 2018-07-12 22:32

5. DataFrame & Dataset

1. Background: where DataFrame came from

DataFrame was not invented by Spark SQL; it already existed much earlier in Python, R, and Pandas.

From the very beginning, one of Spark's goals was to give the big data ecosystem a simple, easy-to-use API built on general-purpose languages.

1. To program against Spark RDDs you first have to learn Java, Scala, or Python, which is a relatively high barrier to entry.
2. DataFrames in R and similar languages only support single-machine processing; as Spark grew, a much broader audience needed to use Spark for distributed processing.

2. DataFrame Overview

A Dataset is a distributed collection of data (a distributed collection of records).
A DataFrame is a Dataset organized into named columns (an RDD with a schema): a distributed dataset organized by columns (column name, column type, column value), with each column given its own name.

It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
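A minimal sketch of the "named columns" idea (the column names id and name and the sample rows are made up for illustration; spark is a SparkSession as in the later examples):

    import spark.implicits._

    // Each tuple becomes a row; the column names plus the inferred types make up the schema.
    val df = Seq((1, "Michael"), (2, "Andy")).toDF("id", "name")

    df.printSchema()
    // root
    //  |-- id: integer (nullable = false)
    //  |-- name: string (nullable = true)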


3. DataFrame vs. RDD

RDD: a distributed collection that can be processed in parallel.
java/scala ==> JVM
python ==> Python runtime

DataFrame: also a distributed dataset, but it looks more like a table in a traditional database. Besides the data itself, it also knows the column names, column values, and column types, and it can handle some complex data structures.
java/scala/python ==> logical plan

From an ease-of-use point of view, a DataFrame has a lower learning curve; since R and Python both already have DataFrames, development feels familiar and convenient.
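To make the contrast concrete, here is a small sketch of the same query written once against an RDD and once against a DataFrame (assuming a SparkSession named spark as in the examples below; the sample data and column names are made up for illustration). The RDD version hands Spark opaque functions, while the DataFrame version produces a logical plan that Spark can inspect and optimize regardless of the source language:

    import spark.implicits._

    // Same question asked two ways: "names of people older than 19".

    // RDD version: Spark only sees opaque Scala functions and tuples.
    val rdd = spark.sparkContext.parallelize(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)))
    rdd.filter { case (_, age) => age > 19 }.map(_._1).collect()

    // DataFrame version: Spark knows the column names and types,
    // builds a logical plan, and can optimize it before execution.
    val df = rdd.toDF("name", "age")
    df.filter($"age" > 19).select("name").explain(true) // prints the parsed and optimized plans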


4. Basic DataFrame API Operations


A look at the source of the load method:

    /**
     * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
     * a local or distributed file system).
     *
     * @since 1.4.0
     */
    // the return value is a DataFrame
    def load(path: String): DataFrame = {
      option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
    }
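As a side note, DataFrameReader also has shorthand methods for the common formats; the two calls below are equivalent ways of loading a JSON file (the path here is just a placeholder):

    val df1 = spark.read.format("json").load("file:///path/to/people.json")
    val df2 = spark.read.json("file:///path/to/people.json") // shorthand for format("json").load(...)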
    
    package com.gwf.spark

    import org.apache.spark.sql.SparkSession

    object DataFrameApp {

      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()
          .appName("DataFrameApp").master("local[2]").getOrCreate()

        // load the JSON file into a DataFrame
        val peopleDF = spark.read.format("json").load("file:///Users/gaowenfeng/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")

        // print the schema of the DataFrame
        peopleDF.printSchema()
        // root
        //  |-- age: long (nullable = true)
        //  |-- name: string (nullable = true)

        // show the first 20 records of the dataset
        peopleDF.show()
        // +----+-------+
        // | age|   name|
        // +----+-------+
        // |null|Michael|
        // |  30|   Andy|
        // |  19| Justin|
        // +----+-------+

        // select all values of one column: select name from table
        peopleDF.select("name").show()
        // +-------+
        // |   name|
        // +-------+
        // |Michael|
        // |   Andy|
        // | Justin|
        // +-------+

        // select several columns and compute on one of them: select name, age+10 as age2 from table
        peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()
        // +-------+----+
        // |   name|age2|
        // +-------+----+
        // |Michael|null|
        // |   Andy|  40|
        // | Justin|  29|
        // +-------+----+

        // filter rows on a column value: select * from table where age > 19
        peopleDF.filter(peopleDF.col("age") > 19).show()
        // +---+----+
        // |age|name|
        // +---+----+
        // | 30|Andy|
        // +---+----+

        // group by a column and aggregate: select age, count(1) from table group by age
        peopleDF.groupBy("age").count().show()
        // +----+-----+
        // | age|count|
        // +----+-----+
        // |  19|    1|
        // |null|    1|
        // |  30|    1|
        // +----+-----+

        spark.stop()
      }

    }
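A few more everyday operations in the same spirit, sketched against the same peopleDF (standard DataFrame methods, added here only as a supplement to the course example):

    // select * from table order by age desc
    peopleDF.sort(peopleDF.col("age").desc).show()

    // take only the first 2 rows as a local Array[Row]
    peopleDF.take(2)

    // total number of rows
    peopleDF.count()

    // rename a column: select name as username from table
    peopleDF.select(peopleDF.col("name").as("username")).show()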
    
    

5. Interoperating Between DataFrames and RDDs


1. Via reflection

Prerequisite: you need to know your fields and their types in advance.
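The contents of infos.txt are not shown in the original post; for the code below it is assumed to be comma-separated lines of id, name and age, something like:

    1,zhangsan,20
    2,lisi,30
    3,wangwu,40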

    package com.gwf.spark

    import org.apache.spark.sql.SparkSession

    /**
     * Interoperation between DataFrames and RDDs
     */
    object DataFrameRDDAPP {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("DataFrameRDDAPP").master("local[2]").getOrCreate()

        val rdd = spark.sparkContext.textFile("file:///Users/gaowenfeng/project/idea/MySparkSqlProject/src/main/resources/infos.txt")

        // the implicit conversions are needed for toDF()
        import spark.implicits._
        val infoDf = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

        infoDf.printSchema()

        infoDf.filter(infoDf.col("age") > 30).show()

        // Creates a local temporary view using the given name. The lifetime of this
        // temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
        infoDf.createOrReplaceTempView("infos")

        spark.sql("select * from infos where age > 30").show()
      }

      case class Info(id: Int, name: String, age: Int)

    }
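The interoperation also works in the other direction; as a sketch building on the example above (still inside main, after infoDf has been created), a DataFrame can be turned back into an RDD of Rows, or into a strongly typed Dataset of the same case class:

    // DataFrame -> RDD[Row]
    val rowRDD = infoDf.rdd
    rowRDD.map(row => row.getAs[String]("name")).collect()

    // DataFrame -> Dataset[Info] (typed); relies on the spark.implicits._ import above
    val infoDS = infoDf.as[Info]
    infoDS.map(_.name).collect()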
    
    

2. Programmatically specifying the schema

Use this when the first approach cannot meet your needs (i.e. the columns are not known in advance).

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val rdd = spark.sparkContext.textFile("file:///Users/gaowenfeng/project/idea/MySparkSqlProject/src/main/resources/infos.txt")

    // 1. Create an RDD of Rows from the original RDD
    val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

    // 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    // 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession
    val infoDF = spark.createDataFrame(infoRDD, structType)

    infoDF.printSchema()
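From here on, the programmatically built DataFrame is used exactly like the one from the reflection approach, for example:

    infoDF.filter(infoDF.col("age") > 30).show()

    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()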
    

3. Which one to choose: prefer the first (reflection-based) approach.

6. Dataset Overview and Usage

    A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

    A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

Where errors are caught (the typos below are deliberate):

    SQL:
    seletf name from table      -- the query is just a string: it compiles, and the typo only fails at runtime
    DataFrame:
    df.seletc("name")           // method-name typo: caught at compile time
    df.select("naem")           // column-name typo: still compiles, fails only at runtime
    Dataset:
    ds.map(_.naem)              // with typed field access, even the field-name typo is caught at compile time

DataFrame = Dataset[Row]
Dataset is strongly typed: each record is a typed case class.
DataFrame is weakly typed: each record is a generic Row.
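A minimal sketch of the typed side, reusing the people.json file from the earlier example (the Person case class and its field names are assumptions chosen to match that file; the path is a placeholder):

    import org.apache.spark.sql.SparkSession

    object DatasetApp {

      // age is Option[Long] because people.json contains a null age
      case class Person(name: String, age: Option[Long])

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("DatasetApp").master("local[2]").getOrCreate()
        import spark.implicits._

        // DataFrame (Dataset[Row]) -> Dataset[Person]: every record now has a concrete type
        val ds = spark.read.json("file:///path/to/people.json").as[Person]

        // typed, compile-time-checked field access; ds.map(_.naem) would not compile
        ds.map(p => p.name.toUpperCase).show()

        spark.stop()
      }
    }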
