Spark SQL and Hive: Parquet schema compatibility

Author: 邵红晓 | Published 2021-07-19 17:34

    Data loading

    There are two ways to work with Parquet files in Spark:
    1. Load the files directly:
    spark.read.parquet("/path/")
    2. Load through a Hive metastore table; make sure hive-site.xml can be loaded from the classpath (as in the /etc/spark2/conf listing below).
    Version: Spark 2.4.0

    /etc/spark2/conf
    -rw-r--r-- 1 root  root    996 May 11  2018 docker.properties.template
    -rw-r--r-- 1 root  root   1105 May 11  2018 fairscheduler.xml.template
    lrwxrwxrwx 1 root  root     14 Oct 18  2020 hbase-site.xml -> hbase-site.xml
    -rw-r--r-- 1 spark spark  2878 Oct 17  2020 hive-site.xml
    -rw-r--r-- 1 spark spark  1240 Oct 16  2020 log4j.properties
    -rw-r--r-- 1 root  root   2025 May 11  2018 log4j.properties.template
    -rw-r--r-- 1 spark spark  4956 Oct 16  2020 metrics.properties
    -rw-r--r-- 1 root  root   7801 May 11  2018 metrics.properties.template
    -rw-r--r-- 1 root  root    865 May 11  2018 slaves.template
    -rw-r--r-- 1 spark spark   856 Dec 27  2020 spark-defaults.conf
    -rw-r--r-- 1 root  root   1292 May 11  2018 spark-defaults.conf.template
    -rw-r--r-- 1 spark spark  2319 Oct 30  2020 spark-env.sh
    -rwxr-xr-x 1 root  root   4221 May 11  2018 spark-env.sh.template
    -rwxr-xr-x 1 spark spark   244 Oct 16  2020 spark-thrift-fairscheduler.xml
    -rw-r--r-- 1 hive  hadoop 2577 Dec 27  2020 spark-thrift-sparkconf.conf
    
    import org.apache.spark.sql.SparkSession

    // Create the SparkSession with Hive support enabled
    val spark = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", classOf[ToKryoRegistrator].getName)
      .enableHiveSupport()
      .getOrCreate()

    val sqlText = ""  // fill in the HiveQL statement to run
    spark.sql(sqlText)
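
    For path 2, the table is then read through the metastore. A short usage sketch, with a hypothetical database and table name:

    // Hypothetical table; replace with your own Hive table.
    val events = spark.sql("SELECT * FROM dwd.events WHERE dt = '2021-07-19'")
    // The same table as a DataFrame handle, without writing SQL text:
    val sameEvents = spark.table("dwd.events")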
    

    Compatibility issues

    1. The schemas of the Parquet files on HDFS are inconsistent with one another (a minimal sketch of this case follows).
    2. The Hive metastore schema and the Parquet file schema are inconsistent.
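
    A hedged sketch of issue 1, reusing the SparkSession built above; the path and column names are made up. Two writes leave part-files with different columns under one base path, and what the read returns depends on whether schemas are merged:

    // Hypothetical path and columns, for illustration only.
    import spark.implicits._

    Seq((1L, "a")).toDF("id", "name").write.parquet("/tmp/schema_demo/day=1")
    Seq((2L, 0.5)).toDF("id", "score").write.parquet("/tmp/schema_demo/day=2")

    // Default: no schema merging, so the schema comes from a single footer and either
    // `name` or `score` is silently missing from the result.
    spark.read.parquet("/tmp/schema_demo").printSchema()

    // With merging enabled, both columns appear, at the cost of reading every footer.
    spark.read.option("mergeSchema", "true").parquet("/tmp/schema_demo").printSchema()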

    Solution 1

    The relevant code is inferSchema in package org.apache.spark.sql.execution.datasources.parquet (schema inference based on the files):

    val filesToTouch =
          if (shouldMergeSchemas) {
            // Also includes summary files, 'cause there might be empty partition directories.
    
            // If mergeRespectSummaries config is true, we assume that all part-files are the same for
            // their schema with summary files, so we ignore them when merging schema.
            // If the config is disabled, which is the default setting, we merge all part-files.
            // In this mode, we only need to merge schemas contained in all those summary files.
            // You should enable this configuration only if you are very sure that for the parquet
            // part-files to read there are corresponding summary files containing correct schema.
    
            // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
            // the ordering of the output columns. There are several things to mention here.
            //
            //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
            //     the first part-file so that the columns of the lexicographically first file show
            //     first.
            //
            //  2. If mergeRespectSummaries config is true, then there should be, at least,
            //     "_metadata"s for all given files, so that we can ensure the columns of
            //     the lexicographically first file show first.
            //
            //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
            //     no guarantee of the output order, since there might not be a summary file for the
            //     lexicographically first file, which ends up putting ahead the columns of
            //     the other files. However, this should be okay since not enabling
            //     shouldMergeSchemas means (assumes) all the files have the same schemas.
    
            val needMerged: Seq[FileStatus] =
              if (mergeRespectSummaries) {
                Seq.empty
              } else {
                filesByType.data
              }
            needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
          } else {
            // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
            // don't have this.
            filesByType.commonMetadata.headOption
                // Falls back to "_metadata"
                .orElse(filesByType.metadata.headOption)
                // Summary file(s) not found, the Parquet file is either corrupted, or different part-
                // files contain conflicting user defined metadata (two or more values are associated
                // with a same key in different files).  In either case, we fall back to any of the
                // first part-file, and just assume all schemas are consistent.
                .orElse(filesByType.data.headOption)
                .toSeq
          }
        ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
    

    Analysis of the source
    1. If spark.sql.hive.convertMetastoreParquet.mergeSchema=false (the default), the first file in the collection is taken as the schema for this read.
    2. If spark.sql.hive.convertMetastoreParquet.mergeSchema=true, the schemas of all files are merged; however, reading every file in parallel is expensive, so this is not recommended.
    Note: you can also pass an explicit schema to read.parquet, but pay attention to the data types, otherwise the read fails with an error; a sketch follows.
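
    A minimal sketch of the note above, again with hypothetical path and column names, showing how an explicit schema can be supplied instead of merging:

    import org.apache.spark.sql.types._

    val expected = StructType(Seq(
      StructField("id", LongType),        // must match the physical type stored in the files
      StructField("name", StringType),
      StructField("score", DoubleType)))  // columns absent from a given file come back as null

    val df = spark.read.schema(expected).parquet("/tmp/schema_demo")
    df.printSchema()
    // If a declared type conflicts with a file's actual type (for example declaring LongType
    // while the file stores a 32-bit int for `id`), the read fails with a Parquet decoding error.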

    Solution 2

    Spark SQL reads the path and partition metadata for the table directly from the Hive metastore.
    The relevant code is mergeWithMetastoreSchema in package org.apache.spark.sql.hive (merging the inferred schema with the Hive metastore schema):

     def mergeWithMetastoreSchema(
          metastoreSchema: StructType,
          inferredSchema: StructType): StructType = try {
    // Find any nullable fields in metastore schema that are missing from the inferred schema.
        val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
        val missingNullables = metastoreFields
          .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
          .values
          .filter(_.nullable)
        // Merge missing nullable fields to inferred schema and build a case-insensitive field map.
        val inferredFields = StructType(inferredSchema ++ missingNullables)
          .map(f => f.name.toLowerCase -> f).toMap
    StructType(metastoreSchema.map(f => f.copy(name = inferredFields(f.name.toLowerCase).name)))
      } catch {
        case NonFatal(_) =>
          val msg = s"""Detected conflicting schemas when merging the schema obtained from the Hive
             | Metastore with the one inferred from the file format. Metastore schema:
             |${metastoreSchema.prettyJson}
             |
             |Inferred schema:
             |${inferredSchema.prettyJson}
           """.stripMargin
          throw new SparkException(msg)
      }
    

    Analysis of the source
    1. The merged schema is built from the fields of the Hive metastore schema, taking the case-preserving field names from the schema inferred from the files; if the two schemas cannot be reconciled, a SparkException listing both schemas is thrown.
    2. The merged schema contains exactly the fields declared in the Hive metastore; a field that is missing from the Parquet files is read back as null.
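
    To see what the merge produces, here is a toy, self-contained re-implementation of the logic quoted above; the schemas and field names are invented for illustration, and this is not the Spark API itself:

    import org.apache.spark.sql.types._

    object MergeDemo {
      // Toy re-implementation of the merge logic quoted above, for illustration only.
      def merge(metastoreSchema: StructType, inferredSchema: StructType): StructType = {
        val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
        val missingNullables = metastoreFields
          .filterKeys(k => !inferredSchema.map(_.name.toLowerCase).contains(k))
          .values
          .filter(_.nullable)
        val inferredFields = StructType(inferredSchema ++ missingNullables.toSeq)
          .map(f => f.name.toLowerCase -> f).toMap
        StructType(metastoreSchema.map(f => f.copy(name = inferredFields(f.name.toLowerCase).name)))
      }

      def main(args: Array[String]): Unit = {
        // Metastore schema: lower-cased names, plus a nullable column the files do not have.
        val metastore = StructType(Seq(
          StructField("id", LongType, nullable = false),
          StructField("eventtime", TimestampType),
          StructField("extra", StringType)))           // absent from the Parquet files
        // Schema inferred from the Parquet footers: case-preserving names, no "extra".
        val inferred = StructType(Seq(
          StructField("id", LongType),
          StructField("eventTime", TimestampType)))

        // Result: fields and types follow the metastore, names follow the files ("eventTime"),
        // and "extra" is kept and will be read back as null.
        println(merge(metastore, inferred).treeString)
      }
    }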
