1. Overview
In the design diagram shown earlier, we highlighted two listeners inside SparkContext.
EventLoggingListener is the one that flushes the events it receives to persistent storage as JSON, typically on HDFS.
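Before diving into the source, here is how the listener is typically switched on. A minimal sketch, assuming a placeholder HDFS directory; the property keys are the ones documented in the class comment below:

import org.apache.spark.{SparkConf, SparkContext}

// Enabling event logging makes SparkContext register an EventLoggingListener for us.
// "hdfs://namenode:8020/spark-logs" is only a placeholder directory.
val conf = new SparkConf()
  .setAppName("event-log-demo")
  .setMaster("local[*]")
  .set("spark.eventLog.enabled", "true")                        // turn the listener on
  .set("spark.eventLog.dir", "hdfs://namenode:8020/spark-logs") // where the JSON events land
  .set("spark.eventLog.compress", "true")                       // optional compression

val sc = new SparkContext(conf)
// ... run jobs: every SparkListenerEvent is appended to the log file as one JSON line ...
sc.stop() // stop() is also what renames the file to drop its ".inprogress" suffix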
2. Code
/**
* A SparkListener that logs events to persistent storage.
*
* Event logging is specified by the following configurable parameters:
* spark.eventLog.enabled - Whether event logging is enabled.
* spark.eventLog.compress - Whether to compress logged events
* spark.eventLog.overwrite - Whether to overwrite any existing files.
* spark.eventLog.dir - Path to the directory in which events are logged.
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
*/
private[spark] class EventLoggingListener(
appId: String,
appAttemptId : Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {
import EventLoggingListener._
// logBaseDir is the directory the log is flushed to, generally somewhere on HDFS
def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
this(appId, appAttemptId, logBaseDir, sparkConf,
SparkHadoopUtil.get.newConfiguration(sparkConf))
// A batch of configuration options. Note the 100 KB default output buffer: events go through a buffered stream so the write path stays fast, rather than hitting the file system for every single event.
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
private val compressionCodec =
if (shouldCompress) {
Some(CompressionCodec.createCodec(sparkConf))
} else {
None
}
// Short name of the compression codec; the codec instance itself is created via reflection in CompressionCodec.createCodec
private val compressionCodecName = compressionCodec.map { c =>
CompressionCodec.getShortName(c.getClass.getName)
}
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
// This part is quite interesting: the design anticipates that the Hadoop API may change between versions....
// The Hadoop APIs have changed over time, so we use reflection to figure out
// the correct method to use to flush a hadoop data stream. See SPARK-1518
// for details.
private val hadoopFlushMethod = {
val cls = classOf[FSDataOutputStream]
scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
}
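// (Illustration, not part of the Spark source.) Try/getOrElse picks the first reflective
// lookup that succeeds: only when getMethod("hflush") throws does the "sync" lookup run.
// The same idiom in isolation, using java.lang.String as a stand-in:
//   val trim = scala.util.Try(classOf[String].getMethod("strip"))  // exists on Java 11+ only
//     .getOrElse(classOf[String].getMethod("trim"))                // fallback that always exists
//   trim.invoke(" spark ")                                         // => "spark"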
private var writer: Option[PrintWriter] = None
/**
* Creates the log file in the configured log directory.
*/
def start() {
// Most of the file-creation, flushing and error-handling code is elided here. The comment
// below is worth keeping: the Berkeley team's insistence on compatibility made them work
// around bugs in the various file system APIs.
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
 * Therefore, for local files, use FileOutputStream instead. */
val dstream =
if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
new FileOutputStream(uri.getPath)
} else {
hadoopDataStream = Some(fileSystem.create(path))
hadoopDataStream.get
}
// ... rest of start() elided: dstream ends up wrapped in the PrintWriter held by `writer` ...
}
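// (Illustration, not part of the Spark source.) The branch above keys off the URI scheme:
//   new java.net.URI("hdfs://namenode:8020/logs/app-1").getScheme  // "hdfs" -> fileSystem.create
//   new java.net.URI("file:///tmp/logs/app-1").getScheme           // "file" -> FileOutputStream
//   new java.net.URI("/tmp/logs/app-1").getScheme                  // null   -> treated as local
//                                                                  //   only if isDefaultLocal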
// Serialization here is handled by json4s-jackson
/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
val eventJson = JsonProtocol.sparkEventToJson(event)
// scalastyle:off println
writer.foreach(_.println(compact(render(eventJson))))
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
}
if (testing) {
loggedEvents += eventJson
}
}
// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = logEvent(event)
override def onTaskEnd(event: SparkListenerTaskEnd): Unit = logEvent(event)
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = logEvent(event)
// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
logEvent(event, flushLogger = true)
}
override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true)
override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)
override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
logEvent(event, flushLogger = true)
}
override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
logEvent(event, flushLogger = true)
}
override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
logEvent(event, flushLogger = true)
}
override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
logEvent(event, flushLogger = true)
}
override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
logEvent(event, flushLogger = true)
}
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
logEvent(event, flushLogger = true)
}
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
logEvent(event, flushLogger = true)
}
// No-op because logging every update would be overkill
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.
*/
def stop(): Unit = {
writer.foreach(_.close())
val target = new Path(logPath)
if (fileSystem.exists(target)) {
if (shouldOverwrite) {
logWarning(s"Event log $target already exists. Overwriting...")
if (!fileSystem.delete(target, true)) {
logWarning(s"Error deleting $target")
}
} else {
throw new IOException("Target log file already exists (%s)".format(logPath))
}
}
fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
}
}
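As noted in the comment above logEvent, serialization goes through json4s-jackson: JsonProtocol.sparkEventToJson turns the event into a JValue, and compact(render(...)) collapses it into a single line of JSON. A minimal, self-contained sketch of that last step; the JValue is hand-built here and only roughly mimics what JsonProtocol emits for a real event:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Hand-built JValue standing in for JsonProtocol.sparkEventToJson(event);
// the field names are illustrative, not the exact event-log schema.
val eventJson = ("Event" -> "SparkListenerStageSubmitted") ~ ("Stage ID" -> 3)
// The same compact(render(...)) call that logEvent hands to writer.println.
println(compact(render(eventJson))) // {"Event":"SparkListenerStageSubmitted","Stage ID":3}

Because every event becomes exactly one line, the resulting log file can later be replayed line by line, which is how the history server reconstructs a finished application's UI.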