0302 Data Sources

作者: Whaatfor | 来源:发表于2020-06-27 12:12 被阅读0次

    转载请注明出处,谢谢合作~

    该篇中的示例暂时只有 Scala 版本~

    数据源

    Spark SQL 支持通过 DataFrame 接口操作多种数据源。一个 DataFrame 可以通过关系型转换算子操作,还可以用来创建一个临时视图。将 DataFrame 注册成一个临时视图之后就可以通过 SQL 语句进行查询。本章节介绍了使用 Spark 数据源加载和存储数据的常用方法,之后给出内置数据源需要的不同的参数。

    常用读写函数

    最简单的情况,默认的数据源(parquet 除非配置了参数 spark.sql.sources.default)将会被使用。

    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    手动指定参数

    可以通过手动指定数据源参数和相关的额外选项来创建 DataFrame。数据源通过全限定类名来指定(比如 org.apache.spark.sql.parquet),但是对于内置的数据源可以使用简称(json, parquet, jdbc, orc, libsvm, csv, text)。从任意数据源类型中加载的数据可以轻松的转换成其他支持的格式。

    可以通过 API 文档了解内置数据源的所有选项,例如 org.apache.spark.sql.DataFrameReaderorg.apache.spark.sql.DataFrameWriter。文档中的选项对于非 Scala 语言的 Spark API(比如 PySpark)也适用。对于其他的格式,参见相应的 API 文档。

    加载 JSON 文件:

    val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
    peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    加载 CSV 文件:

    val peopleDFCsv = spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("examples/src/main/resources/people.csv")
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    额外的选项对于写出操作也适用。例如,对于 ORC 文件可以通过选项控制布隆过滤器和字典编码。下面的 ORC 样例将会创建布隆过滤器并对 favorite_color 列使用字典编码。对于 Parquet 文件,也有一个 parquet.enable.dictionary 的选项。有关 ORC/Parquet 数据格式选项的更多详情请移步 Apache ORC/Parquet 官方网站。

    usersDF.write.format("orc")
      .option("orc.bloom.filter.columns", "favorite_color")
      .option("orc.dictionary.key.threshold", "1.0")
      .option("orc.column.encoding.direct", "name")
      .save("users_with_options.orc")
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    直接通过 SQL 操作文件

    除了使用 read API 加载文件成为一个 DataFrame,还可以通过 SQL 直接查询。

    val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    存储模式

    存储操作有一个可选的 SaveMode 类型的参数,该参数指定如何处理存储路径中已经存在的数据。需要注意的是存储模式并不加锁,也不是原子性的。此外,当存储模式设置为 Overwrite 时,在写入新数据之前会删除旧数据。

    Scala/Java Any Language Meaning
    SaveMode.ErrorIfExists (default) "error" or "errorifexists" (default) 如果输出路径中的数据已存在,会抛出异常。
    SaveMode.Append "append" 如果输出路径中的数据已存在,写出的数据会被追加到现有数据之后。
    SaveMode.Overwrite "overwrite" 如果输出路径中的数据已存在,会被新写出的数据替换。
    SaveMode.Ignore "ignore" 如果输出路径中的数据已存在,新写出的数据不会被存储,现有的数据也不会被更新。类似于 SQL 中的 CREATE TABLE IF NOT EXISTS

    持久化数据表

    DataFrames 还可以通过 saveAsTable 方法存储到 Hive 中。注意,并不需要事先安装好 Hive,Spark 会通过 Derby 创建一个默认的本地 Hive 元数据库。跟 createOrReplaceTempView 方法不同的是,saveAsTable 方法会将 DataFrame 中的数据落盘并在 Hive 元数据库中创建一条描述存储信息的记录。只要能够访问元数据库,持久化后的数据表在 Spark 应用程序重启之后依旧可以访问。在持久化数据表时可以通过调用 SparkSessiontable 方法指定表名称。

    基于文件的数据源,比如 text,parquet,json 等等,可以通过 path 选项指定存储路径,例如 df.write.option("path", "/some/path").saveAsTable("t")。当数据表被删除后,自定义存储路径中的数据不会被删除。如果不指定存储路径,Spark 会将数据写出到 warehouse 文件夹,这种情况下如果数据表被删除,数据也会被删除。

    从 Spark 2.1 开始,持久化的数据表的每个分区都会在 Hive 元数据库中有相关记录,这种方式带来一些优化:

    • 对于查询元数据库可以只返回所需要分区的信息,在第一次查询时不再需要加载所有的分区。
    • Datasource API 支持了像 ALTER TABLE PARTITION ... SET LOCATION 这样的 Hive DDL 语句。

    注意,在创建外部数据表(自定义了 path 选项)时,默认情况下分区信息并没有被收集,同步元数据库中的分区信息需要执行 MSCK REPAIR TABLE 语句。

    分桶,排序和分区

    对于基于文件的数据源,还可以对输出进行分桶,排序和分区。分桶和排序只适用于通过 saveAsTable 持久化数据表:

    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    但是分区操作对于 savesaveAsTable 都适用:

    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    可以同时在一张表中适用分桶和分区:

    usersDF
      .write
      .partitionBy("favorite_color")
      .bucketBy(42, "name")
      .saveAsTable("users_partitioned_bucketed")
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    就像 Partition Discovery 章节描述的那样,partitionBy 操作会创建一个目录结构。所以,以很大基数的字段分区不会是一个明智的选择。相反,bucketBy 操作将数据按照固定数量分桶,适用于数据技基数很大的场景。

    常用的文件数据源选项

    这些常用的选项只在适用文件类型的数据源时有效:parquet, orc, avro, json, csv, text。

    注意在示例中使用的目录结构如下:

    dir1/
     ├── dir2/
     │    └── file2.parquet (schema: <file: string>, content: "file2.parquet")
     └── file1.parquet (schema: <file, string>, content: "file1.parquet")
     └── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")
    

    忽略损坏的文件

    在从文件读取数据的过程中可以通过设置参数 spark.sql.files.ignoreCorruptFiles 忽略损坏的文件,当设置为 true 时,Spark 作业在遇到损坏的文件的时候会继续运行,已经被读取的部分依旧有效。

    // enable ignore corrupt files
    spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
    // dir1/file3.json is corrupt from parquet's view
    val testCorruptDF = spark.read.parquet(
      "examples/src/main/resources/dir1/",
      "examples/src/main/resources/dir1/dir2/")
    testCorruptDF.show()
    // +-------------+
    // |         file|
    // +-------------+
    // |file1.parquet|
    // |file2.parquet|
    // +-------------+
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    忽略丢失的文件

    在从文件读取数据的过程中可以通过设置参数 spark.sql.files.ignoreMissingFiles 忽略丢失的文件。在这里丢失的文件是指在创建好 DataFrame 之后删除了的文件。当设置为 true 时,Spark 作业在遇到丢失的文件的时候会继续运行,已经被读取的部分依旧有效。

    全局文件过滤器

    选项 pathGlobFilter 用来只读取匹配目标模式的文件,语法与 org.apache.hadoop.fs.GlobFilter 相同,过滤操作不会改变分区发现的行为。

    val testGlobFilterDF = spark.read.format("parquet")
      .option("pathGlobFilter", "*.parquet") // json file should be filtered out
      .load("examples/src/main/resources/dir1")
    testGlobFilterDF.show()
    // +-------------+
    // |         file|
    // +-------------+
    // |file1.parquet|
    // +-------------+
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    递归文件搜索

    选项 recursiveFileLookup 用来递归查找目标路径下的匹配文件,但是会关闭分区推断机制,默认值为 false。如果在选项 recursiveFileLookup 为 true 时显示指定了 partitionSpec 选项,会抛出异常。

    val recursiveLoadedDF = spark.read.format("parquet")
      .option("recursiveFileLookup", "true")
      .load("examples/src/main/resources/dir1")
    recursiveLoadedDF.show()
    // +-------------+
    // |         file|
    // +-------------+
    // |file1.parquet|
    // |file2.parquet|
    // +-------------+
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    Parquet 文件

    Parquet 是一种被许多数据处理系统支持的列式存储格式。Spark SQL 提供了对 Parquet 文件读写的支持,文件中会保存原始数据的模式。当读取 Parquet 文件时,处于兼容性的考虑所有的列被自动转换为 nullable 类型。

    通过程序加载数据

    采用上述示例中的数据:

    // Encoders for most common types are automatically provided by importing spark.implicits._
    import spark.implicits._
    
    val peopleDF = spark.read.json("examples/src/main/resources/people.json")
    
    // DataFrames can be saved as Parquet files, maintaining the schema information
    peopleDF.write.parquet("people.parquet")
    
    // Read in the parquet file created above
    // Parquet files are self-describing so the schema is preserved
    // The result of loading a Parquet file is also a DataFrame
    val parquetFileDF = spark.read.parquet("people.parquet")
    
    // Parquet files can also be used to create a temporary view and then used in SQL statements
    parquetFileDF.createOrReplaceTempView("parquetFile")
    val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
    namesDF.map(attributes => "Name: " + attributes(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    分区发现

    数据分区是一种常见的优化方式,比如在 Hive 中就是如此。对于分区表,数据通常存储在不同的目录,分区键的值会被编码到分区目录的名称中。所有内置的基于文件的数据源(Text/CSV/JSON/ORC/Parquet)都能够自动的发现和推断分区信息。例如,可以将之前用到的人口数据采用下面的目录结构存储到分区表中,分区键是 gendercountry 两列:

    path
    └── to
        └── table
            ├── gender=male
            │   ├── ...
            │   │
            │   ├── country=US
            │   │   └── data.parquet
            │   ├── country=CN
            │   │   └── data.parquet
            │   └── ...
            └── gender=female
                ├── ...
                │
                ├── country=US
                │   └── data.parquet
                ├── country=CN
                │   └── data.parquet
                └── ...
    

    将目录 path/to/table 传递给 SparkSession.read.parquet 或者 SparkSession.read.load 方法,Spark SQL 会从目录中自动提取分区信息,于是 DataFrame 的模式变成了:

    root
    |-- name: string (nullable = true)
    |-- age: long (nullable = true)
    |-- gender: string (nullable = true)
    |-- country: string (nullable = true)
    

    注意,分区键的数据类型是自动推断的,目前支持的有数字类型,日期类型,时间戳类型和字符串类型。有事用户不希望自动推断分区键的数据类型,对于此类需求,分区键类型推断可以由参数 spark.sql.sources.partitionColumnTypeInference.enabled 控制,默认值为 true。当类型推荐被禁用时,分区键会被指定为字符串类型。

    从 Spark 1.6.0 开始,默认情况下分区发现功能只会寻找指定路径下的分区。对于上述示例,如果用户将路径 path/to/table/gender=male 传递给 SparkSession.read.parquet 或者 SparkSession.read.load 方法,gender 不会被当做一个分区键。如果用户需要指定发现分区的根目录,可以设置 basePath 选项。例如,当给定路径为 path/to/table/gender=male 而选项 basePath 设置为 path/to/table/ 时,gender 也会被当做一个分区键。

    模式融合

    像 Protocol Buffer,Avro 和 Thrift 一样,Parquet 也支持模式演化。用户可以初始定义一个简单的模式,之后在需要的时候增加列。如此一来,可能最终会有多个模式不同但又相互兼容的 Parquet 文件。Parquet 数据源现在已经能够自动检测这种情况,并在需要时融合多个文件的模式。

    由于模式融合是一个开销较大的操作,而且在大多数情况下是不需要的,自 Spark 1.5.0 以来该功能默认情况下时关闭的,可以通过以下两种方式开启:

    1. 在读取 Parquet 文件时设置数据源选项 mergeSchematrue
    2. 设置全局的 SQL 参数 spark.sql.parquet.mergeSchematrue
    // This is used to implicitly convert an RDD to a DataFrame.
    import spark.implicits._
    
    // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("data/test_table/key=1")
    
    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("data/test_table/key=2")
    
    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()
    
    // The final schema consists of all 3 columns in the Parquet files together
    // with the partitioning column appeared in the partition directory paths
    // root
    //  |-- value: int (nullable = true)
    //  |-- square: int (nullable = true)
    //  |-- cube: int (nullable = true)
    //  |-- key: int (nullable = true)
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    Hive 元数据 Parquet 表转换

    当从 Hive 中读取 Parquet 格式的表和向 Hive 中写入不分区的 Parquet 表时,处于性能的考量 Spark SQL 会尝试使用自己的 Parquet 解析器而不是 Hive 的 SerDe 类库。这个机制由参数spark.sql.hive.convertMetastoreParquet` 控制,默认开启。

    Hive/Parquet 模式调和

    从模式解析的角度来看,Hive 和 Parquet 之间有两个重要的不同之处。

    1. Hive 是大小写不敏感的,而 Parquet 大小写敏感。
    2. Hive 中的所有列都默认是 nullable,但是空值在 Parquet 中是有意义的。

    因此,当把一个 Hive 中的 Parquet 表转换为 Spark SQL 中的 Parquet 表时必须调和两者模式之间的差异。规则如下:

    1. 无论是否可为空值,两者模式中相同名称的字段必须拥有相同的数据类型。为了满足可为空值的条件,被调和的字段应该具有 Parquet 端的数据类型。
    2. 被调和的模式需要完全包含 Hive 模式中的字段。
      • 在被调和的模式中,只出现在 Parquet 端的字段会被删除。Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
      • 在被调和的模式中,只出现在 Hive 模式中的字段会被添加为一个 nullable 字段。

    刷新元数据

    为了更好的性能,Spark SQL 会缓存 Parquet 表的元数据。开启 Hive 元数据 Parquet 表转换之后,被转换的的表的元数据也会被缓存。如果这些表被 Hive 或者外部工具更新,需要手动刷新来保持元数据的一致性。

    // spark is an existing SparkSession
    spark.catalog.refreshTable("my_table")
    

    配置项

    Parquet 的配置项可以通过 SparkSessionsetConf 方法或者使用 SQL 语法 SET key=value 来设置。

    Property Name Default Meaning Since Version
    spark.sql.parquet.binaryAsString false 一些处理 Parquet 文件的计算引擎,尤其是 Impala,HIve 和早期版本的 Spark SQL,在写出数据时并不区分二进制类型和自负串类型。该参数告诉 Spark SQL 将二进制数据解析为字符串来与那些系统保持兼容。 1.1.1
    spark.sql.parquet.int96AsTimestamp true 一些处理 Parquet 文件的计算引擎,尤其是 Impala 和 HIve,把时间戳存储为 INT96 类型。该参数告诉 Spark SQL 将 INT96 数据解析为时间戳来与那些系统保持兼容。 1.3.0
    spark.sql.parquet.compression.codec snappy 写出 Parquet 文件时使用的压缩格式。如果在表属性中同时设置了 compressionparquet.compression 属性,则配置生效的优先级从高到低依次为 compression, parquet.compression, spark.sql.parquet.compression.codec。可选的压缩格式有:snappy, gzip, lzo, brotli, lz4, zstd。注意,在 Hadoop 2.9.0 之前 zstd 格式需要事先安装 ZStandardCodecbrotli 格式需要事先安装 BrotliCodec 1.1.1
    spark.sql.parquet.filterPushdown true 是否开启 Parquet 谓词下推的优化机制。 1.2.0
    spark.sql.hive.convertMetastoreParquet true 设置为 false 时,Spark SQL 会使用 Hive SerDe 解析 Parquet 文件而不是使用内置的解析器。 1.1.1
    spark.sql.parquet.mergeSchema false 设置为 true 时,Parquet 数据源会融合所有数据文件的模式,否则模式将会从概要文件中推断,如果概要文件不存在,就从随机数据文件中推断。 1.5.0
    spark.sql.parquet.writeLegacyFormat false 如果设置为 true,数据将会以 Spark 1.4 之前的格式写出。例如,小数格式的数据将会以定长字节数组的格式写出,也是 Hive 和 Impala 的方式。如果写出失败,那么将会使用新的数据格式。例如,小数格式的数据会以 int 类型的格式写出。如果写出的 Parquet 文件是给不支持新格式的系统所用,请将该参数设置为 true。 1.6.0

    ORC 文件

    从 Spark 2.3 开始,Spark 支持以向量化的方式读取 ORC 文件,不过需要新增几个配置项。对于原生 ORC 数据表(使用 USING ORC 语句创建的数据表),需要设置参数 spark.sql.orc.implnative ,设置参数 spark.sql.orc.enableVectorizedReadertrue 来开启向量化读取。对于 Hive ORC serde 数据表(使用 USING HIVE OPTIONS (fileFormat 'ORC') 语句创建的数据表),需要设置参数 spark.sql.hive.convertMetastoreOrctrue 来开启向量化读取

    Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To do that, the following configurations are newly added. The vectorized reader is used for the native ORC tables (e.g., the ones created using the clause USING ORC) when spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true. For the Hive ORC serde tables (e.g., the ones created using the clause USING HIVE OPTIONS (fileFormat 'ORC')), the vectorized reader is used when spark.sql.hive.convertMetastoreOrc is also set to true.

    Property Name Default Meaning Since Version
    spark.sql.orc.impl native ORC 的实现模式。可以是 nativehivenative 表示原生 ORC, hive 表示 Hive 中的 ORC 类库。 2.3.0
    spark.sql.orc.enableVectorizedReader true native 模式下开启向量化读取 ORC 文件。如果设置为 false,会使用非向量化的方式读取 ORC 文件。对于 hive 模式,该配置被忽略。 2.3.0

    JSON 文件

    Spark SQL 可以自动推断 JSON 文件的模式并将其加载为一个 Dataset[Row]。可以使用方法读取一个 Dataset[String] 或是 JSON 文件。

    注意所,读取的 JSON 文件不是经典的格式 JSON 文件。文件中每一行必须是一个独立的、自包含的、有效的 JSON 对象。详情参见 JSON Lines text format, also called newline-delimited JSON

    对于常见的多行 JSON 文件,请设置选项 multiLinetrue

    // Primitive types (Int, String, etc) and Product types (case classes) encoders are
    // supported by importing this when creating a Dataset.
    import spark.implicits._
    
    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files
    val path = "examples/src/main/resources/people.json"
    val peopleDF = spark.read.json(path)
    
    // The inferred schema can be visualized using the printSchema() method
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)
    
    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")
    
    // SQL statements can be run by using the sql methods provided by spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    // +------+
    // |  name|
    // +------+
    // |Justin|
    // +------+
    
    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // a Dataset[String] storing one JSON object per string
    val otherPeopleDataset = spark.createDataset(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val otherPeople = spark.read.json(otherPeopleDataset)
    otherPeople.show()
    // +---------------+----+
    // |        address|name|
    // +---------------+----+
    // |[Columbus,Ohio]| Yin|
    // +---------------+----+
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    Hive 表

    Spark SQL 也支持读写存储在 Hive(Apache Hive)中的数据。然而,由于 Hive 有很多很多依赖,这些依赖默认没有包含在 Spark 发布的版本中。如果 Hive 依赖可以再 classpath 中找到,Spark 会自动加载它们。注意这些 Hive 依赖需要同时在所有的的 worker 节点上,为了方位存储在 Hive 中的数据需要用到 Hive 的序列化和反序列化类库(SerDes)。

    需要将 Hive 的配置文件 hive-site.xmlcore-site.xml(其中的安全配置)和 hdfs-site.xml(HDFS 配置) 放到 conf/ 目录下。

    集成 Hive 功能,必须在初始化 SparkSession 时配置开启 Hive 支持,包括一个跟 Hive metastore 的长连接,对 Hive serdes 的支持,和 Hive UDF。如果没有安装 Hive 也可以开启 Hive 支持,若没有配置 hive-site.xml,Spark 会在当前目录自动创建一个 metastore_db 文件和一个由参数 spark.sql.warehouse.dir 指定的目录,默认值是当前目录下的 spark-warehouse,这里所说的当前目录是指 Spark 应用程序启动的目录。注意从 Spark 2.0.0 开始,hive-site.xml 文件中的 hive.metastore.warehouse.dir 配置项已经被标记为弃用,目前使用的是参数 spark.sql.warehouse.dir 指定数据仓库的位置,启动程序的用户必须对该目录拥有写权限。

    import java.io.File
    
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    
    case class Record(key: Int, value: String)
    
    // warehouseLocation points to the default location for managed databases and tables
    val warehouseLocation = new File("spark-warehouse").getAbsolutePath
    
    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()
    
    import spark.implicits._
    import spark.sql
    
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
    
    // Queries are expressed in HiveQL
    sql("SELECT * FROM src").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...
    
    // Aggregation queries are also supported.
    sql("SELECT COUNT(*) FROM src").show()
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+
    
    // The results of SQL queries are themselves DataFrames and support all normal functions.
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
    
    // The items in DataFrames are of type Row, which allows you to access each column by ordinal.
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...
    
    // You can also use DataFrames to create temporary views within a SparkSession.
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")
    
    // Queries can then join DataFrame data with data stored in Hive.
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // |  5| val_5|  5| val_5|
    // ...
    
    // Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
    // `USING hive`
    sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
    // Save DataFrame to the Hive managed table
    val df = spark.table("src")
    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
    // After insertion, the Hive managed table has data now
    sql("SELECT * FROM hive_records").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...
    
    // Prepare a Parquet data directory
    val dataDir = "/tmp/parquet_data"
    spark.range(10).write.parquet(dataDir)
    // Create a Hive external Parquet table
    sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
    // The Hive external table should already have data
    sql("SELECT * FROM hive_bigints").show()
    // +---+
    // | id|
    // +---+
    // |  0|
    // |  1|
    // |  2|
    // ... Order may vary, as spark processes the partitions in parallel.
    
    // Turn on flag for Hive Dynamic Partitioning
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    // Create a Hive partitioned table using DataFrame API
    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
    // Partitioned column `key` will be moved to the end of the schema.
    sql("SELECT * FROM hive_part_tbl").show()
    // +-------+---+
    // |  value|key|
    // +-------+---+
    // |val_238|238|
    // | val_86| 86|
    // |val_311|311|
    // ...
    
    spark.stop()
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala」。

    指定 Hive 表的存储格式

    创建一个 Hive 表时,需要定义这个表该如何从文件系统中读取以及写入,即「input format」和「output format」。还需要定义这个表应该如何进行正反序列化的操作,即「serde」。下面的选项可以用来指定存储格式(「serde」,「input format」,「output format」),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默认情况下,表数据将会以文本的格式读取。注意,目前在创建表时还不支持 Hive storage handler,你可以在 Hive 中适用 Hive storage handler 创建表,然后通过 Spark SQL 来读取。

    Property Name Meaning
    fileFormat 指定数据存储的文件格式,包括「serde」,「input format」,「output format」。目前支持 6 种文件格式:'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'。
    inputFormat, outputFormat 这两个选项指定 InputFormatOutputFormat 相应的全限定类名,例如: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。这两个选项必须成对出现,而且不能在设置了 fileFormat 选项的情况下再设置。
    serde 该选项指定 serde 的全限定类名。在选项 fileFormat 被指定的情况下,如果其中已经包含了 serde 信息,那么请不要设置该选项。目前「sequencefile」,「textfile」,「rcfile」格式并不包含 serde 信息,所以可以在这 3 种数据格式的情况下使用该选项。
    fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这项选项只适用于「textfile」文件格式,它们定义了如何将文本分割成数据行。

    所有其他的在 OPTIONS 中定义的属性都会被当做 Hive serde 属性。

    与不同版本的 Hive Metastore 交互

    Spark SQL 对 Hive 支持的一个很重要的点就是与 Hive metastore 进行交互,这可以让 Spark SQL 能够访问 Hive 表的元信息。从 Spark 1.4.0 开始,一个单独的 Spark SQL 构建版本可以用来跟不同版本的 Hive metastore 进行交互,请使用下面的配置。注意,与 Hive 的版本无关,这里说的的是 metastore 的版本,在 Spark SQL 内部会编译内建的 Hive 并使用那些类来执行内部流程(serdes, UDFs, UDAFs 等等)。

    下列选项用来配置 Hive 版本,以获取元数据:

    Property Name Default Meaning Since Version
    spark.sql.hive.metastore.version 2.3.7 Hive metastore 的版本,可选项从 0.12.02.3.7,从 3.0.03.1.2 1.4.0
    spark.sql.hive.metastore.jars builtin 初始化 HiveMetastoreClient 所需要的 Jar 包的位置。该选项有三种可选值:1、builtin 使用 Hive 2.3.7,跟 Spark 的集成构建绑定,编译时使用 -Phive 参数生效,此时参数 spark.sql.hive.metastore.version 的值必须是 2.3.7 或者未定义;2、maven 使用从 Maven 仓库下载的指定版本的 Hive 依赖,这种配置在生产环境不建议使用。3、一个适用于 JVM 的标准的 classpath 路径,该路径中必须包含所有 Hive 相关的类库及其依赖,包括正确版本的 Hadoop。只有 driver 程序需要这些依赖,但是如果使用 yarn cluster 模式启动应用程序,就需要把这些依赖装进你的应用程序 Jar 文件。 1.4.0
    spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc 以逗号分隔的全限定类名前缀列表,相应的类需要通过 Spark SQL 和指定版本的 Hive 共享的类加载器进行加载。例如其中一个需要被共享的依赖就是连接 metastore 需要用到的 JDBC 驱动包。其他需要被共享的依赖就是那些已经被共享的依赖,例如 log4j。 1.4.0
    spark.sql.hive.metastore.barrierPrefixes (empty) 以逗号分隔的全限定类名前缀列表,这些依赖需要被 Spark SQL 连接的 Hive 的相应版本重新加载。例如,定义了 Hive UDF 的依赖应该被共享(即 org.apache.spark.*)。 1.4.0

    JDBC 连接其他数据库

    Spark SQL 还提供了一个可以通过 JDBC 连接其他数据库的数据源,该功能需要通过 JdbcRDD 来实现。结果将会以 DataFrame 的形式返回,之后就可以通过 Spark SQL 进行处理或者与其他数据源进行连接。JDBC 数据源在 Java 和 Python 中用起来也很简单,并不要求提供一个 ClassTag。(注意 JDBC 数据源和 Spark SQL JDBC server 不是一回事,后者可以让应用程序通过 Spark SQL 执行查询)

    要使用 JDBC 数据源,需要将相应的 JDBC 驱动包放到 spark 的 classpath 中。例如,如果想从 Spark Shell 中连接 PostGRE 数据库需要执行以下命令:

    ./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
    

    远端数据库中的表可以通过数据源 API 被加载成为一个 DataFrame 或者 Spark SQL 临时视图。用户可以通过数据源选项指定 JDBC 连接的参数。userpassword 选项通常用来登录数据库,Spark 还支持下列大小写不敏感的选项:

    Property Name Meaning
    url 连接 JDBC 所用的 URL。有些选项可以包含在 URL 中。例如,jdbc:postgresql://localhost/test?user=fred&password=secret
    dbtable 需要读写的表。当该选项被用作读取时的参数时,任何 SQL 中有效的 FROM 语句都可以被使用。例如,可以不填写完整的表名而是一个用小括号括起来的子查询。不允许同时指定 dbtablequery 选项。
    query 一个从数据库中读取数据的查询语句。被指定的查询会被小括号括起来作为 FROM 语句的子查询,Spark 会为该子查询分配一个别名。例如,Spark 会组织一个这样的语句来访问 JDBC:SELECT <columns> FROM (<user_specified_query>) spark_gen_alias。下面是一些使用该选项的限制。不允许同时指定 dbtablequery 选项。不允许同时指定 partitionColumnquery 选项;当需要指定 partitionColumn 选项时,请使用 dbtable 来指定子查询,分区列可以通过子查询中的别名来指定。示例:spark.read.format("jdbc").option("url", jdbcUrl).option("query", "select c1, c2 from t1").load()
    driver JDBC 驱动的全限定类名。
    partitionColumn, lowerBound, upperBound 这些选项必须同时指定,此外,选项 numPartitions 也必须指定。这些选项一同定义了如何从数据库并行读取数据。partitionColumn 选项必须是一个数字,日期或者时间戳类型。注意 lowerBoundupperBound 选项只影响分区步距,并不过滤表中的数据。所以表中的所有数据会被分区之后读取,这些选项只在读取时有效。
    numPartitions 读写数据时可以被用到的最大分区数,该选项也决定了最大并发 JDBC 连接数。如果写出时的分区数超过了这个限制,会在写出之前调用 coalesce(numPartitions) 方法削减分区。
    queryTimeout JDBC 查询超时时间,单位为秒,零表示无限制。在写出时,该选项取决于 JDBC 驱动实现 setQueryTimeout API 的方式,例如,h2 JDBC 驱动每次查询都会检查超时时间,而不是对于整个 JDBC 批次检查。默认值为 0
    fetchsize JDBC 拉取数据的数量,决定了一次拉取多少行数据。该选项可以帮助提升那些默认拉取值很小 JDBC 的性能(例如 Oracle 是一次 10 行)。该选项只在读取数据时有效。
    batchsize JDBC 批次数据的数量,决定了一次写出多少数据。该选项可以帮助提升 JDBC 的性能。该选项只在写出数据时有效。默认值为 1000
    isolationLevel 事务隔离级别,应用于当前连接。可以是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ,或者 SERIALIZABLE,与 JDBC 连接定义的标准事务隔离级别相对应,默认值是 READ_UNCOMMITTED。该选项只在写出数据时有效。详情参见 java.sql.Connection
    sessionInitStatement 在连接到远端数据库的会话开启之后,在读取数据之前,执行该选项所定义的 SQL 语句(或是 PL/SQL 块),可以通过该选项做一些会话初始化工作。示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
    truncate 当存储模式设置为 SaveMode.Overwrite 时,该选项会在写出时清空表而不是删除后再创建。这样会更高效一些,还避免了删除表的元数据(比如说索引)。然而在某些情况下该选项不会生效,比如在新数据跟原始表的模式不同的时候。默认值为 false。该选项只在写出数据时有效。
    cascadeTruncate 这是一个 JDBC 写出时选项。如果开启,同时数据库也支持(目前只有 PostgreSQL 和 Oracle),该选项在写出前会执行 TRUNCATE TABLE t CASCADE 语句(在 PostgreSQL 中是 TRUNCATE TABLE ONLY t CASCADE,来避免无意间清空了衍生表)。该选项会影响其他的表,需要谨慎使用。该选项只在写出数据时有效。默认值为相应数据库默认的 cascading truncate 行为,通过 JDBC 会话中的 isCascadeTruncate 参数指定。
    createTableOptions 这是一个 JDBC 写出时选项。该选项可以指定写出数据时的建表语句(例如 CREATE TABLE t (name string) ENGINE=InnoDB.)。该选项只在写出数据时有效。
    createTableColumnTypes 在建表时指定每列的数据类型而不是采用默认值。数据类型信息应该和 CREATE TABLE 语句中指定的一致(例如:"name CHAR(64), comments VARCHAR(1024)"))。被指定的类型应该是有效的 Spark SQL 数据类型。该选项只在写出数据时有效。
    customSchema 从数据库中读取数据时自定义数据模式,例如:"id DECIMAL(38, 0), name STRING"。可以只指定部分列,其他列使用默认类型映射,例如:"id DECIMAL(38, 0)"。列名应该和数据库表中的字段名相同。用户可以通过该选项指定 Spark SQL 中的相应类型而不是使用默认映射。该选项只在读取数据时有效。
    pushDownPredicate 是否开启谓词下推机制,默认值是 true,Spark 会尽力将过滤语句下推到数据库执行。否则,如果被设置为 false,不会有谓词下推,所有的过滤操作都会由 Spark 来执行。
    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    // Loading data from a JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()
    
    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying the custom data types of the read schema
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    
    // Saving data to a JDBC source
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()
    
    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    
    // Specifying create table column data types on write
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    

    完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

    Avro 文件

    从 Spark 2.4 开始, Spark SQL 对 Avro 数据的读写提供了原生支持。

    部署

    spark-avro 是额外的模块,默认情况下并不包含在 spark-submit 或者 spark-shell 的 classpath 中。

    对于 Spark 应用程序来说, spark-submit 脚本用来启动应用程序。spark-avro_2.12 和它的依赖可以通过添加到 spark-submit 脚本的参数中,就像:

    ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.0.0 ...
    

    对于 spark-shell 脚本,也可以通过 --packages 参数直接添加 org.apache.spark:spark-avro_2.12 和它的依赖:

    ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 ...
    

    更多有关启动应用程序添加额外依赖的详情参见 Application Submission Guide

    读写函数

    由于 spark-avro 模块是外部的,在 DataFrameReader 或者 DataFrameWriter 中并不存在 .avro API。

    读写 Avro 格式的数据,需要指定数据源选项 formatavro(或者 org.apache.spark.sql.avro)。

    val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
    usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
    

    to_avro() 和 from_avro()

    Avro 依赖提供了 to_avro() 函数来编码一列数据为 Avro 格式,以及 from_avro() 函数来解码 Avro 数据成为一列对象。两个函数都将一列转换为另外一列,输入输出的 SQL 数据类型可以是复杂类型或者基础类型。

    当从一个像 Kafka 一样的流失数据源读写数据时,使用 Avro record 作为一列很方便。每一个 Kafka 键值对记录会跟一些元信息一起被读取进来,比如 Kafka 收录这条记录时的时间戳,记录偏移量等等。

    • 如果字段中包含的数据是 Avro 格式,可以使用 from_avro() 函数来抽取数据,改进,清洗,然后再输出到 Kafka 中去 或者写入到文件。
    • 函数可以用来将结构化数据转换为 Avro record。对于需要将多列融合为一列写出到 Kafka 的场景很适用。

    两个函数目前已支持 Scala 和 Java 语言。

    import org.apache.spark.sql.avro.functions._
    
    // `from_avro` requires Avro schema in JSON string format.
    val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
    
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
    
    // 1. Decode the Avro data into a struct;
    // 2. Filter by column `favorite_color`;
    // 3. Encode the column `name` in Avro format.
    val output = df
      .select(from_avro('value, jsonFormatSchema) as 'user)
      .where("user.favorite_color == \"red\"")
      .select(to_avro($"user.name") as 'value)
    
    val query = output
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic2")
      .start()
    

    数据源选项

    Avro 数据源选项可以通过以下两种方式设置:

    • DataFrameReader 或者 DataFrameWriter 中的 .option 方法。
    • 函数 from_avro 中的 options 参数。
    Property Name Default Meaning Scope
    avroSchema None 可选,用户提供的 JSON 格式的模式。在读取 Avro 数据时,此选项可以用来定义一个演化的模式,可以和实际的 Avro 模式不同,但必须兼容。反序列化的模式会和演化模式一致。 例如,如果设置一个拥有一个额外字段(有默认值)的演化模式,Spark 读取的结果中也会包含这个新字段。当写出数据为 Avro 时,该选项可以调和目标模式与 Spark 转换的模式不匹配的情况。例如,目标模式中有一个列式枚举类型,而 Spark 生成的模式中该字段是字符串类型。 read, write and function from_avro
    recordName topLevelRecord 写出数据时的 record 名称,Avro 需要指定。 write
    recordNamespace "" 写出数据时的 record 命名空间。 write
    ignoreExtension true 该选项控制在读取数据时是否忽略文件的 .avro 后缀。如果开启,所有的文件都会被加载(有或者没有 .avro 后缀)。该项选已被弃用,会在将来的版本中移除。请使用更通用的数据源选项 pathGlobFilter 来过滤文件。 read
    compression snappy compression 选项指定在写出数据时的压缩格式,目前支持的选项有 uncompressed, snappy, deflate, bzip2xz。如果没有指定,会使用配置中的 spark.sql.avro.compression.codec 参数。 write
    mode FAILFAST mode 选项指定 from_avro 函数的解析模式,目前支持的模式有 FAILFAST:在处理损坏文件时抛出异常;PERMISSIVE:损坏的记录被当做空值处理。所以,数据模式会被强制改变为可以为空值,这有可能与用户提供的模式不同。 function from_avro

    配置项

    可以通过 SparkSession 对象的 setConf 方法或者执行 SQL 中的 SET key=value 命令来改变的配置项。

    Property Name Default Meaning Since Version
    spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果设置为 true,为了向后兼容,数据源 provider com.databricks.spark.avro 会被作为外部数据源模块映射为内置模式。 2.4.0
    spark.sql.avro.compression.codec snappy 写出 Avro 文件时使用的压缩格式。支持的压缩格式有:uncompressed, deflate, snappy, bzip2 和 xz。默认值为 snappy。 2.4.0
    spark.sql.avro.deflate.level -1 写出 Avro 文件时石笋的压缩级别,有效值必须在 1 到9 之间或者 -1。默认值为 -1,在目前的版本中相当于 6。 2.4.0

    与 Databricks spark-avro 的兼容

    该 Avro 数据源模块衍生于 Databricks 的开源版本 spark-avro,并与之保持兼容。

    默认情况下 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 配置是开启的,数据源 provider com.databricks.spark.avro 会被映射为内置的 Avro 模块。对于通过 com.databricks.spark.avro 中的 Provider 创建的表,其元数据可以被内置 Avro 模块读取。

    注意,在 Databricks 的 spark-avro 中,隐式类 AvroDataFrameWriterAvroDataFrameReader 可以通过函数 .avro() 创建。而在内置的模块中,两个隐式类都被移除了。请在 DataFrameWriterDataFrameReader 对象中使用 .format("avro") 方法,足够简洁明了。

    如果你更想使用自己构建的 spark-avro jar 文件,可以将配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 设置为 false,通过 --jars 参数来部署应用程序。详情参见 Advanced Dependency Management

    支持的数据类型 Avro -> Spark SQL

    目前 Spark 支持读取 Avro record 中所有的基础数据类型(primitive types)和复杂数据类型(complex types)。

    Avro type Spark SQL type
    boolean BooleanType
    int IntegerType
    long LongType
    float FloatType
    double DoubleType
    string StringType
    enum StringType
    fixed BinaryType
    bytes BinaryType
    record StructType
    array ArrayType
    map MapType
    union See below

    除了上面列出的类型之外,还支持读取 union 类型,下面三种类型被解析为 union 类型:

    1. union(int, long) 会被映射为 LongType。
    2. union(float, double) 会被映射为 DoubleType。
    3. union(something, null),其中 something 是任意支持的 Avro 类型。该类型会被映射为与 something 相关的 Spark SQL 类型(nullable 设置为 true)。所有其他的类型会被当做复杂数据类型,会被映射为 StructType,其中的字段名称分别为 member0,member1 等等,数量与 union 中的类型数量一致。在 Avro 和 Parquet 之间相互转换时也遵循同样的原则。

    还支持读取下列 Avro 逻辑类型(logical types):

    Avro logical type Avro type Spark SQL type
    date int DateType
    timestamp-millis long TimestampType
    timestamp-micros long TimestampType
    decimal fixed DecimalType
    decimal bytes DecimalType

    目前,Spark SQL 会忽略 Avro 文件的 docs,aliases 和其他属性。

    支持的数据类型 Spark SQL -> Avro conversion

    Spark 支持写入 Spark SQL 数据类型的数据到 Avro。对于大多数数据类型,从 Spark SQL 数据类型到 Avro 类型的映射是很直接的(比如说 IntegerType 映射为 int);然而还有一些特殊的情况:

    Spark SQL type Avro type Avro logical type
    ByteType int
    ShortType int
    BinaryType bytes
    DateType int date
    TimestampType long timestamp-micros
    DecimalType fixed decimal

    还可以通过选项 avroSchema 指定完整的 Avro 模式,这样 Spark SQL 数据类型会被转换为另一种 Avro 数据类型。下列转换不能通过默认映射执行,需要用户指定 Avro 模式:

    Spark SQL type Avro type Avro logical type
    BinaryType fixed
    StringType enum
    TimestampType long timestamp-millis
    DecimalType bytes decimal

    二进制文件

    从 Spark 3.0 开始,Spark 支持二进制文件数据源,可以读取二进制文件,将每个文件转换为一条记录,记录中包含了完整的文件数据。生成的 DataFrame 会包含以下字段以及可能的分区字段:

    • path: StringType
    • modificationTime: TimestampType
    • length: LongType
    • content: BinaryType

    读取完整的二进制文件需要指定数据源选项 formatbinaryFile,可以通过数据源选项 pathGlobFilter 来过滤需要加载的文件同时不影响分区发现的功能。例如下面的代码读取指定路径中所有的 PNG 文件:

    spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data")
    

    二进制文件数据源不支持将一个 DataFrame 写回到原来的文件。

    疑难解答

    • JDBC 驱动包必须可以被 client 和所有 executor 的 Application ClassLoader 加载。这是因为 Java 的 DriverManager 类会进行安全检查,会在打开数据库连接时忽略所有无法被 Application ClassLoader 加载的类。将相关的 Jar 文件加入到所有节点上的 compute_classpath.sh 脚本中是一种简便的解决方案。
    • 某些数据库,比如说 H2,将所有的名称转换为大写,此时在 Spark SQL 中引用那些名称时也需要用大写形式。
    • 用户可以在数据源选项中指定 JDBC 驱动包供应商提供的特殊属性。例如 spark.read.format("jdbc").option("url", oracleJdbcUrl).option("oracle.jdbc.mapDateToTimestamp", "false")oracle.jdbc.mapDateToTimestamp 默认为 true,用户通常会关闭该选项来避免 Oracle 中的 date 类型转换为 timestamp。

    相关文章

      网友评论

        本文标题:0302 Data Sources

        本文链接:https://www.haomeiwen.com/subject/mfbmfktx.html