Spark SQL and Hive on Parquet schema

Author: 邵红晓 | Published 2021-07-19 17:34


2. Load tables through the Hive metastore. Make sure hive-site.xml can be loaded from the classpath.
Version: Spark 2.4.0

-rw-r--r-- 1 root  root    996 May 11  2018 docker.properties.template
-rw-r--r-- 1 root  root   1105 May 11  2018 fairscheduler.xml.template
lrwxrwxrwx 1 root  root     14 Oct 18  2020 hbase-site.xml -> hbase-site.xml
-rw-r--r-- 1 spark spark  2878 Oct 17  2020 hive-site.xml
-rw-r--r-- 1 spark spark  1240 Oct 16  2020 log4j.properties
-rw-r--r-- 1 root  root   2025 May 11  2018 log4j.properties.template
-rw-r--r-- 1 spark spark  4956 Oct 16  2020 metrics.properties
-rw-r--r-- 1 root  root   7801 May 11  2018 metrics.properties.template
-rw-r--r-- 1 root  root    865 May 11  2018 slaves.template
-rw-r--r-- 1 spark spark   856 Dec 27  2020 spark-defaults.conf
-rw-r--r-- 1 root  root   1292 May 11  2018 spark-defaults.conf.template
-rw-r--r-- 1 spark spark  2319 Oct 30  2020 spark-env.sh
-rwxr-xr-x 1 root  root   4221 May 11  2018 spark-env.sh.template
-rwxr-xr-x 1 spark spark   244 Oct 16  2020 spark-thrift-fairscheduler.xml
-rw-r--r-- 1 hive  hadoop 2577 Dec 27  2020 spark-thrift-sparkconf.conf
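
Since hive-site.xml has to be visible on the classpath, a quick lookup from the driver JVM can confirm that before a session is created. The sketch below is only an illustration: the object name `ClasspathCheck` and the method `locate` are hypothetical helpers, not part of Spark, and the check says nothing about whether the file contents are valid.

```scala
import java.net.URL

// Hypothetical helper (not a Spark API): looks a resource up on the classpath.
object ClasspathCheck {
  // Returns the URL of the resource if the given class loader can see it.
  def locate(
      resource: String,
      loader: ClassLoader = Thread.currentThread().getContextClassLoader): Option[URL] =
    Option(loader.getResource(resource))
}
```

Calling `ClasspathCheck.locate("hive-site.xml")` yields `Some(url)` when the file is visible (for example because the Spark conf directory is on the classpath) and `None` otherwise.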

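import org.apache.spark.sql.SparkSession

// enableHiveSupport() makes Spark use the Hive metastore configured in hive-site.xml
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val sql_define = "" // the SQL text is elided in the source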


2. The Hive metastore schema and the Parquet file schema are inconsistent


package org.apache.spark.sql.execution.datasources.parquet, inferSchema (infers the schema from the files)

val filesToTouch =
      if (shouldMergeSchemas) {
        // Also includes summary files, 'cause there might be empty partition directories.

        // If mergeRespectSummaries config is true, we assume that all part-files are the same for
        // their schema with summary files, so we ignore them when merging schema.
        // If the config is disabled, which is the default setting, we merge all part-files.
        // In this mode, we only need to merge schemas contained in all those summary files.
        // You should enable this configuration only if you are very sure that for the parquet
        // part-files to read there are corresponding summary files containing correct schema.

        // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
        // the ordering of the output columns. There are several things to mention here.
        //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
        //     the first part-file so that the columns of the lexicographically first file show
        //     first.
        //  2. If mergeRespectSummaries config is true, then there should be, at least,
        //     "_metadata"s for all given files, so that we can ensure the columns of
        //     the lexicographically first file show first.
        //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
        //     no guarantee of the output order, since there might not be a summary file for the
        //     lexicographically first file, which ends up putting ahead the columns of
        //     the other files. However, this should be okay since not enabling
        //     shouldMergeSchemas means (assumes) all the files have the same schemas.

        val needMerged: Seq[FileStatus] =
          if (mergeRespectSummaries) {
            Seq.empty
          } else {
            filesByType.data
          }
        needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
      } else {
        // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
        // don't have this.
        filesByType.commonMetadata.headOption
            // Falls back to "_metadata"
            .orElse(filesByType.metadata.headOption)
            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
            // files contain conflicting user defined metadata (two or more values are associated
            // with a same key in different files).  In either case, we fall back to any of the
            // first part-file, and just assume all schemas are consistent.
            .orElse(filesByType.data.headOption)
            .toSeq
      }
    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)

1. If spark.sql.hive.convertMetastoreParquet.mergeSchema=false (the default), the first file in the collection is taken directly as the source of the schema.
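
The file-selection rule in the inferSchema excerpt above can be modeled without Spark. This is a simplified sketch, not the Spark implementation: `FilesByType` here is a hypothetical stand-in that holds plain file names instead of `FileStatus` objects, and `FilesToTouch.select` is an illustrative name.

```scala
// Hypothetical stand-in for Spark's internal grouping of Parquet files; holds names only.
final case class FilesByType(
    data: Seq[String],           // ordinary part-files
    metadata: Seq[String],       // "_metadata" summary files
    commonMetadata: Seq[String]) // "_common_metadata" summary files

object FilesToTouch {
  // Returns the files whose footers would be read to work out the schema.
  def select(
      files: FilesByType,
      shouldMergeSchemas: Boolean,
      mergeRespectSummaries: Boolean = false): Seq[String] =
    if (shouldMergeSchemas) {
      // Merging: read every part-file unless the summary files are trusted.
      val needMerged = if (mergeRespectSummaries) Seq.empty[String] else files.data
      needMerged ++ files.metadata ++ files.commonMetadata
    } else {
      // Not merging: a single file decides the schema.
      files.commonMetadata.headOption
        .orElse(files.metadata.headOption)
        .orElse(files.data.headOption)
        .toSeq
    }
}
```

With `shouldMergeSchemas = false` and no summary files, only the first part-file is returned, so columns that exist only in later part-files never reach the inferred schema.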


Spark SQL reads the path and partition metadata directly from the Hive metastore.
package org.apache.spark.sql.hive, mergeWithMetastoreSchema (merges the inferred schema with the Hive schema)

 def mergeWithMetastoreSchema(
      metastoreSchema: StructType,
      inferredSchema: StructType): StructType = try {
    // Find any nullable fields in metastore schema that are missing from the inferred schema.
    val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
    val missingNullables = metastoreFields
      .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
      .values
      .filter(_.nullable)
    // Merge missing nullable fields to inferred schema and build a case-insensitive field map.
    val inferredFields = StructType(inferredSchema ++ missingNullables)
      .map(f => f.name.toLowerCase -> f).toMap
    StructType(metastoreSchema.map(f => f.copy(name = inferredFields(f.name).name)))
  } catch {
    case NonFatal(_) =>
      val msg = s"""Detected conflicting schemas when merging the schema obtained from the Hive
         | Metastore with the one inferred from the file format. Metastore schema:
         |${metastoreSchema.prettyJson}
         |
         |Inferred schema:
         |${inferredSchema.prettyJson}
       """.stripMargin
      throw new SparkException(msg)
  }

2. The merged schema contains only the fields defined in the Hive metastore. A metastore field that is missing from the Parquet files is read as null.
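
The reconciliation rule can be illustrated with a small Spark-free model. This is a sketch under simplifying assumptions, not the Spark code: `Field` is a hypothetical stand-in for `StructField` that ignores data types, and `SchemaMergeSketch.merge` lower-cases names on lookup for clarity.

```scala
// Hypothetical stand-in for StructField; data types are left out of this model.
final case class Field(name: String, nullable: Boolean = true)

object SchemaMergeSketch {
  def merge(metastore: Seq[Field], inferred: Seq[Field]): Seq[Field] = {
    val inferredNames = inferred.map(_.name.toLowerCase).toSet
    // Nullable metastore fields that the Parquet files do not contain.
    val missingNullables =
      metastore.filter(f => f.nullable && !inferredNames.contains(f.name.toLowerCase))
    // Case-insensitive lookup: lower-cased name -> field with its original casing.
    val byLowerName =
      (inferred ++ missingNullables).map(f => f.name.toLowerCase -> f).toMap
    // Keep the metastore's fields and order; take the name casing from the files.
    metastore.map { f =>
      val resolved = byLowerName.getOrElse(
        f.name.toLowerCase,
        throw new IllegalArgumentException(
          s"Conflicting schemas: non-nullable field ${f.name} is missing from the files"))
      f.copy(name = resolved.name)
    }
  }
}
```

For a metastore schema `id, username, added_col` and a file schema `id, userName, extra`, the merged field names are `id, userName, added_col`: `extra` is dropped because the metastore does not define it, and `added_col` stays in the schema and would be read as null.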



