美文网首页技术文章Spark
Spark SQL(三)DataSource

Spark SQL(三)DataSource

作者: Sx_Ren | 来源:发表于2018-03-18 09:31 被阅读0次

    使用Spark SQL的DataFrame接口,用户可以方便快速的从多种不同数据源(json/parquet/rdbms等),经过混合处理(比如json join parquet),再将处理结果以特定的格式(比如json/parquet等)写回到指定的系统(比如HDFS/S3)上去。

    从Spark SQL 1.2引入了外部数据源的概念,有外部肯定有内部,内部数据源指的就是Spark SQL内置支持的数据源,包括json, parquet, jdbc, orc, libsvm, csv, text等

    • 内置数据源

    比如读取json格式的数据:

    val peopleDF = spark.read.format("json").load("E:/ATempFile/people.json") //这是标准写法
    

    parquet格式是Spark SQL默认处理数据格式,所以可以简写成如下形式:

    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") //只读取两个字段,并写入新的parquet文件
    

    关于默认处理parquet格式的数据,底层代码是这么写的:

    val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
        .doc("The default data source to use in input/output.")
        .stringConf
        .createWithDefault("parquet")
    

    此外,如果你不想把文件转换成DataFrame进行操作,可以直接使用SQL查询文件,像这样:

    val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
    

    下边以parquet和jdbc数据源举例介绍如何操作:

    1. 处理parquet文件
      可以像上面说的一样:
    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") //只读取两个字段,并 写入新的parquet文件
    

    也可以在spark-sql --master local[2] 中:

    CREATE TEMPORARY VIEW parquetTable1
    USING org.apache.spark.sql.parquet
    OPTIONS (
      path "/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
    )
    SELECT * FROM parquetTable
    

    注意上边USING的用法

    1. 处理MYSQL数据
    spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/hive").option("dbtable", "hive.TBLS").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
    
    import java.util.Properties
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")
    connectionProperties.put("driver", "com.mysql.jdbc.Driver")
    
    val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "hive.TBLS", connectionProperties)
    

    也可以使用spark-sql命令行:

    CREATE TEMPORARY VIEW jdbcTable
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url "jdbc:mysql://localhost:3306",
      dbtable "hive.TBLS",
      user 'root',
      password 'root',
      driver 'com.mysql.jdbc.Driver'
    )
    
    • 外部数据源

    Spark SQL 1.2引入了外部数据源,开发人员并不需要把外部数据源代码合并到spark中,而是可以通过--jars指定相关jar包即可,这样读取数据可以更加多种多样,使用也更加方便,具体有哪些外部数据源,可以打开这个网站:https://spark-packages.org,点击Data Source后可以看到支持的各种数据源。
    比如操作avro文件,开发环境中只需引入(假设使用maven管理jar吧,sbt的同理):

    <dependencies>
      <!-- list of dependencies -->
      <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>4.0.0</version>
      </dependency>
    </dependencies>
    

    如果是在生产环境,可以使用--packages

    $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
    
    • Save Modes

    不管是什么类型的数据源,涉及到写操作的时候都有一个Save Modes,意思是要写入的文件已存在时该如何处理,下图截自官网,一目了然:


    save modes
    • Saving to Persistent Tables

    DataFrame 可以通过saveAsTable操作将数据作为持久表保存到Hive的元数据中。使用这个功能不一定需要Hive的部署。Spark将创建一个默认的本地的Hive的元数据保存(通过用Derby(一种数据库))。不同于createOrReplaceTempView,saveAsTable将实现DataFrame内容和创建一个指向这个Hive元数据的指针。持久表在你的spark程序重启后仍然存在,只要你保存你和元数据存储的连接。可以通过SparkSession调用table这个方法,来将DataFrame保存为一个持久表。

    通过默认的saveAsTable 将会创建一个“管理表”,意思是数据的位置将被元数据控制。在数据表被删除的时候管理表也会被删除。

    相关文章

      网友评论

        本文标题:Spark SQL(三)DataSource

        本文链接:https://www.haomeiwen.com/subject/jdzdfftx.html