困惑
1、spark sql
读取parquet 文件,stage生成任务4个task,只有一个task处理数据,其它无
2、spark
任务执行apache iceberg rewriteDataFiles
合并小文件(parquet文件),发现偶然无变化
Parquet文件
一个Parquet文件是由一个header以及一个或多个block块组成,以一个footer结尾。
header中只包含一个4个字节的数字PAR1用来识别整个Parquet文件格式。
文件中所有的metadata都存在于footer中。
footer中的metadata包含了格式的版本信息,schema信息、key-value paris以及所有block中的metadata信息。
footer中最后两个字段为一个以4个字节长度的footer的metadata,以及同header中包含的一样的PAR1。

-
Parquet文件格式
image.png
上图展示了一个Parquet文件的结构
- 一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件
- Footer length存储了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和当前文件的Schema信息。
- 每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引
存储格式:Parquet 的存储模型主要由行组(Row Group 默认128M
)、列块(Column Chuck)、页(Page)组成。 -
支持数据嵌套模型
Parquet支持嵌套的数据模型,类似于Protocol Buffers
image.png
图可以看出在Schema中所有的基本类型字段都是叶子节点,在这个Schema中一共存在6个叶子节点,如果把这样的Schema转换成扁平式的关系模型,就可以理解为该表包含六个列
Parquet中没有Map、Array这样的复杂数据结构每一个数据模型的schema包含多个字段,每一个字段又可以包含多个字段,每一个字段有三个属性:重复数、数据类型和字段名,
重复数可以是以下三种:required(出现1次),repeated(出现0次或多次),optional(出现0次或1次)。
每一个字段的数据类型可以分成两种:group(复杂类型)和primitive(基本类型)。
以上实现列式存储
,但是无法将其恢复到原来的数据行的结构形式,Parquet采用了Dremel中(R, D, V)模型
R,即Repetition Level,用于表达一个列有重复,即有多个值的情况,其值为重复是在第几层上发生。
D,即Definition Level,用于表达某个列是否为空、在哪里为空,其值为当前列在第几层上有值
V,表示数据值
1、行组,Row Group:Parquet 在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。
2、列块,Column Chunk:行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。
3、页,Page:Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位
,同一列块的不同页可以使用不同的编码方式。
小结
- Parquet 是一种支持嵌套结构的列式存储格式,非常适用于 OLAP 场景,按列存储和扫描
- 列存使得更容易对每个列使用高效的压缩和编码(
一个页是最小的编码的单位
),降低磁盘空间。 - 映射下推,这是列式存储最突出的优势,是指在获取数据时只需要扫描需要的列,不用全部扫描。
- 谓词下推,是指通过将一些过滤条件尽可能的在最底层执行以减少结果集。谓词就是指这些过滤条件,即返回
spark 2.4.0读取parquet文件
spark.read.parquet("")
org.apache.spark.sql.DataFrameReader.java
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
val jdbc = classOf[JdbcRelationProvider].getCanonicalName
val json = classOf[JsonFileFormat].getCanonicalName
val parquet = classOf[ParquetFileFormat].getCanonicalName
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
val nativeOrc = classOf[OrcFileFormat].getCanonicalName
val socket = classOf[TextSocketSourceProvider].getCanonicalName --->DataSourceV2
val rate = classOf[RateStreamProvider].getCanonicalName --->DataSourceV2
private def loadV1Source(paths: String*) = {
// Code path for data source v1.
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation
->getOrInferFileFormatSchema()
**Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list.**
private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
}
InMemoryFileIndex.refresh0()
InMemoryFileIndex.listLeafFiles()
InMemoryFileIndex.bulkListLeafFiles()
val parallelPartitionDiscoveryParallelism =
private[sql] def bulkListLeafFiles(
...
spark.sql.sources.parallelPartitionDiscovery.parallelism 默认10000
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
**设置并行度来防止下面的文件列表生成许多任务**
**in case of large defaultParallelism.**
**val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)**
val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val statusMap = try {
val description = paths.size match {
case 0 =>
s"Listing leaf files and directories 0 paths"
case 1 =>
s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
case s =>
s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
sparkContext.setJobDescription(description)
sparkContext
.parallelize(serializedPaths, numParallelism)
.mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
pathStrings.map(new Path(_)).toSeq.map { path =>
(path, listLeafFiles(path, hadoopConf, filter, None))
}.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
val blockLocations = status match {
case f: LocatedFileStatus =>
f.getBlockLocations.map { loc =>
SerializableBlockLocation(
loc.getNames,
loc.getHosts,
loc.getOffset,
loc.getLength)
}
case _ =>
Array.empty[SerializableBlockLocation]
}
SerializableFileStatus(
status.getPath.toString,
status.getLen,
status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
status.getAccessTime,
blockLocations)
}
(path.toString, serializableStatuses)
}.collect()
...
)
真正读取数据是DataSourceScanExec
注意这里有DataSourceV2ScanExec v2版本,经上面代码分析,parquet,orc 使用的是v1版org.apache.spark.sql.execution.DataSourceScanExec.scala
Physical plan node for scanning data from HadoopFsRelations.
FileSourceScanExec
private lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
relation.bucketSpec match {
case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
case _ =>
createNonBucketedReadRDD(readFile, selectedPartitions, relation)
}
}
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
128M
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
4M
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
上面代码sparkcontent设置的
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
切文件
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(
partition.values, file.getPath.toUri.toString, offset, size, hosts)
}
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(
partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L
/** Close the current partition and move to the next. */
合并小文件,大文件就直接变为partition了。一路下来会以为会切大文件,然而并不会。
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
FilePartition(
partitions.size,
currentFiles.toArray.toSeq) // Copy to a new Array.
partitions += newPartition
}
currentFiles.clear()
currentSize = 0
}
// Assign files to partitions using "Next Fit Decreasing"
splitFiles.foreach { file =>
这里遇到大文件直接放入partitions分区,小文件是几个大小达到maxSplitBytes,放入一个分区提高
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
closePartition()
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
小结
1、spark 2.4.0读取parquet,使用的是loadV1Source
2、spark 读取文件默认task任务数(分区数
)最大10000,最小是path的个数(注意并行度和任务数分区数
区别)
3、createNonBucketedReadRDD 中Bucketed理解,是指hive表中的分区下面的分桶
4、rdd分区数确认:合并小文件,大文件就直接变为partition了,注意大文件没有切,目的提高cpu利用率
接着来到FileScanRDD 和 parquet jar本身提供的读写api
org.apache.spark.sql.execution.datasources.FileScanRDD
private def readCurrentFile(): Iterator[InternalRow] = {
try {
readFunction(currentFile)
} catch {
case e: FileNotFoundException =>
throw new FileNotFoundException(
e.getMessage + "\n" +
"It is possible the underlying files have been updated. " +
"You can explicitly invalidate the cache in Spark by " +
"running 'REFRESH TABLE tableName' command in SQL or " +
"by recreating the Dataset/DataFrame involved.")
}
}
ParquetFileFormat.buildReaderWithPartitionValues(该方法上面有提)构造reader,
override def buildReaderWithPartitionValues(
...
if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion lister before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
} else {
...
reader.initialize(split, hadoopAttemptContext)
}
vectorizedReader.initialize(split, hadoopAttemptContext)
->SpecificParquetRecordReaderBase.initialize
->ParquetMetadata footer = readFooter(config, file, range(0, length));注意这里传入的range
->ParquetMetadataConverter.converter.readParquetMetadata(f, filter)
public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
@Override
public FileMetaData visit(NoFilter filter) throws IOException {
return readFileMetaData(from);
}
@Override
public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
return readFileMetaData(from, true);
}
@Override
public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
return filterFileMetaDataByStart(readFileMetaData(from), filter);
}
@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
}
});
LOG.debug("{}", fileMetaData);
ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
return parquetMetadata;
}
RangeMetadataFilter filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
for (RowGroup rowGroup : rowGroups) {
long totalSize = 0;
long startIndex = getOffset(rowGroup.getColumns().get(0));
for (ColumnChunk col : rowGroup.getColumns()) {
totalSize += col.getMeta_data().getTotal_compressed_size();
}
long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
newRowGroups.add(rowGroup);
}
}
metaData.setRow_groups(newRowGroups);
return metaData;
}
到这里分割的关键点找到
现在假设我们有一个40m 的文件, 只有一个 row group, 10m 一分, 那么将会有4个 partitions
但是只有一个 partition 会占有这个 row group 的中点, 所以也只有这一个 partition 会有数据
小结
1、spark 读取parquet文件默认用enableVectorizedReader,向量读
2、根据DataSourceScanExec代码中划分的partitions, 但不是所有partitions 最后都会有数据
3、对于parquet文件,对于一个大的文件只含有一个rowgroup,task中谁拥有这个文件的中点谁处理这个rowgroup,这样解决文章开头的疑惑
参考
https://zhuanlan.zhihu.com/p/83006243
https://my.oschina.net/tjt/blog/2250953
网友评论