美文网首页
185、Spark 2.0之SparkSession、Dataf

185、Spark 2.0之SparkSession、Dataf

作者: ZFH__ZJ | 来源:发表于2019-02-11 13:51 被阅读0次

    Spark SQL介绍

    Spark SQL是Spark的一个模块,主要用于处理结构化的数据。与基础的Spark RDD API不同的是,Spark SQL的接口会向提供更多的信息,包括数据结构以及要执行的计算操作等。在Spark SQL内部,会使用这些信息执行一些额外的优化。使用Spark SQL有两种方式,包括SQL语句以及Dataset API。但是在计算的时候,无论你是用哪种接口去进行计算,它们使用的底层执行引擎是完全一模一样的。这种底层执行机制的统一,就意味着我们可以在不同的方式之间任意来回切换,只要我们可以灵活地运用不同的方式来最自然地表达我们要执行的计算操作就可以了。

    Spark SQL之SQL介绍

    Spark SQL的一个主要的功能就是执行SQL查询语句。Spark 2.0开始,最大的一个改变,就是支持了SQL 2003标准语法,还有就是支持子查询。Spark SQL也可以用来从Hive中查询数据。当我们使用某种编程语言开发的Spark作业来执行SQL时,返回的结果是Dataframe/Dataset类型的。当然,我们也可以通过Spark SQL的shell命令行工具,或者是JDBC/ODBC接口来访问。

    Spark SQL之Dataframe/Dataset介绍

    Dataset是一个分布式的数据集。Dataset是Spark 1.6开始新引入的一个接口,它结合了RDD API的很多优点(包括强类型,支持lambda表达式等),以及Spark SQL的优点(优化后的执行引擎)。Dataset可以通过JVM对象来构造,然后通过transformation类算子(map,flatMap,filter等)来进行操作。Scala和Java的API中支持Dataset,但是Python不支持Dataset API。不过因为Python语言本身的天然动态特性,Dataset API的不少feature本身就已经具备了(比如可以通过row.columnName来直接获取某一行的某个字段)。R语言的情况跟Python也很类似。

    Dataframe就是按列组织的Dataset。在逻辑概念上,可以大概认为Dataframe等同于关系型数据库中的表,或者是Python/R语言中的data frame,但是在底层做了大量的优化。Dataframe可以通过很多方式来构造:比如结构化的数据文件,Hive表,数据库,已有的RDD。Scala,Java,Python,R等语言都支持Dataframe。在Scala API中,Dataframe就是Dataset[Row]的类型别名。在Java中,需要使用Dataset<Row>来代表一个Dataframe。

    新的入口,SparkSession

    从Spark 2.0开始,一个最大的改变就是,Spark SQL的统一入口就是SparkSession,SQLContext和HiveContext未来会被淘汰。可以通过SparkSession.builder()来创建一个SparkSession,如下代码所示。SparkSession内置就支持Hive,包括使用HiveQL语句查询Hive中的数据,使用Hive的UDF函数,以及从Hive表中读取数据等。

    val spark = SparkSession
      .builder()
      .appName("Spark SQL Example")
      .master("local") 
      .config("spark.sql.warehouse.dir", "C:\\Users\\Administrator\\Desktop\\spark-warehouse")  
      .getOrCreate()
    
    import spark.implicits._
    

    Dataframe的untyped操作

    有了SparkSession之后,就可以通过已有的RDD,Hive表,或者其他数据源来创建Dataframe,比如说通过json文件来创建。Dataframe提供了一种domain-specific language来进行结构化数据的操作,这种操作也被称之为untyped操作,与之相反的是基于强类型的typed操作。

    val df = spark.read.json("people.json")
    df.show()
    df.printSchema()
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy("age").count().show()
    

    SparkSession:运行SQL查询

    SparkSession的sql()函数允许我们执行SQL语句,得到的结果是一个Dataframe。
    df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()

    Dataset的typed操作

    Dataset与RDD比较类似,但是非常重要的一点不同是,RDD的序列化机制是基于Java序列化机制或者是Kryo的,而Dataset的序列化机制基于一种特殊的Encoder,来将对象进行高效序列化,以进行高性能处理或者是通过网络进行传输。Dataset除了Encoder,也同时支持Java序列化机制,但是encoder的特点在于动态的代码生成,同时提供一种特殊的数据格式,来让spark不将对象进行反序列化,即可直接基于二进制数据执行一些常见的操作,比如filter、sort、hash等。

    case class Person(name: String, age: Long)
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect()
    
    val path = "people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
    

    Hive操作

    在Spark 2.0中,是支持读写hive中存储的数据的。但是,因为hive有较多的依赖,所以默认情况下,这些依赖没有包含在spark的发布包中。如果hive依赖可以在classpath路径中,那么spark会自动加载这些依赖。这些hive依赖必须在所有的worker node上都放一份,因为worker node上运行的作业都需要使用hive依赖的序列化与反序列化包来访问hive中的数据。

    只要将hive-site.xml、hdfs-site.xml和core-site.xml都放入spark/conf目录下即可。

    如果要操作Hive,那么构建SparkSession的时候,就必须启用Hive支持,包括连接到hive的元数据库,支持使用hive序列化与反序列化包,以及支持hive udf函数。如果我们没有安装hive,也是可以启用hive支持的。如果我们没有放置hive-site.xml到spark/conf目录下,SparkSession就会自动在当前目录创建元数据库,同时创建一个spark.sql.warehouse.dir参数设置的目录,该参数的值默认是当前目录下的spark-warehouse目录。在spark 2.0中,hive.metastore.warehouse.dir属性已经过时了,现在使用 spark.sql.warehouse.dir属性来指定hive元数据库的位置。

    case class Record(key: Int, value: String)
    val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
    
    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()
    
    import spark.implicits._
    import spark.sql
    
    
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    sql("LOAD DATA LOCAL INPATH 'kv1.txt' INTO TABLE src")
    sql("SELECT * FROM src").show()
    sql("SELECT COUNT(*) FROM src").show()
    
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    
    

    其中,上述用到的数据如下

    people.json

    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
    

    kv1.txt

    238 val_238
    86 val_86
    311 val_311
    27 val_27
    165 val_165
    409 val_409
    255 val_255
    

    相关文章

      网友评论

          本文标题:185、Spark 2.0之SparkSession、Dataf

          本文链接:https://www.haomeiwen.com/subject/cikqeqtx.html