Spark SQL and Hive: Parquet schema compatibility

Author: 邵红晓 | Published 2021-07-19 17:34

Data loading methods

There are two ways to work with Parquet files in Spark (a combined sketch of both paths follows the session setup below):
1. Load the files directly:
spark.read.parquet("/path/")
2. Load the table through the Hive metastore; make sure hive-site.xml can be found on the classpath.
Version: Spark 2.4.0

/etc/spark2/conf
-rw-r--r-- 1 root  root    996 May 11  2018 docker.properties.template
-rw-r--r-- 1 root  root   1105 May 11  2018 fairscheduler.xml.template
lrwxrwxrwx 1 root  root     14 Oct 18  2020 hbase-site.xml -> hbase-site.xml
-rw-r--r-- 1 spark spark  2878 Oct 17  2020 hive-site.xml
-rw-r--r-- 1 spark spark  1240 Oct 16  2020 log4j.properties
-rw-r--r-- 1 root  root   2025 May 11  2018 log4j.properties.template
-rw-r--r-- 1 spark spark  4956 Oct 16  2020 metrics.properties
-rw-r--r-- 1 root  root   7801 May 11  2018 metrics.properties.template
-rw-r--r-- 1 root  root    865 May 11  2018 slaves.template
-rw-r--r-- 1 spark spark   856 Dec 27  2020 spark-defaults.conf
-rw-r--r-- 1 root  root   1292 May 11  2018 spark-defaults.conf.template
-rw-r--r-- 1 spark spark  2319 Oct 30  2020 spark-env.sh
-rwxr-xr-x 1 root  root   4221 May 11  2018 spark-env.sh.template
-rwxr-xr-x 1 spark spark   244 Oct 16  2020 spark-thrift-fairscheduler.xml
-rw-r--r-- 1 hive  hadoop 2577 Dec 27  2020 spark-thrift-sparkconf.conf

import org.apache.spark.sql.SparkSession

// Create the SparkSession (Hive support is required for reading tables through the metastore)
val spark = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", classOf[ToKryoRegistrator].getName)
      .enableHiveSupport()
      .getOrCreate()
val sql_define = ""
spark.sql(sql_define)
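
For concreteness, a minimal sketch of both access paths; the database/table name db_name.parquet_table below is a made-up placeholder, not from the original article:

// 1. Read the Parquet files directly from HDFS
val dfFromFiles = spark.read.parquet("/path/")
dfFromFiles.printSchema()

// 2. Read through the Hive metastore (requires enableHiveSupport() and hive-site.xml on the classpath)
val dfFromHive = spark.sql("SELECT * FROM db_name.parquet_table")
dfFromHive.printSchema()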

Compatibility issues

1. The schemas of the Parquet files on HDFS are not consistent with each other (a sketch of how this can arise follows this list).
2. The Hive Metastore schema does not match the schema in the Parquet files.
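
A hypothetical illustration of case 1: two appends into the same directory write part-files with different schemas (the path and column names are made up):

import spark.implicits._

// First job writes (id, name)
Seq((1L, "a")).toDF("id", "name")
  .write.mode("append").parquet("/warehouse/t")

// A later job writes (id, name, age) into the same path
Seq((2L, "b", 10)).toDF("id", "name", "age")
  .write.mode("append").parquet("/warehouse/t")

// spark.read.parquet("/warehouse/t") now has to decide which schema to use,
// which is exactly what inferSchema below resolves.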

Solution 1

package org.apache.spark.sql.execution.datasources.parquet, inferSchema (infers the schema from the Parquet files):

val filesToTouch =
      if (shouldMergeSchemas) {
        // Also includes summary files, 'cause there might be empty partition directories.

        // If mergeRespectSummaries config is true, we assume that all part-files are the same for
        // their schema with summary files, so we ignore them when merging schema.
        // If the config is disabled, which is the default setting, we merge all part-files.
        // In this mode, we only need to merge schemas contained in all those summary files.
        // You should enable this configuration only if you are very sure that for the parquet
        // part-files to read there are corresponding summary files containing correct schema.

        // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
        // the ordering of the output columns. There are several things to mention here.
        //
        //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
        //     the first part-file so that the columns of the lexicographically first file show
        //     first.
        //
        //  2. If mergeRespectSummaries config is true, then there should be, at least,
        //     "_metadata"s for all given files, so that we can ensure the columns of
        //     the lexicographically first file show first.
        //
        //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
        //     no guarantee of the output order, since there might not be a summary file for the
        //     lexicographically first file, which ends up putting ahead the columns of
        //     the other files. However, this should be okay since not enabling
        //     shouldMergeSchemas means (assumes) all the files have the same schemas.

        val needMerged: Seq[FileStatus] =
          if (mergeRespectSummaries) {
            Seq.empty
          } else {
            filesByType.data
          }
        needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
      } else {
        // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
        // don't have this.
        filesByType.commonMetadata.headOption
            // Falls back to "_metadata"
            .orElse(filesByType.metadata.headOption)
            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
            // files contain conflicting user defined metadata (two or more values are associated
            // with a same key in different files).  In either case, we fall back to any of the
            // first part-file, and just assume all schemas are consistent.
            .orElse(filesByType.data.headOption)
            .toSeq
      }
    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)

Analysis of the source code:
1. With spark.sql.hive.convertMetastoreParquet.mergeSchema=false (the default), the first file in the collection is taken as the schema for the data being read.
2. With spark.sql.hive.convertMetastoreParquet.mergeSchema=true, the schemas of all files are merged, but reading every file in parallel has a performance cost, so this is not recommended.
Note: a schema can also be supplied explicitly to read.parquet; take care that the data types match, otherwise the read fails.
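
A minimal sketch of both options; the column names id and name are made-up placeholders:

// Option A: merge the schemas of all Parquet part-files (expensive, off by default)
spark.conf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema", "true")

// Option B: supply the schema explicitly so Spark does not infer it from the files;
// the declared types must match what is actually stored, otherwise the read fails
import org.apache.spark.sql.types._
val explicitSchema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("name", StringType, nullable = true)
))
val df = spark.read.schema(explicitSchema).parquet("/path/")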

Solution 2

Spark SQL reads the partition-path metadata directly from the Hive metastore.
package org.apache.spark.sql.hive, mergeWithMetastoreSchema (merges the Hive metastore schema with the file-inferred one):

 def mergeWithMetastoreSchema(
      metastoreSchema: StructType,
      inferredSchema: StructType): StructType = try {
    // Find any nullable fields in mestastore schema that are missing from the inferred schema.
    val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
    val missingNullables = metastoreFields
      .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
      .values
      .filter(_.nullable)
    // Merge missing nullable fields to inferred schema and build a case-insensitive field map.
    val inferredFields = StructType(inferredSchema ++ missingNullables)
      .map(f => f.name.toLowerCase -> f).toMap
    StructType(metastoreSchema.map(f => f.copy(name = inferredFields(f.name).name)))
  } catch {
    case NonFatal(_) =>
      val msg = s"""Detected conflicting schemas when merging the schema obtained from the Hive
         | Metastore with the one inferred from the file format. Metastore schema:
         |${metastoreSchema.prettyJson}
         |
         |Inferred schema:
         |${inferredSchema.prettyJson}
       """.stripMargin
      throw new SparkException(msg)
  }

Analysis of the source code:
1. The merged schema keeps the field definitions from the metastore and takes the field names (casing) from the inferred Parquet schema; if a non-nullable metastore field has no counterpart in the inferred schema, the schemas are considered conflicting and a SparkException is thrown.
2. The merged schema contains only the fields declared in the Hive metastore; a metastore column that is missing from the Parquet files is read back as null.
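
A small sketch of what this means in practice, with made-up column names; the metastore declares an extra nullable column c that is absent from the Parquet files:

import org.apache.spark.sql.types._

// Schema declared in the Hive metastore (Hive stores names in lowercase)
val metastoreSchema = StructType(Seq(
  StructField("a", LongType,   nullable = true),
  StructField("b", StringType, nullable = true),
  StructField("c", StringType, nullable = true)   // not present in the Parquet files
))

// Schema inferred from the Parquet footers (keeps the writer's casing, no column c)
val inferredSchema = StructType(Seq(
  StructField("A", LongType,   nullable = true),
  StructField("B", StringType, nullable = true)
))

// mergeWithMetastoreSchema(metastoreSchema, inferredSchema) yields
//   A: long, B: string, c: string
// i.e. the metastore's field set and types with the Parquet casing;
// when the table is read, column c comes back as null because no file contains it.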
