The Spark Metrics System

Author: 炮灰向前冲啦 | Published 2018-03-29 11:34

    Spark's metrics system consists of the following parts (see also the class comment of MetricsSystem):

    • Instance: the entity being measured. Spark's instances include Master, Worker, ApplicationInfo, StreamingContext, etc.; an instance supplies the data behind its Sources and starts/stops its MetricsSystem
    • Source: the input side of the metrics. The data a Source collects comes from the fields of its Instance
    • Sink: the output side of the metrics. Spark uses MetricsServlet as the default Sink
    • MetricsConfig: the configuration the metrics system needs; the initialize() method populates its properties
    • MetricsSystem: the per-instance control center for Sources and Sinks

    Source

    Spark abstracts metric inputs behind the Source trait, with implementations such as ApplicationSource, MasterSource, WorkerSource, DAGSchedulerSource, StreamingSource, and JvmSource:

    private[spark] trait Source {
      def sourceName: String
      def metricRegistry: MetricRegistry
    }
    
    • sourceName: the name of the metric source
    • metricRegistry: the registry the source's metrics are registered with

    Let's take a closer look at the MasterSource, WorkerSource, and JvmSource inputs:

    private[spark] class MasterSource(val master: Master) extends Source {
      override val metricRegistry = new MetricRegistry()
      override val sourceName = "master"
    
      // Gauge for worker numbers in cluster
      metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
        override def getValue: Int = master.workers.size
      })
    
      // Gauge for alive worker numbers in cluster
      metricRegistry.register(MetricRegistry.name("aliveWorkers"), new Gauge[Int]{
        override def getValue: Int = master.workers.count(_.state == WorkerState.ALIVE)
      })
    
      // Gauge for application numbers in cluster
      metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] {
        override def getValue: Int = master.apps.size
      })
    
      // Gauge for waiting application numbers in cluster
      metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
        override def getValue: Int = master.apps.count(_.state == ApplicationState.WAITING)
      })
    }
    
    private[worker] class WorkerSource(val worker: Worker) extends Source {
      override val sourceName = "worker"
      override val metricRegistry = new MetricRegistry()
    
      metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
        override def getValue: Int = worker.executors.size
      })
    
      // Gauge for cores used of this worker
      metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
        override def getValue: Int = worker.coresUsed
      })
    
      // Gauge for memory used of this worker
      metricRegistry.register(MetricRegistry.name("memUsed_MB"), new Gauge[Int] {
        override def getValue: Int = worker.memoryUsed
      })
    
      // Gauge for cores free of this worker
      metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] {
        override def getValue: Int = worker.coresFree
      })
    
      // Gauge for memory free of this worker
      metricRegistry.register(MetricRegistry.name("memFree_MB"), new Gauge[Int] {
        override def getValue: Int = worker.memoryFree
      })
    }
    

    The Gauge values registered with the MetricRegistry are read straight from fields of the Master and Worker objects.

    JvmSource's MetricSets come from the metrics-jvm package:

    private[spark] class JvmSource extends Source {
      override val sourceName = "jvm"
      override val metricRegistry = new MetricRegistry()
    
      metricRegistry.registerAll(new GarbageCollectorMetricSet)
      metricRegistry.registerAll(new MemoryUsageGaugeSet)
      metricRegistry.registerAll(
        new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
    }
    

    A Source, then, creates a MetricRegistry and registers the metrics to collect; the metric values come from fields of the Instance object.
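
    Since a Source is just a named MetricRegistry, a custom source follows the same pattern. A minimal sketch (the QueueSource name and pendingTasks counter are made up, and it assumes the code lives somewhere the private[spark] Source trait is visible):

    import java.util.concurrent.atomic.AtomicInteger
    
    import com.codahale.metrics.{Gauge, MetricRegistry}
    
    // Hypothetical source, not part of Spark
    class QueueSource(pendingTasks: AtomicInteger) extends Source {
      override val sourceName = "myQueue"
      override val metricRegistry = new MetricRegistry()
    
      // The gauge re-reads the counter each time a sink polls the registry
      metricRegistry.register(MetricRegistry.name("pendingTasks"), new Gauge[Int] {
        override def getValue: Int = pendingTasks.get()
      })
    }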

    Sink

    Spark abstracts metric outputs behind the Sink trait, with implementations such as ConsoleSink, CsvSink, MetricsServlet, GraphiteSink, JmxSink, and Slf4jSink:

    private[spark] trait Sink {
      def start(): Unit
      def stop(): Unit
      def report(): Unit
    }
    
    • MetricsServlet: creates a ServletContextHandler in the Spark UI's Jetty server, exposing the metrics to the browser
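
    With the default configuration (servlet path /metrics/json, set in MetricsConfig below), the aggregated metrics can be fetched as JSON from the web UI port, e.g. curl http://localhost:4040/metrics/json against a driver running on the default UI port (host and port here are illustrative).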

    Let's take a closer look at the Slf4jSink implementation:

    private[spark] class Slf4jSink(
        val property: Properties,
        val registry: MetricRegistry,
        securityMgr: SecurityManager)
      extends Sink {
      val SLF4J_DEFAULT_PERIOD = 10
      val SLF4J_DEFAULT_UNIT = "SECONDS"
    
      val SLF4J_KEY_PERIOD = "period"
      val SLF4J_KEY_UNIT = "unit"
    
      val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
        case Some(s) => s.toInt
        case None => SLF4J_DEFAULT_PERIOD
      }
    
      val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
        case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
        case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
      }
      // Enforce the minimal scheduleAtFixedRate polling period of 1 second
      MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
    
      val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .convertRatesTo(TimeUnit.SECONDS)
        .build()
    
      override def start() {
        reporter.start(pollPeriod, pollUnit)
      }
    
      override def stop() {
        reporter.stop()
      }
    
      override def report() {
        reporter.report()
      }
    }
    

    The key method is start(): it needs a reporter plus the polling period (pollPeriod) and unit (pollUnit), and it delegates to ScheduledReporter.start():

    public void start(long period, TimeUnit unit) {
        // executor is built with Executors.newSingleThreadScheduledExecutor
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    report();
                } catch (Exception ex) {
                    LOG.error("Exception thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
                }
            }
        }, period, period, unit);
    }
    

    So a Sink builds a reporter together with pollPeriod and pollUnit, then periodically pulls the Source data out of the registry and reports it.
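
    As a sketch of the same pattern, a minimal console sink could look like the following (MyConsoleSink is hypothetical; it reuses the stock ConsoleReporter from the metrics library, and its constructor matches the (Properties, MetricRegistry, SecurityManager) signature that registerSinks() below instantiates reflectively):

    import java.util.{Locale, Properties}
    import java.util.concurrent.TimeUnit
    
    import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
    import org.apache.spark.SecurityManager
    
    // Hypothetical sink, not part of Spark
    private[spark] class MyConsoleSink(
        val property: Properties,
        val registry: MetricRegistry,
        securityMgr: SecurityManager)
      extends Sink {
    
      // Polling period and unit from the sink's properties, with illustrative defaults
      val pollPeriod: Int = Option(property.getProperty("period")).map(_.toInt).getOrElse(10)
      val pollUnit: TimeUnit = Option(property.getProperty("unit"))
        .map(s => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)))
        .getOrElse(TimeUnit.SECONDS)
    
      val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .convertRatesTo(TimeUnit.SECONDS)
        .build()
    
      override def start(): Unit = reporter.start(pollPeriod, pollUnit)
      override def stop(): Unit = reporter.stop()
      override def report(): Unit = reporter.report()
    }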

    MetricsConfig

    MetricsConfig reads the metrics-related configuration:

    private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
    
      private val DEFAULT_PREFIX = "*"
      private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
      private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"
    
      private[metrics] val properties = new Properties()
      private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null
      
      // Set the default properties
      private def setDefaultProperties(prop: Properties) {
        prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
        prop.setProperty("*.sink.servlet.path", "/metrics/json")
        prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
        prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
      }
    
      /**
       * Load properties from various places, based on precedence
       * If the same property is set again latter on in the method, it overwrites the previous value
       */
      // Entry point: load the configuration
      def initialize() {
        // Add default properties in case there's no properties file
        setDefaultProperties(properties)
    
        loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
    
        // Also look for the properties in provided Spark configuration
        val prefix = "spark.metrics.conf."
        conf.getAll.foreach {
          case (k, v) if k.startsWith(prefix) =>
            properties.setProperty(k.substring(prefix.length()), v)
          case _ =>
        }
    
        // Now, let's populate a list of sub-properties per instance, instance being the prefix that
        // appears before the first dot in the property name.
        // Add to the sub-properties per instance, the default properties (those with prefix "*"), if
        // they don't have that exact same sub-property already defined.
        //
        // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path",
        // "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be
        // ("driver"->Map("path"->"driver_path", "class"->"default_class")
        // Note how class got added to based on the default property, but path remained the same
        // since "driver.path" already existed and took precedence over "*.path"
        perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
        if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
          val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
          for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
               (k, v) <- defaultSubProperties if (prop.get(k) == null)) {
            prop.put(k, v)
          }
        }
      }
    
      /**
       * Take a simple set of properties and a regex that the instance names (part before the first dot)
       * have to conform to. And, return a map of the first order prefix (before the first dot) to the
       * sub-properties under that prefix.
       *
       * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1",
       * "*.sink.servlet.path"->"path1"), the returned map would be
       * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1"))
       * Note in the subProperties (value of the returned Map), only the suffixes are used as property
       * keys.
       * If, in the passed properties, there is only one property with a given prefix, it is still
       * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1"
       * the returned Map would contain one key-value pair
       * Map("*" -> Properties("sink.servlet.class" -> "class1"))
       * Any passed in properties, not complying with the regex are ignored.
       *
       * @param prop the flat list of properties to "unflatten" based on prefixes
       * @param regex the regex that the prefix has to comply with
       * @return an unflatted map, mapping prefix with sub-properties under that prefix
       */
      // See the figure and worked example below the class
      def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
        val subProperties = new mutable.HashMap[String, Properties]
        prop.asScala.foreach { kv =>
          if (regex.findPrefixOf(kv._1.toString).isDefined) {
            val regex(prefix, suffix) = kv._1.toString
            subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
          }
        }
        subProperties
      }
    
      // When the instance key does not exist, fall back to the properties under "*"
      def getInstance(inst: String): Properties = {
        perInstanceSubProperties.get(inst) match {
          case Some(s) => s
          case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties)
        }
      }
    
      /**
       * Loads configuration from a config file. If no config file is provided, try to get file
       * in class path.
       */
      private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
        var is: InputStream = null
        try {
          is = path match {
          // If a path is given, read it with a FileInputStream; otherwise load the bundled
          // metrics.properties from the classpath via Utils.getSparkClassLoader.getResourceAsStream
            case Some(f) => new FileInputStream(f)
            case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
          }
    
          if (is != null) {
          // Load the stream into properties
            properties.load(is)
          }
        } catch {
          case e: Exception =>
            val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
            logError(s"Error loading configuration file $file", e)
        } finally {
          if (is != null) {
            // Remember: the stream must always be closed
            is.close()
          }
        }
      }
    }
    
    (Figure: subprop.png — subProperties splits flat keys into per-prefix Properties)
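
    To make the figure concrete, here is a worked example of subProperties (a sketch; the values are made up, and calling it directly assumes access to the private[spark] MetricsConfig):

    import java.util.Properties
    import org.apache.spark.SparkConf
    
    val prop = new Properties()
    prop.setProperty("*.sink.servlet.class", "class1")
    prop.setProperty("master.sink.servlet.path", "path2")
    
    val config = new MetricsConfig(new SparkConf())
    val sub = config.subProperties(prop, "^(\\*|[a-zA-Z]+)\\.(.+)".r)
    // sub("*")      contains "sink.servlet.class" -> "class1"
    // sub("master") contains "sink.servlet.path"  -> "path2"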

    MetricsSystem

    MetricsSystem registers the Sources and Sinks and starts the sinks. It is not a single global control center: each instance owns one MetricsSystem object, which controls metrics at instance granularity.
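
    For example, the Master instance wires up its own MetricsSystem roughly like this (a simplified sketch of the corresponding Spark source):

    // Inside Master: one MetricsSystem per instance
    private val masterMetricsSystem =
      MetricsSystem.createMetricsSystem("master", conf, securityMgr)
    private val masterSource = new MasterSource(this)
    
    // On start-up: register this instance's Source, then start the sinks
    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    
    // On shutdown
    masterMetricsSystem.stop()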

    The three core calls in MetricsSystem are registerSources(), registerSinks(), and sinks.foreach(_.start):

    private[spark] class MetricsSystem private (
        val instance: String,
        conf: SparkConf,
        securityMgr: SecurityManager)
      extends Logging {
      // Build the MetricsConfig object used to read the configuration
      private[this] val metricsConfig = new MetricsConfig(conf)
    
      private val sinks = new mutable.ArrayBuffer[Sink]
      private val sources = new mutable.ArrayBuffer[Source]
      private val registry = new MetricRegistry()
    
      private var running: Boolean = false
    
      // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
      private var metricsServlet: Option[MetricsServlet] = None
    
      /**
       * Get any UI handlers used by this metrics system; can only be called after start().
       */
      def getServletHandlers: Array[ServletContextHandler] = {
        require(running, "Can only call getServletHandlers on a running MetricsSystem")
        metricsServlet.map(_.getHandlers(conf)).getOrElse(Array())
      }
      // Initialize the MetricsConfig
      metricsConfig.initialize()
    
      def start() {
        require(!running, "Attempting to start a MetricsSystem that is already running")
        running = true
        // Register the StaticSources, i.e. CodegenMetrics and HiveCatalogMetrics
        StaticSources.allSources.foreach(registerSource)
        // Register the sources configured for this instance
        registerSources()
        // Instantiate the sinks configured for this instance
        registerSinks()
        // Start the sinks
        sinks.foreach(_.start)
      }
    
      def stop() {
        if (running) {
          // Call stop() on every sink
          sinks.foreach(_.stop)
        } else {
          logWarning("Stopping a MetricsSystem that is not running")
        }
        running = false
      }
    
      def report() {
        // Call report() on every sink
        sinks.foreach(_.report())
      }
    
      /**
       * Build a name that uniquely identifies each metric source.
       * The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
       * If either ID is not available, this defaults to just using <source name>.
       *
       * @param source Metric source to be named by this method.
       * @return An unique metric name for each combination of
       *         application, executor/driver and metric source.
       */
      // Build the registry name
      private[spark] def buildRegistryName(source: Source): String = {
        val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
    
        val executorId = conf.getOption("spark.executor.id")
        val defaultName = MetricRegistry.name(source.sourceName)
    
        if (instance == "driver" || instance == "executor") {
          if (metricsNamespace.isDefined && executorId.isDefined) {
            // When the instance is driver or executor, the name is composed as
            // <spark.app.id>.<spark.executor.id>.<source.sourceName>
            MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
          } else {
            // Only Driver and Executor set spark.app.id and spark.executor.id.
            // Other instance types, e.g. Master and Worker, are not related to a specific application.
            if (metricsNamespace.isEmpty) {
              logWarning(s"Using default name $defaultName for source because neither " +
                s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
            }
            if (executorId.isEmpty) {
              logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
                s"not set.")
            }
            defaultName
          }
        } else { defaultName }
      }
    
      def getSourcesByName(sourceName: String): Seq[Source] =
        sources.filter(_.sourceName == sourceName)
    
      // Register a single source
      def registerSource(source: Source) {
        sources += source
        try {
          val regName = buildRegistryName(source)
          registry.register(regName, source.metricRegistry)
        } catch {
          case e: IllegalArgumentException => logInfo("Metrics already registered", e)
        }
      }
      
      // Remove a source
      def removeSource(source: Source) {
        sources -= source
        val regName = buildRegistryName(source)
        registry.removeMatching(new MetricFilter {
          def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
        })
      }
    
      // Register all data sources configured under the "source." prefix
      private def registerSources() {
        val instConfig = metricsConfig.getInstance(instance)
        // MetricsSystem.SOURCE_REGEX: "^source\\.(.+)\\.(.+)".r
        val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
    
        // Register all the sources related to instance
        sourceConfigs.foreach { kv =>
          val classPath = kv._2.getProperty("class")
          try {
            // Instantiate via reflection. Only no-arg Sources (such as JvmSource) can be created this way
            val source = Utils.classForName(classPath).newInstance()
            registerSource(source.asInstanceOf[Source])
          } catch {
            case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
          }
        }
      }
    
      // Instantiate the sinks configured under the "sink." prefix
      private def registerSinks() {
        val instConfig = metricsConfig.getInstance(instance)
        // Properties whose keys start with "sink": "^sink\\.(.+)\\.(.+)".r
        val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
    
        sinkConfigs.foreach { kv =>
          val classPath = kv._2.getProperty("class")
          if (null != classPath) {
            try {
              // Create the sink by passing (kv._2, registry, securityMgr) to its constructor
              val sink = Utils.classForName(classPath)
                .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
                .newInstance(kv._2, registry, securityMgr)
              if (kv._1 == "servlet") {
                // When the key is "servlet", treat it as the special MetricsServlet
                metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
              } else {
                // Otherwise add it to the sinks list
                sinks += sink.asInstanceOf[Sink]
              }
            } catch {
              case e: Exception =>
                logError("Sink class " + classPath + " cannot be instantiated")
                throw e
            }
          }
        }
      }
    }
    

    Summary

    First, look at the metrics.properties.template file:

    *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
    
    *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
    
    *.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
    *.sink.statsd.prefix=spark
    
    *.sink.console.period=10
    *.sink.console.unit=seconds
    
    master.sink.console.period=15
    master.sink.console.unit=seconds
    
    *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
    
    *.sink.csv.period=1
    *.sink.csv.unit=minutes
    
    *.sink.csv.directory=/tmp/
    
    worker.sink.csv.period=10
    worker.sink.csv.unit=minutes
    
    *.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
    
    *.sink.slf4j.period=1
    *.sink.slf4j.unit=minutes
    
    master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
    
    worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
    
    driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
    
    executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
    
    1. First, the properties configuration is read
    2. Based on the instance name, the properties prefixed with ${name} are selected (falling back to the "*" prefix when absent), yielding the instConfig object; programmatic configuration works too, as sketched after this list
    3. From instConfig, the sub-properties starting with "source" or "sink" are extracted into sourceConfigs / sinkConfigs
    4. The source and sink classes named there are instantiated via reflection. Sources are created through the no-arg constructor, so only sources like JvmSource can be configured this way; a MasterSource has to be new-ed inside the Master class itself. Sinks are created with constructor arguments, which are themselves read from the properties
    5. The sources are registered, then the sinks are started
    6. MetricsSystem handles the start/stop of sources and sinks, and each instance starts and stops its own metrics independently
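
    Besides the metrics.properties file, the same keys can be set programmatically through SparkConf using the spark.metrics.conf. prefix, which initialize() strips off (a sketch; the sink choice and values are illustrative):

    import org.apache.spark.SparkConf
    
    // Equivalent to entries in metrics.properties, passed via SparkConf
    val conf = new SparkConf()
      .set("spark.metrics.conf.*.sink.console.class",
           "org.apache.spark.metrics.sink.ConsoleSink")
      .set("spark.metrics.conf.*.sink.console.period", "20")
      .set("spark.metrics.conf.*.sink.console.unit", "seconds")
      .set("spark.metrics.conf.driver.source.jvm.class",
           "org.apache.spark.metrics.source.JvmSource")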
