美文网首页
spark MetricsSystem 完全揭秘

spark MetricsSystem 完全揭秘

作者: 毕利格次_de99 | 来源:发表于2019-02-21 11:48 被阅读0次

    MetricsSystem系统,顾名思义就是来度量系统的各项指标的,比如说可以度量driver,worker,executor端的jvm相关的信息,来测试服务器的性能。spark中的MetricsSystem底层使用的是第三方的库metrics,metrics是一套开源的度量系统,可以用来计数、监控某项指标的均值等,MetricsSystem基于metrics做了一层封装,但是系统结构和metrics大体相同,理解了metrics后MetricsSystem也比较好理解

    一:开源的度量系统Metrics

    先来看看两个例子

    1. gauges 用来监控某个指标的瞬时值
    /**
     * demo for guages
     *
     * @author xiongmao
     * @create 2019-02-18 5:09 PM
     */
    public class GaugesDemo {
        private static final MetricRegistry metrics = new MetricRegistry();
        private static Queue<String> queue = new LinkedBlockingDeque<String>();
        // 定义reporter,作为度量值的接受端,用于处理接受到的度量值,这里使用的是ConsoleReporter,也就是将度量值
        // 打印到控制台
        // 这里调用了ConsolreReporter的forRegistry,主要是讲reporter注册到MetricRegistry中
        private static ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
    
        public static void main(String[] args)throws Exception {
            // 启动reporter,开始处理接受到的度量结果
            reporter.start(3, TimeUnit.SECONDS);
            
            // 定义Gauge,也就是度量系统的检测端,其实就是metric,Gauge继承了metric
            Gauge<Integer> gauge = new Gauge<Integer>() {
                @Override
                public Integer getValue() {
                    return queue.size();
                }
            };
            // 将metics注册到Metrics中,这样reporter就知道接受哪个metrics中的数据了
            metrics.register(MetricRegistry.name(GaugesDemo.class,"pending-job", "size"), gauge);
    
    //        JmxReporter jmxReporter = JmxReporter.forRegistry(metrics).build();
    //        jmxReporter.start();
    
            for (int i = 0; i < 20; i++) {
                queue.add("a");
                Thread.sleep(1000);
            }
        }
    }
    
    
    1. Counter 指标计数
    /**
     * counter demo
     * 
     * @author xiongmao
     * @create 2019-02-19 1:49 PM
     */
    public class CounterDemo {
        private static final MetricRegistry metrics = new MetricRegistry();
        // 定义reporter,并注册到MetricRegistry中
        private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
        // 定义Counter,同样counter也继承了Metrics
        private static final Counter pendingJobs = new Counter();
    
        public static void add(String str) {
            pendingJobs.inc();
        }
        public static void main(String[] args)throws Exception {
            // 注册metrics到MetricRegistry,与reporter对应
            metrics.register(MetricRegistry.name(CounterDemo.class,"pending-jobs"),pendingJobs);
            reporter.start(3, TimeUnit.SECONDS);
            while (true) {
                add("1");
                Thread.sleep(1000);
            }
        }
    }
    

    上面两个例子可以看出Metrics中 MetricRegistry作为对量系统的中枢大脑,metrics和reporter必须要注册到同一个MetricRegistry中才能协同工作,可以猜想一个reporter可以接受多个metrics的度量结果,一个metrics的度量结果可以被多个reporter接受,只要这些reporter和metrics注册到同一个MetricRegistry中即可

    1. 多个reporter和多个metrics注册到同一个MetricRegistry中
    /**
     * counter demo
     * 同一个MetricRegistry 可以注册多个metric和多个reporter,多个metrics的度量输出会被每一个reporter接受
     * @author xiongmao
     * @create 2019-02-19 1:49 PM
     */
    public class CounterDemo {
        private static final MetricRegistry metrics = new MetricRegistry();
    
        private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
    
        private static final ConsoleReporter reporter1 = ConsoleReporter.forRegistry(metrics).build();
    
        private static Queue<String> queue = new LinkedBlockingDeque<String>();
    
        private static final Counter pendingJobs = new Counter();
    
        public static void add(String str) {
            pendingJobs.inc();
        }
        public static void main(String[] args)throws Exception {
            Gauge<Integer> gauge = new Gauge<Integer>() {
                @Override
                public Integer getValue() {
                    return queue.size();
                }
            };
    
            metrics.register(MetricRegistry.name(CounterDemo.class,"pending-jobs"),pendingJobs);
            metrics.register(MetricRegistry.name(CounterDemo.class,"gauge"),gauge);
    
            reporter.start(3, TimeUnit.SECONDS);
            reporter1.start(3, TimeUnit.SECONDS);
            while (true) {
                add("1");
                queue.add("1");
                Thread.sleep(1000);
            }
        }
    }
    
    
    1. Metirc、reporter、metricRegistry关系图解

    Metric系统的三个组件Metirc、reporter、metricRegistry之间的关系由metricRegistry来协同,只有把metric和reporter都注册到metricRegistry中才能保证度量系统正常的工作

    二:spark中的MetricsSystem
    1. MetricsSystem的体系结构
    • Source:度量系统的数据源,也就是Metric体系中的metric组件
    // source内部维护了一个MetricRegistry,用于注册Metrics
    private[spark] trait Source {
      def sourceName: String
      def metricRegistry: MetricRegistry
    }
    
    //spark中JvmSource具体实现,这个地方有个疑问,上面说到的metricRegistry使用来注册的Metric的,这里Source并没有继承Metric啊,那么这个
    // Source是怎么当做Metric来使用的,这里就要看metricRegistry这个value了,MetricRegistry这个类其实也是继承MetricSet(MetricSet继承了Metric)的,该类里面提供了两个方法
    // register和registerAll,其中register使用注册单个的Metric对象的,registerAll是用来注册MetricSet,该方法最终调用的也是register方法
    // MetricsSystem注册Metric实际上是将source对象的metricRegistry注册到MetricsSystem内部的metricRegistry里面
    // 
    private[spark] class JvmSource extends Source {
      override val sourceName = "jvm"
      override val metricRegistry = new MetricRegistry()
    
      metricRegistry.registerAll(new GarbageCollectorMetricSet)
      metricRegistry.registerAll(new MemoryUsageGaugeSet)
      metricRegistry.registerAll(
        new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
    }
    
    // MetricRegistry的register方法
     public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
            if (metric instanceof MetricSet) {
                // 如果是MetricSet则调用registerAll
                //此处很关键,此处表明MetricRegistry可以注册MetricRegistry的对象,因为MetricRegistry是MetricSet的子类
                this.registerAll(name, (MetricSet)metric);
            } else {
                Metric existing = (Metric)this.metrics.putIfAbsent(name, metric);
                if (existing != null) {
                    throw new IllegalArgumentException("A metric named " + name + " already exists");
                }
    
                this.onMetricAdded(name, metric);
            }
    
            return metric;
        }
    
    // MetricRegistry的registerAll方法
     public void registerAll(MetricSet metrics) throws IllegalArgumentException {
            this.registerAll((String)null, metrics);
        }
    
    private void registerAll(String prefix, MetricSet metrics) throws IllegalArgumentException {
            
            Iterator var3 = metrics.getMetrics().entrySet().iterator();
    
            while(var3.hasNext()) {
                Entry<String, Metric> entry = (Entry)var3.next();
                if (entry.getValue() instanceof MetricSet) {
                    this.registerAll(name(prefix, (String)entry.getKey()), (MetricSet)entry.getValue());
                } else {
                    // 最终还是调用register
                    this.register(name(prefix, (String)entry.getKey()), (Metric)entry.getValue());
                }
            }
    
        }
    
    • Sink:度量系统度量结果的接收端,也就是Metric体系中的reporter
    // Sink trait ,从这里看不出任何reporter的影子
    private[spark] trait Sink {
      def start(): Unit
      def stop(): Unit
      def report(): Unit
    }
    
    // ConsoleSink的具体实现,这个地方有点不合理,trait没有任何信号透露出MetricRegistr 这个对象怎么和Sink产生关联,这里使用的在构造函数中
    // 传入这个registry,这一点不符合抽象编程的规范,这里应该把Sink改成一个具体的类或者类似于Source一样的,维护一个MetricRegistr对象
    // 从这个ConsoleSink可以看出,在Sink具体类实例化是会传入一个 MetricRegistr对象并初始化reporter,这样Sink和reporter就关联起来了
    private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
        securityMgr: SecurityManager) extends Sink {
      val CONSOLE_DEFAULT_PERIOD = 10
      val CONSOLE_DEFAULT_UNIT = "SECONDS"
    
      val CONSOLE_KEY_PERIOD = "period"
      val CONSOLE_KEY_UNIT = "unit"
    
      val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
        case Some(s) => s.toInt
        case None => CONSOLE_DEFAULT_PERIOD
      }
    
      val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
        case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
        case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
      }
    
      MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
    
      // sink内部持有的reporter对象,  将这个reporter注册到registry中
      val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
          .convertDurationsTo(TimeUnit.MILLISECONDS)
          .convertRatesTo(TimeUnit.SECONDS)
          .build()
    
      override def start() {
        reporter.start(pollPeriod, pollUnit)
      }
    
      override def stop() {
        reporter.stop()
      }
    
      override def report() {
        reporter.report()
      }
    }
    
    • MetricsSystem:度量系统的中枢大脑,里面维护了一个MetricRegistry的实例,Source和Sink的都是通过这个MetricRegistry注册的。整体上spark中的MetricsSystem的设计思路和原生Metric的设计思路一样,MetricsSystem其实就是对原生3个组件的封装
    // 内部维护一个MetricRegistry对象,用来注册Source和sink,使用该registry注册的source和sink就可以协同工作了
    private val registry = new MetricRegistry()
    
    1. MetricsSystem的工作原理
    • 初始化:MeticsSystem的初始化是在SparkContext中完成的,具体的是在SparkEnv创建的过程中创建的
    //如果是driver端,则要等待taskScheduler提交作业后的app id,如果是executor的话则直接启动
    val metricsSystem = if (isDriver) {
          // Don't start metrics system right now for Driver.
          // We need to wait for the task scheduler to give us an app ID.
          // Then we can start the metrics system.
          MetricsSystem.createMetricsSystem("driver", conf, securityManager)
        } else {
          // We need to set the executor ID before the MetricsSystem is created because sources and
          // sinks specified in the metrics configuration file will want to incorporate this executor's
          // ID into the metrics they report.
          conf.set("spark.executor.id", executorId)
          val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
          ms.start()
          ms
        }
    
    • 启动:如果是driver端,需要等到taskScheduler启动app之后才会启动MetricsSystem(需要appId),如果是executor端的话,在创建之后即可启动
     _applicationId = _taskScheduler.applicationId()
        _applicationAttemptId = taskScheduler.applicationAttemptId()
        _conf.set("spark.app.id", _applicationId)
        if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
          System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
        }
        _ui.foreach(_.setAppId(_applicationId))
        _env.blockManager.initialize(_applicationId)
    
        // The metrics system for Driver need to be set spark.app.id to app ID.
        // So it should start after we get app ID from the task scheduler and set spark.app.id.
        _env.metricsSystem.start()
        _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
    
     // start()方法
     def start() {
        require(!running, "Attempting to start a MetricsSystem that is already running")
        running = true
        StaticSources.allSources.foreach(registerSource)
        registerSources()
        registerSinks()
        sinks.foreach(_.start)
      }
    

    start()方法内部调用了registerSources和registerSinks,也就是将source和sink注册到MetricsSystem内部对象MetricRegistry中

    //先调用registerSources(),获取到instance(driver,worker,executor等)对应配置文件中的source
    //然后调用registerSource(source: Source)方法,将source注册到MetricsSystem中的内部对象MetricRegistry中
    private def registerSources() {
        val instConfig = metricsConfig.getInstance(instance)
        val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
    
        // Register all the sources related to instance
        sourceConfigs.foreach { kv =>
          val classPath = kv._2.getProperty("class")
          try {
            val source = Utils.classForName(classPath).newInstance()
            registerSource(source.asInstanceOf[Source])
          } catch {
            case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
          }
        }
      }
    
    def registerSource(source: Source) {
        sources += source
        try {
          val regName = buildRegistryName(source)
          // 此处就是上面解释Source里面说的将Source内部的metricRegistry注册到MetricsSystem里面的metricRegistry中
          registry.register(regName, source.metricRegistry)
        } catch {
          case e: IllegalArgumentException => logInfo("Metrics already registered", e)
        }
      }
    
    // 注册Sinks,重点是下面的newInstance
    private def registerSinks() {
        val instConfig = metricsConfig.getInstance(instance)
        val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
    
        sinkConfigs.foreach { kv =>
          val classPath = kv._2.getProperty("class")
          if (null != classPath) {
            try {
              // 实例化时传入registry,并用这个registry注册内部的reporter对象,从抽象编程来说,这个地方很不合理
              val sink = Utils.classForName(classPath)
                .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
                .newInstance(kv._2, registry, securityMgr)
              if (kv._1 == "servlet") {
                metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
              } else {
                sinks += sink.asInstanceOf[Sink]
              }
            } catch {
              case e: Exception =>
                logError("Sink class " + classPath + " cannot be instantiated")
                throw e
            }
          }
        }
      }
    }
    

    注册号Source和sink之后,调用循环调用sink的的start()方法,度量系统开始工作

    三:Q&A
    1. Sink trait 设计有明显的问题,怎么改进?
    2. metricServlet这个类是spark默认提供的Sink,可以通过http的方式访问source度量的到的结果,但是在这个sink并没有调用start()方法启动,那么这个类是如果采集到source发送过来的数据的?

    相关文章

      网友评论

          本文标题:spark MetricsSystem 完全揭秘

          本文链接:https://www.haomeiwen.com/subject/ebnlyqtx.html