A Ramble on Spark's JDBC

Author: 学师大术 | Published 2019-05-31 09:39

I. The three JDBC APIs in Spark

  • API 1: parallelism of 1; a single task fetches all the data in one go (a usage sketch follows the snippet below).
  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
    // properties should override settings in extraOptions.
    this.extraOptions = this.extraOptions ++ properties.asScala
    // explicit url and dbtable should override all
    this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
    format("jdbc").load()
  }
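A minimal usage sketch of this overload, assuming an existing SparkSession named spark; the URL, table name, and credentials are made-up placeholders:

  import java.util.Properties

  // Hypothetical connection details, for illustration only.
  val url = "jdbc:mysql://db-host:3306/test"
  val props = new Properties()
  props.setProperty("user", "app")
  props.setProperty("password", "secret")

  // A single task reads the whole table over one connection: convenient for
  // small tables, a bottleneck (and an OOM risk) for large ones.
  val ordersDf = spark.read.jdbc(url, "orders", props)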
  • API 2: specify the number of partitions, the partition column, and that column's lower and upper bounds; the partition column must be numeric (a usage sketch follows the snippet below).
  def jdbc(
      url: String,
      table: String,
      columnName: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      connectionProperties: Properties): DataFrame = {
    // columnName, lowerBound, upperBound and numPartitions override settings in extraOptions.
    this.extraOptions ++= Map(
      JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
      JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
      JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
      JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
    jdbc(url, table, connectionProperties)
  }
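A usage sketch of the bounded overload, reusing the hypothetical url and props from the previous example and assuming the table has a numeric primary key id roughly spanning 1 to 1,000,000:

  // Spark splits [lowerBound, upperBound) on the partition column into
  // numPartitions stride-based WHERE clauses, so 8 tasks read in parallel
  // over 8 JDBC connections.
  val ordersDf = spark.read.jdbc(
    url,
    "orders",     // table
    "id",         // partition column, must be numeric
    1L,           // lowerBound
    1000000L,     // upperBound
    8,            // numPartitions
    props)

Note that lowerBound and upperBound only decide how the range is split, not which rows are read: rows whose id falls outside the range are not filtered out, they simply land in the first or last partition.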
  • API 3: custom partitioning, with one WHERE-clause predicate per partition (a usage sketch follows the snippet below).
  def jdbc(
      url: String,
      table: String,
      predicates: Array[String],
      connectionProperties: Properties): DataFrame = {
    // connectionProperties should override settings in extraOptions.
    val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
    val options = new JDBCOptions(url, table, params)
    val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
      JDBCPartition(part, i) : Partition
    }
    val relation = JDBCRelation(parts, options)(sparkSession)
    sparkSession.baseRelationToDataFrame(relation)
  }
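A usage sketch of the predicate-based overload, again with hypothetical column values; each element of the array becomes the WHERE clause of one partition, so the predicates should be mutually exclusive and together cover all the rows you want:

  // Hypothetical month-by-month split on an order_date column.
  val predicates = Array(
    "order_date >= '2019-01-01' AND order_date < '2019-02-01'",
    "order_date >= '2019-02-01' AND order_date < '2019-03-01'",
    "order_date >= '2019-03-01' AND order_date < '2019-04-01'")

  val ordersDf = spark.read.jdbc(url, "orders", predicates, props)
  // ordersDf.rdd.getNumPartitions == 3, one partition per predicate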

II. Source analysis of Spark's JDBC access

1. The jdbc method is the entry point: it merges the connection properties into extraOptions and calls format.

  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
    // properties should override settings in extraOptions.
    this.extraOptions = this.extraOptions ++ properties.asScala
    // explicit url and dbtable should override all
    this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
    format("jdbc").load()
  }

2. The format method, shown below, simply records "jdbc" as the source and returns the same DataFrameReader.

  def format(source: String): DataFrameReader = {
    this.source = source
    this
  }

3. In application code, we then call load on the DataFrameReader to obtain a DataFrame.

  def load(): DataFrame = {
    load(Seq.empty: _*) // force invocation of `load(...varargs...)`
  }
// the varargs overload below then builds the DataSource and returns a DataFrame
  def load(paths: String*): DataFrame = {
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }
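Since jdbc() ultimately goes through format("jdbc").load(), the same read can also be written with the generic option-based API; the values below are the same hypothetical placeholders used earlier:

  val ordersDf = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://db-host:3306/test")
    .option("dbtable", "orders")
    .option("user", "app")
    .option("password", "secret")
    .load()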

4. From this we can see that Spark's JDBC read is, at bottom, just another Spark DataSource; the next question is how the DataSource machinery resolves it (a small experiment below shows which provider the "jdbc" short name maps to).
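One way to see this from user code, assuming a Spark 2.x distribution on the classpath: DataSource resolves short names through Java's ServiceLoader and the DataSourceRegister interface, and the "jdbc" short name is expected to resolve to JdbcRelationProvider.

  import java.util.ServiceLoader
  import org.apache.spark.sql.sources.DataSourceRegister
  import scala.collection.JavaConverters._

  // Enumerate every registered data source provider and pick the one whose
  // short name is "jdbc" -- the provider that load() will instantiate.
  ServiceLoader.load(classOf[DataSourceRegister]).asScala
    .filter(_.shortName() == "jdbc")
    .foreach(p => println(p.getClass.getName))
  // expected output: org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider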
