背景
本文基于delta 0.7.0
spark 3.0.1
我们知道spark或者mapreduce在写文件的时候么,都会写入的文件目录中写入一个临时目录_temporary,用来存储正在写入的文件,那么这是怎么实现的呢以及是怎么控制的,这部分了解了可以避免在多实例写同一个目录下的冲突问题,之后我们再分析一下delta是怎么实现spark多实例下怎么避免文件冲突,这部分是理解delta ACID事务的前提。
分析
直接进入主题FileFormatWriter.write,这个是spark写入文件的入口:
def write(
sparkSession: SparkSession,
plan: SparkPlan,
fileFormat: FileFormat,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
hadoopConf: Configuration,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
statsTrackers: Seq[WriteJobStatsTracker],
options: Map[String, String])
: Set[String] = {
因为delta是基于parquet实现的,
所以我们fileformat选择分析ParquetFileFormat,
而对于FileCommitProtocol,我们分析SQLHadoopMapReduceCommitProtocol
- 该write方法实现比较长,我们讲重点 :
committer.setupJob(job)
这个做一些job提交前的准备工作,比如设置jobId,taskId,设置OutputCommitter,OutputCommitter是用来。。
override def setupJob(jobContext: JobContext): Unit = {
// Setup IDs
val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
val taskId = new TaskID(jobId, TaskType.MAP, 0)
val taskAttemptId = new TaskAttemptID(taskId, 0)
// Set up the configuration object
jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
committer = setupCommitter(taskAttemptContext)
committer.setupJob(jobContext)
}
ParquetFileFormat对应的OutputCommitter是ParquetOutputCommitter,我们看一下方法:format.getOutputCommitter(context)
,ParquetOutputCommitter为:
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new ParquetOutputCommitter(output, context);
}
return committer;
}
而最终调用了父类的构造方法:
public FileOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
this(outputPath, (JobContext)context);
if (outputPath != null) {
workPath = getTaskAttemptPath(context, outputPath);
}
}
注意这里的workPath(全局变量)赋值为$outputPath/_temporary,在以下newTaskTempFile方法中会用到
接着进行setupJob操作:
public void setupJob(JobContext context) throws IOException {
if (hasOutputPath()) {
Path jobAttemptPath = getJobAttemptPath(context);
FileSystem fs = jobAttemptPath.getFileSystem(
context.getConfiguration());
if (!fs.mkdirs(jobAttemptPath)) {
LOG.error("Mkdirs failed to create " + jobAttemptPath);
}
} else {
LOG.warn("Output Path is null in setupJob()");
而getJobAttemptPath中引用到$path/_temporary目录(其中path是文件输出目录),且建立该目录
接下来是进行任务的提交:
sparkSession.sparkContext.runJob(
rddWithNonEmptyPartitions,
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
executeTask(
description = description,
jobIdInstant = jobIdInstant,
sparkStageId = taskContext.stageId(),
sparkPartitionId = taskContext.partitionId(),
sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
committer,
iterator = iter)
},
rddWithNonEmptyPartitions.partitions.indices,
(index, res: WriteTaskResult) => {
committer.onTaskCommit(res.commitMsg)
ret(index) = res
})
其中重点看看executeTask方法:
committer.setupTask(taskAttemptContext)
val dataWriter =
if (sparkPartitionId != 0 && !iterator.hasNext) {
// In case of empty job, leave first partition to save meta for file format like parquet.
new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
} else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
} else {
new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
}
- 对于SQLHadoopMapReduceCommitProtocol:setupTask实现如下:
committer = setupCommitter(taskContext)
committer.setupTask(taskContext)
addedAbsPathFiles = mutable.Map[String, String]()
partitionPaths = mutable.Set[String]()
而committer.setupTask(taskContext),对应到ParquetOutputCommitter为空实现,
- 之后看数据写入的最终执行者dataWriter,
如果是没有分区,则是SingleDirectoryDataWriter:
class SingleDirectoryDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol)
extends FileFormatDataWriter(description, taskAttemptContext, committer) {
private var fileCounter: Int = _
private var recordsInFile: Long = _
// Initialize currentWriter and statsTrackers
newOutputWriter()
private def newOutputWriter(): Unit = {
recordsInFile = 0
releaseResources()
val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
val currentPath = committer.newTaskTempFile(
taskAttemptContext,
None,
f"-c$fileCounter%03d" + ext)
currentWriter = description.outputWriterFactory.newInstance(
path = currentPath,
dataSchema = description.dataColumns.toStructType,
context = taskAttemptContext)
statsTrackers.foreach(_.newFile(currentPath))
}
override def write(record: InternalRow): Unit = {
if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
fileCounter += 1
assert(fileCounter < MAX_FILE_COUNTER,
s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")
newOutputWriter()
}
currentWriter.write(record)
statsTrackers.foreach(_.newRow(record))
recordsInFile += 1
}
}
这里写文件是哪里呢?
val currentPath = committer.newTaskTempFile(
taskAttemptContext,
None,
f"-c$fileCounter%03d" + ext)
对应到HadoopMapReduceCommitProtocol到newTaskTempFile方法为:
override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFilename(taskContext, ext)
val stagingDir: Path = committer match {
case _ if dynamicPartitionOverwrite =>
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
this.stagingDir
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
}
dir.map { d =>
new Path(new Path(stagingDir, d), filename).toString
}.getOrElse {
new Path(stagingDir, filename).toString
}
}
如果开启partitionOverwriteMode,则设置为new Path(path, ".spark-staging-" + jobId)
如果没有开启partitionOverwriteMode,且FileOutputCommitter的子类,如果workpath存在则设置为workPath,否则为path,注意我们之前FileOutputCommitter构造方法中已经设置了workPath,所以最终的输出目录为$path/_temporary
所以job向该目录写入数据。
DynamicPartitionDataWriter的分析,读者可以进行类似的分析,只不过目录则加了分区信息,只写入自己的分区目录中
- 如果写入成功的话执行如下:
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
// Execute the task to write rows out and commit the task.
while (iterator.hasNext) {
dataWriter.write(iterator.next())
}
dataWriter.commit()
})(catchBlock = {
// If there is an error, abort the task
dataWriter.abort()
logError(s"Job $jobId aborted.")
}, finallyBlock = {
dataWriter.close()
})
dataWriter.commit()如下:
override def commit(): WriteTaskResult = {
releaseResources()
val summary = ExecutedWriteSummary(
updatedPartitions = updatedPartitions.toSet,
stats = statsTrackers.map(_.getFinalStats()))
WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
}
首先会释放资源,也就是关闭writer
之后调用FileCommitProtocol.commitTask();
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
val attemptId = taskContext.getTaskAttemptID
logTrace(s"Commit task ${attemptId}")
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}
而SparkHadoopMapRedUtil.commitTask最终调用FileOutputCommitter的commitTask方法把PATH下
之后返回统计的数值,数据格式如下:
case class BasicWriteTaskStats(
numPartitions: Int,
numFiles: Int,
numBytes: Long,
numRows: Long)
extends WriteTaskStats
-
之后会committer.onTaskCommit(res.commitMsg)操作,
对于SQLHadoopMapReduceCommitProtocol的实现为:
logDebug(s"onTaskCommit($taskCommit)")
-
下一步committer.commitJob(job, commitMsgs):
...
committer.commitJob(jobContext)
...
for ((src, dst) <- filesToMove) {
fs.rename(new Path(src), new Path(dst))
}
...
fs.delete(stagingDir, true)
这里主要涉及清理job,以及把task所产生的文件(writer输出的临时文件)移动到path目录下,且清理临时目录,至此文件真正的写入到了path目录下
- 指标记录
private[datasources] def processStats(
statsTrackers: Seq[WriteJobStatsTracker],
statsPerTask: Seq[Seq[WriteTaskStats]])
: Unit = {
val numStatsTrackers = statsTrackers.length
assert(statsPerTask.forall(_.length == numStatsTrackers),
s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
|There are $numStatsTrackers statsTrackers, but some task returned
|${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead.
""".stripMargin)
val statsPerTracker = if (statsPerTask.nonEmpty) {
statsPerTask.transpose
} else {
statsTrackers.map(_ => Seq.empty)
}
statsTrackers.zip(statsPerTracker).foreach {
case (statsTracker, stats) => statsTracker.processStats(stats)
}
}
主要是把刚才job的指标通过statsTrackers传给driver,而目前的statsTracker实现类为BasicWriteJobStatsTracker,也就是说最终会通过listenerbus以事件的形式传播,
如下代码:
class BasicWriteJobStatsTracker(
serializableHadoopConf: SerializableConfiguration,
@transient val metrics: Map[String, SQLMetric])
extends WriteJobStatsTracker {
...
override def processStats(stats: Seq[WriteTaskStats]): Unit = {
...
metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
}
}
至此整个spark parquet write文件的数据流程我们就已经全部过了一遍了,部分细节没有展示。
最终的数据流如下:
实例化Job对象
|
v
FileCommitProtocol.setupJob -> OutputCommitter.setupJob 进行作业运行前的准备,如建立临时目录_temporary等
|
v
executeTask()-> FileCommitProtocol.setupTask -> OutputCommitter.setupTask 目前为空实现
|
v
FileCommitProtocol.newTaskTempFile/newTaskTempFileAbsPath 建立写任务的临时目录
|
v
dataWriter.write()
|
v
dataWriter.commit() 释放资源以及返回写入文件的指标信息 -> HadoopMapReduceCommitProtocol.commitTask
|
v
SparkHadoopMapRedUtil.commitTask 完成mv $PATH/_temporary文件 到$PATH目录,以及做outputCommitCoordination
|
v
返回需要额外临时目录的信息
|
v
FileCommitProtocol.onTaskCommit
|
v
FileCommitProtocol.commitJob -> OutputCommitter.commitJob 清理$PATH/_temporary目录且把写额外临时目录下的文件mv到最终path目录下
|
v
processStats,处理写入的文件指标
那对应到delta中,spark写入delta数据是怎么写入的呢?其实流程和以上的流程一模一样,唯一不同的是FileCommitProtocol类的实现,直接到TransactionalWrite.writeFiles:
def writeFiles(
data: Dataset[_],
writeOptions: Option[DeltaOptions],
isOptimize: Boolean): Seq[AddFile] = {
hasWritten = true
...
val committer = getCommitter(outputPath)
...
FileFormatWriter.write(
sparkSession = spark,
plan = physicalPlan,
fileFormat = snapshot.fileFormat, // TODO doesn't support changing formats.
committer = committer,
outputSpec = outputSpec,
hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
partitionColumns = partitioningColumns,
bucketSpec = None,
statsTrackers = statsTrackers,
options = Map.empty)
}
committer.addedStatuses
}
而这里的commiter为DelayedCommitProtocol,如下:
new DelayedCommitProtocol("delta", outputPath.toString, None)
我们来看一下DelayedCommitProtocol方法:
override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFileName(taskContext, ext)
val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
val relativePath = randomPrefixLength.map { prefixLength =>
getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
}.orElse {
dir // or else write into the partition directory if it is partitioned
}.map { subDir =>
new Path(subDir, filename)
}.getOrElse(new Path(filename)) // or directly write out to the output path
addedFiles.append((partitionValues, relativePath.toUri.toString))
new Path(path, relativePath).toString
}
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
if (addedFiles.nonEmpty) {
val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
val statuses: Seq[AddFile] = addedFiles.map { f =>
val filePath = new Path(path, new Path(new URI(f._2)))
val stat = fs.getFileStatus(filePath)
AddFile(f._2, f._1, stat.getLen, stat.getModificationTime, true)
}
new TaskCommitMessage(statuses)
} else {
new TaskCommitMessage(Nil)
}
}
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[AddFile]]).toArray
addedStatuses ++= fileStatuses
}
- 其中newTaskTempFile生成的文件中多了一个UUID.randomUUID.toString,这能减少文件的冲突
- newTaskTempFile目前直接是返回了输出目录,而不是_temporary目录
- commitTask只是记录增加的文件
- commitJob并没有真正的提交job,只是把AddFile保存到了内存中
后续我们会分析delta怎么处理AddFile,从而做到事务性
由此可见后续还会有对该的处理,这也是为什么叫做DelayedCommitProtocol的原因,后续我们会分析delta怎么处理AddFile,从而做到事务性
注意task输出的文件目录为:
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}
如:/data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet
关于rename机制的v1和v2的区别,可以参考该文章
网友评论