[Spark Series 6] Spark Delta Writes: a Prelude to ACID Transactions


Author: 鸿乃江边鸟 | Published 2021-01-04 17:15

    Background

    This article is based on Delta 0.7.0 and Spark 3.0.1.
    As we know, when Spark (or MapReduce) writes files, it first creates a temporary directory named _temporary inside the output directory to hold the files that are still being written. How is this implemented, and how is it controlled? Understanding this helps avoid conflicts when multiple instances write to the same directory. We will then analyze how Delta avoids file conflicts when multiple Spark instances write concurrently, which is a prerequisite for understanding Delta's ACID transactions.
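
    While a write job is running, the output directory therefore looks roughly like this (a sketch based on the task-attempt layout shown near the end of this article; attempt IDs vary):

    $outputPath/
      _temporary/
        0/                                            <- job (application) attempt
          _temporary/
            attempt_20190219101232_0003_m_000005_0/   <- task attempt
              part-00000-...c000.snappy.parquet       <- file being written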

    Analysis

    Let's go straight to FileFormatWriter.write, the entry point for Spark's file writes:

     def write(
          sparkSession: SparkSession,
          plan: SparkPlan,
          fileFormat: FileFormat,
          committer: FileCommitProtocol,
          outputSpec: OutputSpec,
          hadoopConf: Configuration,
          partitionColumns: Seq[Attribute],
          bucketSpec: Option[BucketSpec],
          statsTrackers: Seq[WriteJobStatsTracker],
          options: Map[String, String])
        : Set[String] = {
    

    Since Delta is implemented on top of parquet, we pick ParquetFileFormat as the FileFormat to analyze, and SQLHadoopMapReduceCommitProtocol as the FileCommitProtocol.
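
    For reference, here is a minimal write that exercises exactly this code path (a sketch: the master, app name, and output path are placeholders). df.write.parquet ends up in FileFormatWriter.write, with ParquetFileFormat as the FileFormat and, under the default configuration, SQLHadoopMapReduceCommitProtocol as the committer:

    import org.apache.spark.sql.SparkSession

    object WriteDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")            // placeholder: any deployment works
          .appName("parquet-write-demo") // hypothetical app name
          .getOrCreate()
        import spark.implicits._

        // This single call drives everything analyzed below:
        // FileFormatWriter.write -> setupJob -> executeTask -> commitJob
        Seq((1, "a"), (2, "b")).toDF("id", "value")
          .write.mode("overwrite").parquet("/tmp/parquet-write-demo")

        spark.stop()
      }
    }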

    1. The write method is fairly long, so let's focus on the key points:
    committer.setupJob(job)
    

    This performs some preparation before the job is submitted, such as setting the jobId and taskId and setting up the OutputCommitter; the OutputCommitter is responsible for committing (or aborting) the output of tasks and of the job:

    override def setupJob(jobContext: JobContext): Unit = {
        // Setup IDs
        val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
        val taskId = new TaskID(jobId, TaskType.MAP, 0)
        val taskAttemptId = new TaskAttemptID(taskId, 0)
    
        // Set up the configuration object
        jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
        jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
        jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
        jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
        jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
    
        val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
        committer = setupCommitter(taskAttemptContext)
        committer.setupJob(jobContext)
      }
    
    

    The OutputCommitter corresponding to ParquetFileFormat is ParquetOutputCommitter. Looking at the method format.getOutputCommitter(context), which creates it:

    @Override
      public OutputCommitter getOutputCommitter(TaskAttemptContext context)
          throws IOException {
        if (committer == null) {
          Path output = getOutputPath(context);
          committer = new ParquetOutputCommitter(output, context);
        }
        return committer;
      }
    

    This ultimately calls the parent class (FileOutputCommitter) constructor:

    public FileOutputCommitter(Path outputPath, 
                                 TaskAttemptContext context) throws IOException {
        this(outputPath, (JobContext)context);
        if (outputPath != null) {
          workPath = getTaskAttemptPath(context, outputPath);
        }
      }
    

    Note that workPath (a member variable) is assigned here, via getTaskAttemptPath, to a task-attempt path under $outputPath/_temporary; it is used later in the newTaskTempFile method.

    Then the setupJob operation proceeds:

    public void setupJob(JobContext context) throws IOException {
        if (hasOutputPath()) {
          Path jobAttemptPath = getJobAttemptPath(context);
          FileSystem fs = jobAttemptPath.getFileSystem(
              context.getConfiguration());
          if (!fs.mkdirs(jobAttemptPath)) {
            LOG.error("Mkdirs failed to create " + jobAttemptPath);
          }
        } else {
          LOG.warn("Output Path is null in setupJob()");
        }
      }
    

    getJobAttemptPath points to a directory under $path/_temporary (where path is the output directory), and that directory is created here.
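
    Schematically, the staging layout used by FileOutputCommitter can be sketched as follows (a simplification; the real logic lives in Hadoop's FileOutputCommitter and also depends on the commit algorithm version):

    import org.apache.hadoop.fs.Path

    // Sketch of FileOutputCommitter's staging layout (v1 algorithm):
    //   $output/_temporary/$appAttempt                         <- job attempt path
    //   $output/_temporary/$appAttempt/_temporary/$taskAttempt <- task attempt path (the workPath)
    def jobAttemptPath(output: Path, appAttemptId: Int): Path =
      new Path(new Path(output, "_temporary"), appAttemptId.toString)

    def taskAttemptPath(output: Path, appAttemptId: Int, taskAttemptId: String): Path =
      new Path(new Path(jobAttemptPath(output, appAttemptId), "_temporary"), taskAttemptId)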

    Next, the write tasks are launched:

    sparkSession.sparkContext.runJob(
            rddWithNonEmptyPartitions,
            (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
              executeTask(
                description = description,
                jobIdInstant = jobIdInstant,
                sparkStageId = taskContext.stageId(),
                sparkPartitionId = taskContext.partitionId(),
                sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
                committer,
                iterator = iter)
            },
            rddWithNonEmptyPartitions.partitions.indices,
            (index, res: WriteTaskResult) => {
              committer.onTaskCommit(res.commitMsg)
              ret(index) = res
            })
    

    The key part is the executeTask method:

    committer.setupTask(taskAttemptContext)
    
        val dataWriter =
          if (sparkPartitionId != 0 && !iterator.hasNext) {
            // In case of empty job, leave first partition to save meta for file format like parquet.
            new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
          } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
            new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
          } else {
            new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
          }
    
    • For SQLHadoopMapReduceCommitProtocol, setupTask is implemented as follows:
    committer = setupCommitter(taskContext)
    committer.setupTask(taskContext)
    addedAbsPathFiles = mutable.Map[String, String]()
    partitionPaths = mutable.Set[String]()
    

    Here committer.setupTask(taskContext) maps to an empty implementation in ParquetOutputCommitter.

    • Next, look at dataWriter, which actually writes the data.
      If there is no partitioning, it is a SingleDirectoryDataWriter:
    class SingleDirectoryDataWriter(
        description: WriteJobDescription,
        taskAttemptContext: TaskAttemptContext,
        committer: FileCommitProtocol)
      extends FileFormatDataWriter(description, taskAttemptContext, committer) {
      private var fileCounter: Int = _
      private var recordsInFile: Long = _
      // Initialize currentWriter and statsTrackers
      newOutputWriter()
    
      private def newOutputWriter(): Unit = {
        recordsInFile = 0
        releaseResources()
    
        val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
        val currentPath = committer.newTaskTempFile(
          taskAttemptContext,
          None,
          f"-c$fileCounter%03d" + ext)
    
        currentWriter = description.outputWriterFactory.newInstance(
          path = currentPath,
          dataSchema = description.dataColumns.toStructType,
          context = taskAttemptContext)
    
        statsTrackers.foreach(_.newFile(currentPath))
      }
    
      override def write(record: InternalRow): Unit = {
        if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
          fileCounter += 1
          assert(fileCounter < MAX_FILE_COUNTER,
            s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")
    
          newOutputWriter()
        }
    
        currentWriter.write(record)
        statsTrackers.foreach(_.newRow(record))
        recordsInFile += 1
      }
    }
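
    A side note on the rollover logic in write() above: maxRecordsPerFile can be supplied as a write option, so the file-splitting branch can be exercised like this (a sketch; df and the output path are placeholders):

    // Roll over to a new output file every 1000 records; this triggers the
    // fileCounter / newOutputWriter() branch in SingleDirectoryDataWriter.write.
    df.write
      .option("maxRecordsPerFile", 1000)
      .parquet("/tmp/demo-max-records")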
    

    So where does the file get written?

        val currentPath = committer.newTaskTempFile(
          taskAttemptContext,
          None,
          f"-c$fileCounter%03d" + ext)
    

    This maps to the newTaskTempFile method of HadoopMapReduceCommitProtocol:

    override def newTaskTempFile(
          taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
        val filename = getFilename(taskContext, ext)
    
        val stagingDir: Path = committer match {
          case _ if dynamicPartitionOverwrite =>
            assert(dir.isDefined,
              "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
            partitionPaths += dir.get
            this.stagingDir
          // For FileOutputCommitter it has its own staging path called "work path".
          case f: FileOutputCommitter =>
            new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
          case _ => new Path(path)
        }
    
        dir.map { d =>
          new Path(new Path(stagingDir, d), filename).toString
        }.getOrElse {
          new Path(stagingDir, filename).toString
        }
      }
    

    If dynamicPartitionOverwrite is enabled (spark.sql.sources.partitionOverwriteMode=dynamic), the staging directory is this.stagingDir, i.e. new Path(path, ".spark-staging-" + jobId).
    If it is not enabled and the committer is a subclass of FileOutputCommitter, the staging directory is workPath if it is set, and path otherwise. Note that the FileOutputCommitter constructor we saw earlier already set workPath, so the final output goes under $path/_temporary.

    So the job's tasks write their data into that directory.
    Readers can analyze DynamicPartitionDataWriter in the same way; the only difference is that the path carries partition information, so each task writes only into its own partition directory, as sketched below.
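
    From the user's side that case looks like this (df, the column name, and the path are placeholders):

    // Each task writes only under its own partition directories, producing e.g.
    //   /tmp/demo-partitioned/date=2021-01-04/part-00000-<uuid>.c000.snappy.parquet
    df.write
      .partitionBy("date")
      .parquet("/tmp/demo-partitioned")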

    • If the write succeeds, the following is executed:
    try {
          Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
            // Execute the task to write rows out and commit the task.
            while (iterator.hasNext) {
              dataWriter.write(iterator.next())
            }
            dataWriter.commit()
          })(catchBlock = {
            // If there is an error, abort the task
            dataWriter.abort()
            logError(s"Job $jobId aborted.")
          }, finallyBlock = {
            dataWriter.close()
      })
    } catch {
      // (FetchFailedException and other special cases are elided in this excerpt)
      case t: Throwable =>
        throw new SparkException("Task failed while writing rows.", t)
    }
    

    dataWriter.commit() is as follows:

    override def commit(): WriteTaskResult = {
        releaseResources()
        val summary = ExecutedWriteSummary(
          updatedPartitions = updatedPartitions.toSet,
          stats = statsTrackers.map(_.getFinalStats()))
        WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
      }
    

    It first releases resources, i.e. closes the writer,
    and then calls FileCommitProtocol.commitTask():

     override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
        val attemptId = taskContext.getTaskAttemptID
        logTrace(s"Commit task ${attemptId}")
        SparkHadoopMapRedUtil.commitTask(
          committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
        new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
      }
    

    SparkHadoopMapRedUtil.commitTask ultimately calls FileOutputCommitter's commitTask method, which moves the files under $PATH/_temporary toward $PATH (exactly how depends on the v1/v2 rename algorithm; see the note at the end of this article).
    It then returns the collected statistics, in the following format:

    case class BasicWriteTaskStats(
        numPartitions: Int,
        numFiles: Int,
        numBytes: Long,
        numRows: Long)
      extends WriteTaskStats
    
    2. Afterwards committer.onTaskCommit(res.commitMsg) is invoked.
      For SQLHadoopMapReduceCommitProtocol the implementation is simply:
      logDebug(s"onTaskCommit($taskCommit)")

    3. The next step is committer.commitJob(job, commitMsgs):

    ...
     committer.commitJob(jobContext)
     ...
      for ((src, dst) <- filesToMove) {
            fs.rename(new Path(src), new Path(dst))
          }
     ...
     fs.delete(stagingDir, true)
    

    This mainly cleans up the job, moves the files produced by the tasks (the writers' temporary output) into the path directory, and removes the staging directory; at this point the files have truly been written to path.
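
    The essence of this step is commit-by-rename, which can be sketched as follows (a simplification with assumed names; Spark's real commitJob additionally handles absolute-path files and dynamic partition overwrite):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Minimal sketch: publish all staged files, then drop the staging directory.
    def commitByRename(stagingDir: Path, finalDir: Path, conf: Configuration): Unit = {
      val fs = stagingDir.getFileSystem(conf)
      fs.listStatus(stagingDir).foreach { status =>
        // rename is a cheap metadata operation on HDFS, but O(data) on object stores
        fs.rename(status.getPath, new Path(finalDir, status.getPath.getName))
      }
      fs.delete(stagingDir, true) // clean up the temporary directory
    }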

    4. Metrics recording:
    private[datasources] def processStats(
          statsTrackers: Seq[WriteJobStatsTracker],
          statsPerTask: Seq[Seq[WriteTaskStats]])
      : Unit = {
    
        val numStatsTrackers = statsTrackers.length
        assert(statsPerTask.forall(_.length == numStatsTrackers),
          s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
             |There are $numStatsTrackers statsTrackers, but some task returned
             |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead.
           """.stripMargin)
    
        val statsPerTracker = if (statsPerTask.nonEmpty) {
          statsPerTask.transpose
        } else {
          statsTrackers.map(_ => Seq.empty)
        }
    
        statsTrackers.zip(statsPerTracker).foreach {
          case (statsTracker, stats) => statsTracker.processStats(stats)
        }
      }
    

    This hands the job's metrics to the driver through the statsTrackers. The current statsTracker implementation is BasicWriteJobStatsTracker, which means the metrics are ultimately propagated as events on the listener bus,
    as in the following code:

    class BasicWriteJobStatsTracker(
        serializableHadoopConf: SerializableConfiguration,
        @transient val metrics: Map[String, SQLMetric])
      extends WriteJobStatsTracker {
       ...
    
      override def processStats(stats: Seq[WriteTaskStats]): Unit = {
        ...
    
        metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
        metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
        metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
        metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)
    
        val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
        SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
      }
    }
    

    With that we have walked through the entire data flow of a Spark parquet write (some details were skipped).
    The final flow is as follows:

    Instantiate the Job object
          |
          v
    FileCommitProtocol.setupJob -> OutputCommitter.setupJob  pre-job preparation, e.g. creating the _temporary staging directory
          |
          v
    executeTask() -> FileCommitProtocol.setupTask -> OutputCommitter.setupTask  currently a no-op
                    |
                    v
             FileCommitProtocol.newTaskTempFile/newTaskTempFileAbsPath  create the task's temporary output path
                    |
                    v
             dataWriter.write()
                    |
                    v
             dataWriter.commit()  release resources and return the write metrics -> HadoopMapReduceCommitProtocol.commitTask
                              |
                              v
                     SparkHadoopMapRedUtil.commitTask  mv files from $PATH/_temporary to $PATH, plus outputCommitCoordination
                               |
                               v
                     return info about files needing extra temporary directories
          |
          v
    FileCommitProtocol.onTaskCommit
          |
          v
    FileCommitProtocol.commitJob -> OutputCommitter.commitJob  clean up $PATH/_temporary and mv files from the extra temp dirs to the final path
          |
          v
    processStats  process the write metrics
    

    So how does Spark write data into Delta? The flow is exactly the same as above; the only difference is the implementation of FileCommitProtocol. Jump straight to TransactionalWrite.writeFiles:

    def writeFiles(
          data: Dataset[_],
          writeOptions: Option[DeltaOptions],
          isOptimize: Boolean): Seq[AddFile] = {
        hasWritten = true
    
        ...
        val committer = getCommitter(outputPath)
    
        ...
          FileFormatWriter.write(
            sparkSession = spark,
            plan = physicalPlan,
            fileFormat = snapshot.fileFormat, // TODO doesn't support changing formats.
            committer = committer,
            outputSpec = outputSpec,
            hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
            partitionColumns = partitioningColumns,
            bucketSpec = None,
            statsTrackers = statsTrackers,
            options = Map.empty)
        }
    
        committer.addedStatuses
      }
    

    The committer here is a DelayedCommitProtocol, built as follows:

        new DelayedCommitProtocol("delta", outputPath.toString, None)
    

    Let's look at the DelayedCommitProtocol methods:

    override def newTaskTempFile(
          taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
        val filename = getFileName(taskContext, ext)
        val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
        val relativePath = randomPrefixLength.map { prefixLength =>
          getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
        }.orElse {
          dir // or else write into the partition directory if it is partitioned
        }.map { subDir =>
          new Path(subDir, filename)
        }.getOrElse(new Path(filename)) // or directly write out to the output path
    
        addedFiles.append((partitionValues, relativePath.toUri.toString))
        new Path(path, relativePath).toString
      }
    
      override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
        if (addedFiles.nonEmpty) {
          val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
          val statuses: Seq[AddFile] = addedFiles.map { f =>
            val filePath = new Path(path, new Path(new URI(f._2)))
            val stat = fs.getFileStatus(filePath)
            AddFile(f._2, f._1, stat.getLen, stat.getModificationTime, true)
          }
    
          new TaskCommitMessage(statuses)
        } else {
          new TaskCommitMessage(Nil)
        }
      }
    
      override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
        val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[AddFile]]).toArray
        addedStatuses ++= fileStatuses
      }
    
    • The file name generated by newTaskTempFile contains a UUID.randomUUID.toString component, which reduces file-name conflicts (see the sketch after this list)
    • newTaskTempFile now returns a path directly under the output directory, rather than under a _temporary directory
    • commitTask merely records the files that were added
    • commitJob does not actually commit the job; it only keeps the AddFile entries in memory
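
    For reference, the file-name construction looks roughly like this (simplified from Delta's DelayedCommitProtocol.getFileName; treat it as a sketch):

    import java.util.UUID
    import org.apache.hadoop.mapreduce.TaskAttemptContext

    // Sketch: embedding a random UUID in every file name means two task attempts
    // (or two concurrent writers) can practically never collide on the same file.
    def getFileName(taskContext: TaskAttemptContext, ext: String): String = {
      val split = taskContext.getTaskAttemptID.getTaskID.getId
      val uuid = UUID.randomUUID.toString
      f"part-$split%05d-$uuid$ext"
    }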

    As you can see, these AddFile entries still require further processing; that is precisely why the class is called DelayedCommitProtocol. In a follow-up article we will analyze how Delta processes AddFile to achieve transactionality.

    Note that with the default HadoopMapReduceCommitProtocol, a task's output files go to:
    ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}
    如:/data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet
    
    

    For the difference between the v1 and v2 rename mechanisms (the commit algorithm version), see the linked article.
