The Meaning of Output Op Duration


Author: Woople | Published: 2018-05-11 14:26

The "Details of batch" page in the Spark Streaming UI lists every call the program makes to the Output Operations on DStreams. The official documentation lists five such operations:

Output Operation
print()
saveAsTextFiles(prefix, [suffix])
saveAsObjectFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
foreachRDD(func)

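In short, the Output Op Duration is the running time of one of the operations above, covering both the driver side and the executor side. The difference between Output Op Duration and Job Duration is therefore the time the output operation's code spends running on the driver.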
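The subtraction can be made concrete with a small model. This is a minimal sketch in plain Scala, not Spark code: `FakeClock`, `Timing`, and `simulate` are hypothetical names, and it assumes the output operation triggers exactly one Spark job, with some driver-side work before and after it.

```scala
// Hypothetical model, not Spark code: a fake clock keeps the arithmetic deterministic.
object DriverOverheadSketch {
  final class FakeClock(private var now: Long = 0L) {
    def getTimeMillis(): Long = now
    def advance(ms: Long): Unit = now += ms
  }

  /** Durations for one output operation, in milliseconds. */
  final case class Timing(outputOpDuration: Long, jobDuration: Long) {
    // Time spent in the output operation outside the Spark job.
    def driverSideTime: Long = outputOpDuration - jobDuration
  }

  // Assumption: a single job, surrounded by driver-side code.
  def simulate(driverBeforeMs: Long, jobMs: Long, driverAfterMs: Long): Timing = {
    val clock = new FakeClock()
    val opStart = clock.getTimeMillis()
    clock.advance(driverBeforeMs) // driver-side code before the job is submitted
    val jobStart = clock.getTimeMillis()
    clock.advance(jobMs)          // the Spark job itself
    val jobEnd = clock.getTimeMillis()
    clock.advance(driverAfterMs)  // driver-side code after the job returns
    val opEnd = clock.getTimeMillis()
    Timing(opEnd - opStart, jobEnd - jobStart)
  }

  def main(args: Array[String]): Unit = {
    val t = simulate(driverBeforeMs = 200L, jobMs = 1500L, driverAfterMs = 300L)
    println(s"Output Op Duration = ${t.outputOpDuration} ms")
    println(s"Job Duration       = ${t.jobDuration} ms")
    println(s"Driver-side time   = ${t.driverSideTime} ms")
  }
}
```

With these made-up inputs the output operation lasts 2000 ms, the job 1500 ms, and the remaining 500 ms is the driver-side share.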

Source code analysis

Using the Spark 2.3.0 source, let's look at how Output Op Duration is computed.

case class OutputOperationInfo(
    batchTime: Time,
    id: Int,
    name: String,
    description: String,
    startTime: Option[Long],
    endTime: Option[Long],
    failureReason: Option[String]) {

  /**
   * Return the duration of this output operation.
   */
  def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
}
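The `for` comprehension over two `Option` values yields a duration only when both timestamps are present. The following standalone sketch copies just that expression into a hypothetical `DurationSketch` object (not part of Spark) to show the behavior:

```scala
// Standalone sketch: same expression as OutputOperationInfo.duration,
// lifted into a hypothetical helper so it runs without Spark.
object DurationSketch {
  def duration(startTime: Option[Long], endTime: Option[Long]): Option[Long] =
    for (s <- startTime; e <- endTime) yield e - s

  def main(args: Array[String]): Unit = {
    // Finished operation: both timestamps are known.
    println(duration(Some(1000L), Some(1750L)))
    // Still running: no end time yet, so no duration.
    println(duration(Some(1000L), None))
    // Not started: neither timestamp is set.
    println(duration(None, None))
  }
}
```

An operation that has started but not yet finished therefore reports no duration at all, rather than a partial one.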

org.apache.spark.streaming.scheduler.Job

  def toOutputOperationInfo: OutputOperationInfo = {
    val failureReason = if (_result != null && _result.isFailure) {
      Some(Utils.exceptionString(_result.asInstanceOf[Failure[_]].exception))
    } else {
      None
    }
    OutputOperationInfo(
      time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason)
  }

The code above shows that the key question is where _startTime and _endTime come from.

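In the following code from org.apache.spark.streaming.scheduler.JobScheduler, _eventLoop.post(JobStarted(job, clock.getTimeMillis())) supplies the timestamp that becomes _startTime, and _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) supplies the one that becomes _endTime. Both calls bracket job.run(), so everything the output operation does inside job.run() is counted.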

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    val oldProps = ssc.sparkContext.getLocalProperties
    try {
      ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
      // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
      // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
      ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}
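The pattern in JobHandler.run can be reduced to "record a timestamp, run the body, record another timestamp". Below is a simplified sketch of that pattern in plain Scala. All names (`JobTimingSketch`, `TimedJob`, `Started`, `Completed`, `runTimed`) are hypothetical, and unlike Spark, which posts the events to an event loop, this sketch handles them synchronously.

```scala
// Simplified model of the timing pattern, not Spark's implementation.
object JobTimingSketch {
  sealed trait Event
  final case class Started(timeMs: Long) extends Event
  final case class Completed(timeMs: Long) extends Event

  final class TimedJob(body: () => Unit) {
    private var startTime: Option[Long] = None
    private var endTime: Option[Long] = None

    // The timestamp carried by each event is stored on the job.
    def handle(event: Event): Unit = event match {
      case Started(t)   => startTime = Some(t)
      case Completed(t) => endTime = Some(t)
    }

    def run(): Unit = body()

    def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
  }

  // Timestamps are taken immediately before and after the whole body,
  // so any driver-side code inside the body is included in the duration.
  def runTimed(job: TimedJob, now: () => Long): Unit = {
    job.handle(Started(now()))
    job.run()
    job.handle(Completed(now()))
  }

  def main(args: Array[String]): Unit = {
    var fakeNow = 100L // fake clock, advanced by the job body
    val job = new TimedJob(() => fakeNow += 250L)
    runTimed(job, () => fakeNow)
    println(job.duration)
  }
}
```

Because the clock is read outside the body rather than inside the Spark job, the measured span is always at least as long as the job it contains.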
