SparkSQL

作者: 大数据技术宅 | 来源:发表于2018-05-06 18:42 被阅读5次

    原文链接:SparkSQL—用之惜之

    更多精彩内容请关注笔者公众号:大数据技术宅


    SparkSql作为Spark的结构化数据处理模块,提供了非常强大的API,让分析人员用一次,就会为之倾倒,为之着迷,为之至死不渝。在内部,SparkSQL使用额外结构信息来执行额外的优化。在外部,可以使用SQL和DataSet 的API与之交互。本文笔者将带你走进SparkSql的世界,领略SparkSql之诸多妙处。

    DataSet和DataFrame

    当使用编程语言对结构化数据进行操作时候,SparkSql中返回的数据类型是DataSet/DataFrame,因此开篇笔者就先对这两种数据类型进行简单的介绍。

    Dataset 是分布式的数据集合。是Spark 1.6中添加的一个新接口,是特定域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作,数据集可以由JVM对象构造,然后使用函数转换(map、flatmap、filter等)进行操作。Dataset 支持Scala和javaAPI,不支持Python API。

    DataFrame是由列组成的数据集,它在概念上等同于关系数据库中的表或R/Python中的data frame,但在查询引擎上进行了丰富的优化。DataFrame可以由各种各样的源构建,例如:结构化数据文件、hive中的表、外部数据库或现有的RDD。

    SparkSQL基于DataFrame的操作

    1importorg.apache.spark.sql.SparkSession

    2val spark = SparkSession

    3.builder()

    4.appName("Spark SQL basic example")

    5.getOrCreate()

    6//引入Spark的隐式类型转换,如将RDD转换成 DataFrame

    7importspark.implicits._

    8val df = spark.read.json("/data/tmp/SparkSQL/people.json")

    9df.show()//将DataFrame的内容进行标准输出

    10//+---+-------+

    11//|age|   name|

    12//+---+-------+

    13//|   |Michael|

    14//| 19|   Andy|

    15//| 30| Justin|

    16//+---+-------+

    17

    18df.printSchema()//打印出DataFrame的表结构

    19//root

    20// |-- age: string (nullable = true)

    21// |-- name: string (nullable = true)

    22

    23df.select("name").show()

    24//类似于select name from DataFrame的SQL语句

    25

    26df.select($"name", $"age"+1).show()

    27//类似于select name,age+1 from DataFrame的SQL语句

    28//此处注意,如果对列进行操作,所有列名前都必须加上$符号

    29

    30df.filter($"age">21).show()

    31//类似于select * from DataFrame where age>21 的SQL语句

    32

    33df.groupBy("age").count().show()

    34//类似于select age,count(age) from DataFrame group by age;

    35

    36//同时也可以直接写SQL进行DataFrame数据的分析

    37df.createOrReplaceTempView("people")

    38val sqlDF = spark.sql("SELECT * FROM people")

    39sqlDF.show()

    SparkSQL基于DataSet的操作

    由于DataSet吸收了RDD和DataFrame的优点,所有可以同时向操作RDD和DataFrame一样来操作DataSet。看下边一个简单的例子。

    1caseclassPerson(name: String, age: Long)

    2// 通过 case类创建DataSet

    3val caseClassDS

    = Seq(Person("Andy",32)).toDS()

    4caseClassDS.show()

    5// +----+---+

    6// |name|age|

    7// +----+---+

    8// |Andy| 32|

    9// +----+---+

    10

    11// 通过基本类型创建DataSet

    12importing spark.implicits._

    13val primitiveDS = Seq(1,2,3).toDS()

    14primitiveDS.map(_ +1).collect()

    15// Returns: Array(2, 3, 4)

    16

    17// 将DataFrames转换成DataSet

    18val path ="examples/src/main/resources/people.json"

    19val peopleDS = spark.read.json(path).as[Person]

    20peopleDS.show()

    21// +----+-------+

    22// | age|   name|

    23// +----+-------+

    24// |null|Michael|

    25// |  30|   Andy|

    26// |  19| Justin|

    27// +----+-------+

    在上边的例子中能够发现DataSet的创建是非常简单的,但是笔者需要强调一点,DataSet是强类型的,也就是说DataSet的每一列都有指定的列标识符和数据类型。下边的列子将进一步介绍DataSet与RDD的交互。

    1importspark.implicits._

    2//将RDD转换成DataFrame

    3val peopleDF = spark.sparkContext

    4.textFile("examples/src/main/resources/people.txt")

    5.map(_.split(","))

    6.map(attributes=>Person(attributes(0),attributes(1).trim.toInt))

    7.toDF()

    8// 将RDD注册为一个临时视图

    9peopleDF.createOrReplaceTempView("people")

    10//对临时视图进行Sql查询

    11val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

    12

    13// 对teenagersDF 对应的DataFrame进行RDD的算子map操作

    14teenagersDF.map(teenager =>"Name: "+ teenager(0)).show()

    15// +------------+

    16// |       value|

    17// +------------+

    18// |Name: Justin|

    19// +------------+

    20

    21// 与上一条语句效果一样

    22teenagersDF.map(teenager =>"Name: "+ teenager.getAs[String]("name")).show()

    23// +------------+

    24// |       value|

    25// +------------+

    26// |Name: Justin|

    27// +------------+

    SparkSQL操作HIve表

    Spark SQL支持读取和写入存储在Apache HIVE中的数据。然而,由于Hive具有大量的依赖关系,默认情况下这些依赖性不包含在Spark分布中。如果能在classpath路径找到Hive依赖文件,Spark将自动加载它们。另外需要注意的是,这些Hive依赖项须存在于所有Spark的Worker节点上,因为它们需要访问Hive序列化和反序列化库(SerDes),以便访问存储在Hive中的数据。

     1importjava.io.File

     2importorg.apache.spark.sql.{Row, SaveMode, SparkSession}

     3

     4caseclassRecord(key: Int, value: String)

     5

     6// 设置hive数据库默认的路径

     7val warehouseLocation

    =newFile("spark-warehouse").getAbsolutePath

     8

     9val spark = SparkSession

    10.builder()

    11.appName("Spark Hive Example")

    12.config("spark.sql.warehouse.dir", warehouseLocation)

    13.enableHiveSupport()

    14.getOrCreate()

    15

    16importspark.implicits._

    17importspark.sql

    18

    19//创建hive表,导入数据,并且查询数据

    20sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")

    21sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

    22sql("SELECT * FROM src").show()

    23

    24// +---+-------+

    25// |key|  value|

    26// +---+-------+

    27// |238|val_238|

    28// | 86| val_86|

    29// |311|val_311|

    30// ...

    31

    32//对hive表数据进行聚合操作

    33sql("SELECT COUNT(*) FROM src").show()

    34// +--------+

    35// |count(1)|

    36// +--------+

    37// |    500 |

    38// +--------+

    39

    40// sql执行的查询结果返回DataFrame类型数据,支持常用的RDD操作

    41val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    42val stringsDS = sqlDF.map {

    43caseRow(key: Int, value: String)=> s"Key: $key, Value: $value"

    44}

    45stringsDS.show()

    46// +--------------------+

    47// |               value|

    48// +--------------------+

    49// |Key: 0, Value: val_0|

    50// |Key: 0, Value: val_0|

    51// |Key: 0, Value: val_0|

    52// ...

    53

    54// 通过DataFrames创建一个临时视图val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))

    55recordsDF.createOrReplaceTempView("records")

    56

    57// 查询操作可以将临时的视图与HIve表中数据进行关联查询

    58sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

    59// +---+------+---+------+

    60// |key| value|key| value|

    61// +---+------+---+------+

    62// |  2| val_2|  2| val_2|

    63// |  4| val_4|  4| val_4|

    64// |  5| val_5|  5| val_5|

    65// ...

    66

    67// 创建一个Hive表,并且以parquet格式存储数据

    68sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")

    69// 讲DataFrame中数据保存到Hive表里

    70val df = spark.table("src")

    71df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")

    72sql("SELECT * FROM hive_records").show()

    73// +---+-------+

    74// |key|  value|

    75// +---+-------+

    76// |238|val_238|

    77// | 86| val_86|

    78// |311|val_311|

    79// ...

    80

    81// 在指定路径创建一个Parquet文件并且写入数据

    82val dataDir ="/tmp/parquet_data"

    83spark.range(10).write.parquet(dataDir)

    84// 创建HIve外部表

    85sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")

    86sql("SELECT * FROM hive_ints").show()

    87// +---+

    88// |key|

    89// +---+

    90// |  0|

    91// |  1|

    92// |  2|

    93// ...

    94

    95// Turn on flag for Hive Dynamic Partitioning

    96spark.sqlContext.setConf("hive.exec.dynamic.partition","true")

    97spark.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")

    98// 通过DataFrame的API创建HIve分区表

    99df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")

    100sql("SELECT * FROM hive_part_tbl").show()

    101// +-------+---+

    102// |  value|key|

    103// +-------+---+

    104// |val_238|238|

    105// | val_86| 86|

    106// |val_311|311|

    107// ...

    108

    109spark.stop()

    当然SparkSql的操作远不止这些,它可以直接对文件快执行Sql查询,也可以通过JDBC连接到关系型数据库,对关系型数据库中的数据进行一些运算分析操作。如果读者感觉不过瘾,可以留言与笔者交流,也可以通过Spark官网查阅相关例子进行学习。下一篇关于Spark的文章,笔者将详细的介绍Spark的常用算子,以满足渴望进行数据分析的小伙伴们的求知的欲望。

    相关文章

      网友评论

        本文标题:SparkSQL

        本文链接:https://www.haomeiwen.com/subject/nowurftx.html