一、Spark连接SQL数据源需要为连接器提供合适的JAR包
# 以读写 PostgreSQL 数据库为例
./bin/sprk-shell \
--driver-class-path postgresql-9.4.1207.jar \
--jars postgresql-9.4.1207.jar
二、JDBC 数据源选项
属性名称 | 说明 |
---|---|
Url | 表示连接的JDBC URL,可以在URL中指定特定源的连接属性,例如jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | 表示要读取的JDBC 表,请注意, 可以使用SQL查询的FROM子句中任何有效内容,例如你可以在圆括号中使用子查询,而不是全表查询 |
driver | 用于连接到此URL的JDBC驱动器的类名 |
partitionColumn,lowerBound,upperBound | 如果指定了这些选项中的任何一个,则必须设置其他所有选项。另外,还必须指定numPartition。这些属性描述了如何在从多个worker并行读取时对表格进行划分。partitionColumn是要分区的列,必须是整类型。请注意,lowerBound和upperBound仅用于确定分区跨度,而不用于过滤表中的行。因此表中所有的行都将被划分返回 |
numPartitions | 在读取和写入数据表时,数据表可用于并行的最大分区数,这也决定了并发JDBC连接的最大数目。如果要写入的分区数超过此限制,则通过在写入时调用coalesce(numPartitions)来将分区数降到符合此限制 |
fetchsize | 表示JDBC每次读取多少条记录。这个设置与JDBC驱动器的性能有关系,JDBC驱动器默认值低获取行数。该选项仅适用于读操作 |
batchsize | 表示JDBC批处理大小。用于指定每次写入多少条记录。这个选项与JDBC驱动器性能有关系,该选项仅适用于写操作,默认值为1000 |
isolationLevel | 表示数据库的事务隔离级别(适用于当前连接)。它可以取值NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ或SERIALIZABLE,分别对应于JDBC的Connection对象定义的标准事务隔离级别。默认值为READ_UNCOMMITTED,此选项仅适用于写操作 |
truncate | 这是一个和JDBC写入相关的选项。当Spark要执行覆盖表操作时,即启用SaveMode.Overwrite,Spark将截取现有表,而不是删除之后再重新创建它。这样可以提高效率,并防止表元数据被删除。但是,在某些情况下,例如新数据具有不同的schema时,它并不起作用。默认值为false,该选项仅用于写操作。 |
createTableOptions | 这是一个JDBC写入相关的选项。用于在创建表时设置特定数据库表和分区选项。例如,CREATE TABLE t (name string) ENGINE=InnoDB。该选项仅适用于写操作 |
createTableColumnTypes | 表示创建表时使用的数据库列数据类型,而不是默认值。应该使用与CREATE TABLE列语法相同(例如,"name char(64), comments varchar(1024)")来指定数据类型信息,指定的类型应该是有效的SparkSQL数据类型。该选项仅适用于写操作 |
三、从SQL数据库中读取数据
// 这里以PostgreSQL数据库为例
val pgDf = spark.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://database_server")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "my-secret-password")
.load()
pgDf.select("DEST_COUNTRY_NAME").distinct().show(5)
四、查询下推
- 在创建DataFrame之前,Spark会尽力过滤数据库中的数据,在某些查询中,Spark实际上会做的更好,例如我们在DataFrame上指定一个filter(过滤器),Spark就会将过滤器函数下推到数据库端
pgDf.select("DEST_COUNTRY_NAME").distinct().explain
- 但是Spark不能把它的所有函数转换为你所使用的SQL数据库中的函数,因此有时候需要用SQL表达式整个查询并将结果作为DataFrame返回
val pushDownQuery =
"""
|(select distinct(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info
|""".stripMargin
val dbDataFrame = spark.read.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://database_server")
.option("dbtable", pushDownQuery)
.load()
五、并行度数据库
- 可以通过指定最大分区数量来限制并行读写的最大数量
// 当数据不多时,仍然会作为一个分区,但是此配置可帮助你确保在读取和写入数据时不会导致数据库过载
val dbDataFrame = spark.read.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://database_server")
.option("dbtable", "tablename")
.option("numPartitions", 10)
.load()
- 可以通过在连接中显示地将谓词下推到SQL数据库中执行,这有利于通过指定谓词来控制分区数据的物理存放位置.
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
// 但是如果指定的谓词集合不相交,则会出现大量重复的行
val predicates = Array(
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"
)
spark.read.jdbc("url", "tablename", predicates, props).show()
spark.read.jdbc("url", "tablename", predicates, props).rdd.getNumPartitions
六、基于滑动窗口的分区
现在基于谓词进行分区,基于数值型的count进行分区,我们为第一个分区和最后一个分区分别指定一个最小值和一个最大值,超出该范围的数据将存放到第一个分区或最后一个分区。接下来指定分区总数
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val colName = "count"
val lowerBound = 0L
val upperBound = 348113L
val numPartitions = 10
// 根据count列数值,从小到大均匀划分10个间隔区间的数据,之后每个区间的数据被分到一个分区
spark.read.jdbc(
"url", "tablename", colName, lowerBound, upperBound, numPartitions, props
).count()
七、写入SQL数据库
val newPath = "jdbc:sqlite://tmp/my-sqlite.db"
pgDf.write.mode("overwrite").jdbc(newPath, "tablename", props)
网友评论