Data loading methods
There are two ways to work with Parquet files in Spark:
1. Load the files directly:
spark.read.parquet("/path/")
2. Load the table through the Hive metastore; make sure hive-site.xml can be found on the classpath.
Version: Spark 2.4.0
Contents of /etc/spark2/conf (note that hive-site.xml is present here):
-rw-r--r-- 1 root root 996 May 11 2018 docker.properties.template
-rw-r--r-- 1 root root 1105 May 11 2018 fairscheduler.xml.template
lrwxrwxrwx 1 root root 14 Oct 18 2020 hbase-site.xml -> hbase-site.xml
-rw-r--r-- 1 spark spark 2878 Oct 17 2020 hive-site.xml
-rw-r--r-- 1 spark spark 1240 Oct 16 2020 log4j.properties
-rw-r--r-- 1 root root 2025 May 11 2018 log4j.properties.template
-rw-r--r-- 1 spark spark 4956 Oct 16 2020 metrics.properties
-rw-r--r-- 1 root root 7801 May 11 2018 metrics.properties.template
-rw-r--r-- 1 root root 865 May 11 2018 slaves.template
-rw-r--r-- 1 spark spark 856 Dec 27 2020 spark-defaults.conf
-rw-r--r-- 1 root root 1292 May 11 2018 spark-defaults.conf.template
-rw-r--r-- 1 spark spark 2319 Oct 30 2020 spark-env.sh
-rwxr-xr-x 1 root root 4221 May 11 2018 spark-env.sh.template
-rwxr-xr-x 1 spark spark 244 Oct 16 2020 spark-thrift-fairscheduler.xml
-rw-r--r-- 1 hive hadoop 2577 Dec 27 2020 spark-thrift-sparkconf.conf
import org.apache.spark.sql.SparkSession

// Create the SparkSession with Hive support enabled
val spark = SparkSession
  .builder()
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", classOf[ToKryoRegistrator].getName) // application-specific Kryo registrator
  .enableHiveSupport()
  .getOrCreate()

val sqlDefine = "" // actual query text elided
spark.sql(sqlDefine)
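A minimal usage sketch for the Hive path follows, assuming a hypothetical Parquet-backed table db.events that is already registered in the metastore (the table, column, and partition names are placeholders):

// Query the Parquet-backed Hive table through the metastore.
val df = spark.sql("SELECT id, event_name FROM db.events WHERE dt = '2020-12-27'")
df.printSchema()
df.show(10)

// The same table can also be loaded without writing SQL text:
val df2 = spark.table("db.events")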
Compatibility issues
1. The schemas of the Parquet files on HDFS are inconsistent with each other.
2. The Hive metastore schema is inconsistent with the schema of the Parquet files.
Solution to issue 1
The relevant code is inferSchema in package org.apache.spark.sql.execution.datasources.parquet (ParquetFileFormat), which infers the schema from the data files:
val filesToTouch =
  if (shouldMergeSchemas) {
    // Also includes summary files, 'cause there might be empty partition directories.
    // If mergeRespectSummaries config is true, we assume that all part-files are the same for
    // their schema with summary files, so we ignore them when merging schema.
    // If the config is disabled, which is the default setting, we merge all part-files.
    // In this mode, we only need to merge schemas contained in all those summary files.
    // You should enable this configuration only if you are very sure that for the parquet
    // part-files to read there are corresponding summary files containing correct schema.
    // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
    // the ordering of the output columns. There are several things to mention here.
    //
    //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
    //     the first part-file so that the columns of the lexicographically first file show
    //     first.
    //
    //  2. If mergeRespectSummaries config is true, then there should be, at least,
    //     "_metadata"s for all given files, so that we can ensure the columns of
    //     the lexicographically first file show first.
    //
    //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
    //     no guarantee of the output order, since there might not be a summary file for the
    //     lexicographically first file, which ends up putting ahead the columns of
    //     the other files. However, this should be okay since not enabling
    //     shouldMergeSchemas means (assumes) all the files have the same schemas.
    val needMerged: Seq[FileStatus] =
      if (mergeRespectSummaries) {
        Seq.empty
      } else {
        filesByType.data
      }
    needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
  } else {
    // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
    // don't have this.
    filesByType.commonMetadata.headOption
      // Falls back to "_metadata"
      .orElse(filesByType.metadata.headOption)
      // Summary file(s) not found, the Parquet file is either corrupted, or different part-
      // files contain conflicting user defined metadata (two or more values are associated
      // with a same key in different files). In either case, we fall back to any of the
      // first part-file, and just assume all schemas are consistent.
      .orElse(filesByType.data.headOption)
      .toSeq
  }
ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
Source code analysis
1. If spark.sql.hive.convertMetastoreParquet.mergeSchema=false (the default), Spark only reads the schema of a single file (a summary file if one exists, otherwise the first part-file) and uses it for the whole read.
2. If spark.sql.hive.convertMetastoreParquet.mergeSchema=true, the schemas of all files are merged. Reading every file footer in parallel has a real performance cost, so this setting is generally not recommended.
Note: you can also pass an explicit schema to read.parquet (see the sketch below); make sure the field types match the data in the files, otherwise the read will fail.
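To make these points concrete, the sketch below (the path and column names are hypothetical) shows the three reading options side by side: the default single-file schema, explicit schema merging, and a user-supplied schema. For the Hive-table path, the merge behaviour is controlled by the spark.sql.hive.convertMetastoreParquet.mergeSchema setting rather than the mergeSchema read option.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().getOrCreate()

// 1. Default: the schema comes from a single file, so columns that only exist
//    in other part-files are silently dropped.
val noMerge = spark.read.parquet("/tmp/events") // hypothetical path

// 2. Merge the schemas of all part-files (reads every footer, hence slower).
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/tmp/events")

// For the Hive metastore path, the equivalent switch is the SQL conf:
spark.conf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema", "true")

// 3. Supply the schema explicitly; the field types must match what is stored
//    in the files, otherwise the read fails at runtime.
val explicitSchema = StructType(Seq(
  StructField("id", LongType, nullable = true), // hypothetical columns
  StructField("event_name", StringType, nullable = true)
))
val withSchema = spark.read.schema(explicitSchema).parquet("/tmp/events")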
Solution to issue 2
When Spark SQL reads a table through the Hive metastore, the schema stored in the metastore must be reconciled with the schema inferred from the files.
The relevant code is mergeWithMetastoreSchema in package org.apache.spark.sql.hive, which merges the metastore schema with the inferred schema:
def mergeWithMetastoreSchema(
    metastoreSchema: StructType,
    inferredSchema: StructType): StructType = try {
  // Find any nullable fields in mestastore schema that are missing from the inferred schema.
  val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
  val missingNullables = metastoreFields
    .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
    .values
    .filter(_.nullable)
  // Merge missing nullable fields to inferred schema and build a case-insensitive field map.
  val inferredFields = StructType(inferredSchema ++ missingNullables)
    .map(f => f.name.toLowerCase -> f).toMap
  StructType(metastoreSchema.map(f => f.copy(name = inferredFields(f.name).name)))
} catch {
  case NonFatal(_) =>
    val msg = s"""Detected conflicting schemas when merging the schema obtained from the Hive
       | Metastore with the one inferred from the file format. Metastore schema:
       |${metastoreSchema.prettyJson}
       |
       |Inferred schema:
       |${inferredSchema.prettyJson}
     """.stripMargin
    throw new SparkException(msg)
}
Source code analysis
1. The merged schema keeps the field types and nullability from the metastore schema and takes the case-preserving field names from the inferred Parquet schema; if the two schemas cannot be reconciled, a SparkException reporting both schemas is thrown.
2. The merged schema contains only the fields declared in the Hive metastore; a nullable field that is declared in the metastore but missing from the Parquet files is read as null (see the sketch below).
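To make the rule concrete, here is a small standalone sketch that re-implements the same merge logic on two hand-written StructTypes (the column names are made up); it only illustrates which parts of each schema end up in the result and does not call the private[hive] method itself:

import org.apache.spark.sql.types._

// Metastore schema: lowercase names, plus one nullable column that the files do not contain.
val metastoreSchema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("eventname", StringType, nullable = true),
  StructField("extra_col", StringType, nullable = true) // declared only in the metastore
))

// Schema inferred from the Parquet footers: preserves the original camelCase name.
val inferredSchema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("eventName", StringType, nullable = true)
))

// Re-implementation of the merge rule for illustration only.
val inferredByLowerName = inferredSchema.map(f => f.name.toLowerCase -> f).toMap
val merged = StructType(metastoreSchema.map { f =>
  f.copy(name = inferredByLowerName.get(f.name.toLowerCase).map(_.name).getOrElse(f.name))
})

// Result: id, eventName (case restored from the files), extra_col (read as null).
merged.printTreeString()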