The "Details of batch" page of the Spark Streaming UI lists all of the Output Operations on DStreams invoked by the program. The official documentation lists five such operations:
| Output Operation |
| --- |
| print() |
| saveAsTextFiles(prefix, [suffix]) |
| saveAsObjectFiles(prefix, [suffix]) |
| saveAsHadoopFiles(prefix, [suffix]) |
| foreachRDD(func) |
In short, the Output Op Duration shown on that page is the running time of these operations, including both driver-side and executor-side time. In other words, the difference between Output Op Duration and Job Duration is the time spent running the output operation's code on the driver.
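To make this concrete, here is a minimal sketch of a `foreachRDD` call (the `lines` DStream and the filtering logic are made up for illustration): the part of the closure before the RDD action runs on the driver once per batch and therefore counts toward Output Op Duration but not Job Duration, while the submitted Spark job counts toward both.

```scala
// Hypothetical example: `lines` is assumed to be an existing DStream[String].
lines.foreachRDD { rdd =>
  // Everything before the action runs on the driver for each batch;
  // this time contributes to Output Op Duration but not to Job Duration.
  val keywords = Seq("ERROR", "WARN")
  val filtered = rdd.filter(line => keywords.exists(line.contains))

  // The action below submits a Spark job; its time is counted in both
  // Job Duration and Output Op Duration.
  val count = filtered.count()
  println(s"matched $count lines in this batch") // also driver-side
}
```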
Source Code Analysis
Based on the Spark 2.3.0 source, let's look at how Output Op Duration is computed, starting with org.apache.spark.streaming.scheduler.OutputOperationInfo:
```scala
case class OutputOperationInfo(
    batchTime: Time,
    id: Int,
    name: String,
    description: String,
    startTime: Option[Long],
    endTime: Option[Long],
    failureReason: Option[String]) {

  /**
   * Return the duration of this output operation.
   */
  def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
}
```
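The for-comprehension over the two Options means `duration` is only defined once both timestamps have been recorded. A quick illustration (the call-site string and timestamps below are made up):

```scala
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.OutputOperationInfo

// Both timestamps recorded: duration is their difference.
OutputOperationInfo(Time(1000L), 0, "print", "print at MyApp.scala:42",
  startTime = Some(100L), endTime = Some(250L), failureReason = None).duration // Some(150)

// End time not yet recorded (operation still running): no duration.
OutputOperationInfo(Time(1000L), 0, "print", "print at MyApp.scala:42",
  startTime = Some(100L), endTime = None, failureReason = None).duration       // None
```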
And in org.apache.spark.streaming.scheduler.Job:
```scala
def toOutputOperationInfo: OutputOperationInfo = {
  val failureReason = if (_result != null && _result.isFailure) {
    Some(Utils.exceptionString(_result.asInstanceOf[Failure[_]].exception))
  } else {
    None
  }
  OutputOperationInfo(
    time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason)
}
```
From the code above, the key question is how _startTime and _endTime are obtained.
In the JobHandler inner class of org.apache.spark.streaming.scheduler.JobScheduler, shown below, _eventLoop.post(JobStarted(job, clock.getTimeMillis())) is what sets _startTime, and _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) is what sets _endTime: when the JobScheduler handles these events, it stores the posted timestamps into the Job via setStartTime and setEndTime. Because job.run() executes between the two posts, the recorded interval covers both the driver-side code of the output operation and any Spark jobs it submits, which is exactly why Output Op Duration includes driver-side time.
```scala
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    val oldProps = ssc.sparkContext.getLocalProperties
    try {
      ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
      // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
      // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
      ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}
```
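These timestamps are also visible outside the UI: when the JobScheduler handles JobStarted and JobCompleted, it publishes StreamingListenerOutputOperationStarted and StreamingListenerOutputOperationCompleted events carrying the OutputOperationInfo shown above, so the same duration can be read from a StreamingListener. A minimal sketch (the listener class and log format are my own, assuming an existing StreamingContext named ssc):

```scala
import org.apache.spark.streaming.scheduler.{
  StreamingListener, StreamingListenerOutputOperationCompleted}

// Hypothetical listener that logs each output operation's duration,
// i.e. the value the "Details of batch" page shows as Output Op Duration.
class OutputOpDurationListener extends StreamingListener {
  override def onOutputOperationCompleted(
      completed: StreamingListenerOutputOperationCompleted): Unit = {
    val info = completed.outputOperationInfo
    println(s"batch ${info.batchTime}, output op ${info.id} (${info.name}): " +
      s"duration = ${info.duration.getOrElse(-1L)} ms")
  }
}

// Registration (assuming `ssc` is an existing StreamingContext):
// ssc.addStreamingListener(new OutputOpDurationListener)
```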