一、The three JDBC APIs in Spark
- API 1: parallelism of 1; the whole table is fetched in a single pass (a usage sketch follows the code below).
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
  // properties should override settings in extraOptions.
  this.extraOptions = this.extraOptions ++ properties.asScala
  // explicit url and dbtable should override all
  this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
  format("jdbc").load()
}
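A minimal usage sketch of this overload (the URL, table name, and credentials below are hypothetical placeholders). Because no partitioning information is supplied, the resulting DataFrame is backed by a single partition reading through one JDBC connection:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "test_user")       // hypothetical credentials
props.setProperty("password", "test_pwd")
props.setProperty("driver", "com.mysql.jdbc.Driver")

// single-partition read: the whole table comes through one connection
val df = spark.read.jdbc(
  "jdbc:mysql://127.0.0.1:3306/test_db",     // hypothetical URL
  "orders",                                  // hypothetical table
  props)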
- API 2: specify the number of concurrent partitions, the partition column, and its lower/upper bounds; the partition column must be numeric (see the sketch after the code).
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame = {
  // columnName, lowerBound, upperBound and numPartitions override settings in extraOptions.
  this.extraOptions ++= Map(
    JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
    JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
    JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
    JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
  jdbc(url, table, connectionProperties)
}
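A sketch of the partitioned overload, reusing the props from the previous sketch (column name and bound values are hypothetical). Spark splits the range between lowerBound and upperBound of the numeric column into numPartitions stripes and issues one query per partition; the bounds only determine the stride, so rows outside the range still land in the first or last partition:

// 4 concurrent partitions over the numeric column "id"
val df = spark.read.jdbc(
  "jdbc:mysql://127.0.0.1:3306/test_db",     // hypothetical URL
  "orders",                                  // hypothetical table
  "id",                                      // numeric partition column
  1L,                                        // lowerBound
  1000000L,                                  // upperBound
  4,                                         // numPartitions
  props)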
- API 3: custom partitioning via an array of WHERE-clause predicates (see the sketch after the code).
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame = {
  // connectionProperties should override settings in extraOptions.
  val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
  val options = new JDBCOptions(url, table, params)
  val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
    JDBCPartition(part, i) : Partition
  }
  val relation = JDBCRelation(parts, options)(sparkSession)
  sparkSession.baseRelationToDataFrame(relation)
}
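A sketch of the predicates overload, again with hypothetical table and column names and reusing props from the first sketch. Each string in the array is a WHERE-clause fragment and becomes exactly one partition, so partitioning is no longer restricted to numeric columns:

// one partition per predicate, here split by a date column
val predicates = Array(
  "create_date >= '2023-01-01' and create_date < '2023-07-01'",
  "create_date >= '2023-07-01' and create_date < '2024-01-01'")

val df = spark.read.jdbc(
  "jdbc:mysql://127.0.0.1:3306/test_db",     // hypothetical URL
  "orders",                                  // hypothetical table
  predicates,
  props)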
二、Source-code analysis of Spark's JDBC access
1. The jdbc method is the entry point; it calls format:
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
  // properties should override settings in extraOptions.
  this.extraOptions = this.extraOptions ++ properties.asScala
  // explicit url and dbtable should override all
  this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
  format("jdbc").load()
}
2. The format method is shown below; it simply sets source to "jdbc" and returns the DataFrameReader itself:
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
3. In application code we then call load on the DataFrameReader to obtain the DataFrame:
def load(): DataFrame = {
  load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}

// the varargs overload then builds the relation and returns a DataFrame
def load(paths: String*): DataFrame = {
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
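Since jdbc() only fills extraOptions and then delegates to format("jdbc").load(), the same read can be expressed directly through the generic DataSource entry point. The option keys below (url, dbtable, partitionColumn, lowerBound, upperBound, numPartitions) are the standard JDBC source options carried in the extraOptions map; the URL and table values remain hypothetical:

// equivalent to the partitioned jdbc() overload, written against load() directly
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1:3306/test_db")   // hypothetical URL
  .option("dbtable", "orders")                            // hypothetical table
  .option("user", "test_user")
  .option("password", "test_pwd")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "4")
  .load()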
4. From this we can see that Spark's JDBC access ultimately goes through Spark's generic DataSource mechanism; next we analyze how DataSource implements it.