
3.4 EventLoggingListener - Listener Triggering

Author: GongMeng | Published 2018-11-14 15:52

    1. Overview

    Recall the design diagram from earlier: within SparkContext we highlighted two Listeners.
    EventLoggingListener serializes every event it receives to JSON and flushes it to persistent storage, typically HDFS.
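
    Before diving into the class, it helps to see how this listener is switched on. The following is a minimal sketch built from the configuration keys listed in the scaladoc below; the app name and log directory are made-up values for illustration:

    import org.apache.spark.{SparkConf, SparkContext}

    object EventLogDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("event-log-demo")                          // hypothetical app name
          .setMaster("local[2]")
          .set("spark.eventLog.enabled", "true")                 // enables EventLoggingListener
          .set("spark.eventLog.dir", "hdfs:///tmp/spark-events") // hypothetical log directory
          .set("spark.eventLog.compress", "false")
        val sc = new SparkContext(conf)  // SparkContext registers the listener itself
        sc.parallelize(1 to 100).count() // produces job/stage/task events
        sc.stop()                        // stop() renames the ".inprogress" log file
      }
    }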

    2. Code

    /**
     * A SparkListener that logs events to persistent storage.
     *
     * Event logging is specified by the following configurable parameters:
     *   spark.eventLog.enabled - Whether event logging is enabled.
     *   spark.eventLog.compress - Whether to compress logged events
     *   spark.eventLog.overwrite - Whether to overwrite any existing files.
     *   spark.eventLog.dir - Path to the directory in which events are logged.
     *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
     */
    private[spark] class EventLoggingListener(
        appId: String,
        appAttemptId : Option[String],
        logBaseDir: URI,
        sparkConf: SparkConf,
        hadoopConf: Configuration)
      extends SparkListener with Logging {
    
      import EventLoggingListener._
    
      // Auxiliary constructor. logBaseDir is the directory the logs are flushed to, usually on HDFS.
      def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
        this(appId, appAttemptId, logBaseDir, sparkConf,
          SparkHadoopUtil.get.newConfiguration(sparkConf))
    
      // A series of configuration options. Note the 100 KB output buffer (the default), which batches writes so that flushing stays fast.
      private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
      private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
      private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
      private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
      private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
      private val compressionCodec =
        if (shouldCompress) {
          Some(CompressionCodec.createCodec(sparkConf))
        } else {
          None
        }
      
      // Short name of the compression codec, resolved from the codec's class name
      private val compressionCodecName = compressionCodec.map { c =>
        CompressionCodec.getShortName(c.getClass.getName)
      }
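      // Note: the codec short name (e.g. "lz4", "lzf", "snappy") is also embedded in
      // the event log file name (see getLogPath in the companion object), so a reader
      // can pick the matching codec when decoding the file.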
    
      // Only defined if the file system scheme is not local
      private var hadoopDataStream: Option[FSDataOutputStream] = None
       
      // An interesting detail: the design anticipated that Hadoop's APIs might change over time.
      // The Hadoop APIs have changed over time, so we use reflection to figure out
      // the correct method to use to flush a hadoop data stream. See SPARK-1518
      // for details.
      private val hadoopFlushMethod = {
        val cls = classOf[FSDataOutputStream]
        scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
      }
    
      private var writer: Option[PrintWriter] = None

      // Buffer of events kept in memory when spark.eventLog.testing is set
      private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

      // Full path of the event log file, derived from logBaseDir, the app id,
      // the attempt id, and the codec short name
      private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
    
      /**
       * Creates the log file in the configured log directory.
       */
      def start() {
        // Directory checks and most of the error handling are elided here. The log
        // is first written with an ".inprogress" suffix and renamed in stop().
        val workingPath = logPath + IN_PROGRESS
        val uri = new URI(workingPath)
        val path = new Path(workingPath)
        val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
        val isDefaultLocal = defaultFs == null || defaultFs == "file"

        // The comment below is worth preserving: the Berkeley team's commitment to
        // compatibility made them work around all sorts of file system API bugs.
        /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
         * Therefore, for local files, use FileOutputStream instead. */
        val dstream =
          if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
            new FileOutputStream(uri.getPath)
          } else {
            hadoopDataStream = Some(fileSystem.create(path))
            hadoopDataStream.get
          }
        // dstream is then wrapped in the compression codec (if any) and a
        // BufferedOutputStream of outputBufferSize before writer is assigned
      }
    
      // Serialization here uses json4s-jackson
      /** Log the event as JSON. */
      private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
        val eventJson = JsonProtocol.sparkEventToJson(event)
        // scalastyle:off println
        writer.foreach(_.println(compact(render(eventJson))))
        // scalastyle:on println
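        // On modern Hadoop the reflective call below resolves to hflush(), which
        // makes the buffered bytes visible to readers (such as the History Server)
        // without forcing a full sync to disk.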
        if (flushLogger) {
          writer.foreach(_.flush())
          hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
        }
        if (testing) {
          loggedEvents += eventJson
        }
      }
    
      // Events that do not trigger a flush
      override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
    
      override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
    
      override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = logEvent(event)
    
      override def onTaskEnd(event: SparkListenerTaskEnd): Unit = logEvent(event)
    
      override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = logEvent(event)
    
      // Events that trigger a flush
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        logEvent(event, flushLogger = true)
      }
    
      override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true)
    
      override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)
    
      override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
        logEvent(event, flushLogger = true)
      }
    
      override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
        logEvent(event, flushLogger = true)
      }
    
      override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
        logEvent(event, flushLogger = true)
      }
    
      override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
        logEvent(event, flushLogger = true)
      }
    
      override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
        logEvent(event, flushLogger = true)
      }

      override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
        logEvent(event, flushLogger = true)
      }
    
      override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
        logEvent(event, flushLogger = true)
      }
    
      // No-op because logging every update would be overkill
      override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
    
      // No-op because logging every update would be overkill
      override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
    
      /**
       * Stop logging events. The event log file will be renamed so that it loses the
       * ".inprogress" suffix.
       */
      def stop(): Unit = {
        writer.foreach(_.close())
    
        val target = new Path(logPath)
        if (fileSystem.exists(target)) {
          if (shouldOverwrite) {
            logWarning(s"Event log $target already exists. Overwriting...")
            if (!fileSystem.delete(target, true)) {
              logWarning(s"Error deleting $target")
            }
          } else {
            throw new IOException("Target log file already exists (%s)".format(logPath))
          }
        }
        fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
      }
    
    }
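
    To make the serialization step concrete, here is a minimal self-contained sketch of the json4s-jackson calls that logEvent relies on. The event name and fields are hand-written stand-ins for what JsonProtocol.sparkEventToJson actually produces:

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    object JsonLineSketch {
      def main(args: Array[String]): Unit = {
        // Hand-built JValue mimicking the shape of a serialized listener event
        val eventJson = ("Event" -> "SparkListenerApplicationEnd") ~
          ("Timestamp" -> 1542181920000L)
        // render() finalizes the JValue AST and compact() prints it on a single
        // line, which is what keeps the event log line-oriented
        println(compact(render(eventJson)))
        // prints: {"Event":"SparkListenerApplicationEnd","Timestamp":1542181920000}
      }
    }

    Each event thus becomes one JSON line in the log file. Once stop() renames the file to drop the ".inprogress" suffix, the Spark History Server can replay these lines to rebuild the web UI for the finished application.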
    
    
