使用Spark SQL的DataFrame接口,用户可以方便快速的从多种不同数据源(json/parquet/rdbms等),经过混合处理(比如json join parquet),再将处理结果以特定的格式(比如json/parquet等)写回到指定的系统(比如HDFS/S3)上去。
从Spark SQL 1.2引入了外部数据源的概念,有外部肯定有内部,内部数据源指的就是Spark SQL内置支持的数据源,包括json, parquet, jdbc, orc, libsvm, csv, text等
-
内置数据源
比如读取json格式的数据:
val peopleDF = spark.read.format("json").load("E:/ATempFile/people.json") //这是标准写法
parquet格式是Spark SQL默认处理数据格式,所以可以简写成如下形式:
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") //只读取两个字段,并写入新的parquet文件
关于默认处理parquet格式的数据,底层代码是这么写的:
val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
.stringConf
.createWithDefault("parquet")
此外,如果你不想把文件转换成DataFrame进行操作,可以直接使用SQL查询文件,像这样:
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
下边以parquet和jdbc数据源举例介绍如何操作:
- 处理parquet文件
可以像上面说的一样:
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") //只读取两个字段,并 写入新的parquet文件
也可以在spark-sql --master local[2] 中:
CREATE TEMPORARY VIEW parquetTable1
USING org.apache.spark.sql.parquet
OPTIONS (
path "/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
)
SELECT * FROM parquetTable
注意上边USING的用法
- 处理MYSQL数据
spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/hive").option("dbtable", "hive.TBLS").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "hive.TBLS", connectionProperties)
也可以使用spark-sql命令行:
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:mysql://localhost:3306",
dbtable "hive.TBLS",
user 'root',
password 'root',
driver 'com.mysql.jdbc.Driver'
)
-
外部数据源
Spark SQL 1.2引入了外部数据源,开发人员并不需要把外部数据源代码合并到spark中,而是可以通过--jars指定相关jar包即可,这样读取数据可以更加多种多样,使用也更加方便,具体有哪些外部数据源,可以打开这个网站:https://spark-packages.org,点击Data Source后可以看到支持的各种数据源。
比如操作avro文件,开发环境中只需引入(假设使用maven管理jar吧,sbt的同理):
<dependencies>
<!-- list of dependencies -->
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
如果是在生产环境,可以使用--packages
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
-
Save Modes
不管是什么类型的数据源,涉及到写操作的时候都有一个Save Modes,意思是要写入的文件已存在时该如何处理,下图截自官网,一目了然:
save modes
-
Saving to Persistent Tables
DataFrame 可以通过saveAsTable操作将数据作为持久表保存到Hive的元数据中。使用这个功能不一定需要Hive的部署。Spark将创建一个默认的本地的Hive的元数据保存(通过用Derby(一种数据库))。不同于createOrReplaceTempView,saveAsTable将实现DataFrame内容和创建一个指向这个Hive元数据的指针。持久表在你的spark程序重启后仍然存在,只要你保存你和元数据存储的连接。可以通过SparkSession调用table这个方法,来将DataFrame保存为一个持久表。
通过默认的saveAsTable 将会创建一个“管理表”,意思是数据的位置将被元数据控制。在数据表被删除的时候管理表也会被删除。
网友评论