Flink 指标(一)

作者: Alex90 | 来源:发表于2019-05-06 15:31 被阅读4次

Flink 自带一个度量系统,允许收集和公开指标到外部系统。

注册指标

可以通过继承 RichFunction,在继承类里面调用 getRuntimeContext().getMetricGroup() 来访问 Flink 的指标系统,这个方法返回一个 MetricGroup 对象,可以通过这个对象创建和注册新的度量指标。

度量类型

支持 CountersGaugesHistogramsMeters 这四个类型的度量值。

Counter(计数器)

Counter 用于计数。可以使用 inc()/inc(long n)dec()/dec(long n) 更新(增加或减少)计数器。可以通过调用 MetricGroupcounter(String name) 方法来创建和注册 Counter 类型的度量值。

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
      // 使用默认 Counter 实现
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

也可以使用自己的 Counter 实现:

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
      // 使用自定义 Counter 实现
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter())
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

Gauges(测量)

Gauge 根据需要可提供任何类型的值。首先需要创建一个实现了 org.apache.flink.metrics.Gauge 接口的类,这个类对返回值的类型没有限制。然后,通过调用 MetricGroupgauge(String name, Gauge gauge) 方法创建和注册 Gauge 类型的度量指标。

new class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}

报告会把导出的数据转换成 String 类型,所以返回的统计类型需要实现 toString() 方法。

Histograms(直方图)

Histogram 用来测量长期变化值的分布。可以用过调用 MetricGrouphistogram(String name, Histogram histogram) 方法创建和注册 Histogram 类型的度量指标。

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var histogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram())
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Flink 没有提供默认 Histogram 实现 ,但提供了一个允许使用 Codahale / DropWizard 直方图的包装类(Wrapper),添加以下依赖项:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.6.1</version>
</dependency>

代码如下:

class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))

    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Meters(仪表)

Meter 用来衡量平均吞吐量。可以通过 markEvent() 方法用来注册事件的发生。可以通过 markEvent(long n) 方法注册多个事件同时发生。可以通过调用 MetricGroupmeter(String name, Meter meter) 方法用来注册 Meter 类型的指标。

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

Flink 提供了一个允许使用 Codahale / DropWizard 表的 Wrapper,添加以下依赖项:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.6.1</version>
</dependency>

代码如下:

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()

    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

作用域(Scope)

为每个被报告的度量值分配一个标识符和一组键值对。标识符基于3个部分:

  1. 注册度量标准时的用户定义名称
  2. 可选的用户定义范围
  3. 系统提供的范围。

例如,如果A.B是系统作用域的,C.D是用户作用域的,E是度量值的名称。那么 A.B.C.D.E 就是这个度量值的标识符。
可以通过设置 conf/flink-conf.yaml 中的 metrics.scope.delimiterKeys 来配置要用于标识符的分隔符(默认值: .) 。

用户作用域(User Scope)

用户范围可以通过调用 MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)Metric#addGroup(String key, String value) 来定义。这些方法会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents 的返回。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter")

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

系统作用域(System Scope)

系统范围包含有关度量的上下文信息,例如:在哪个 Task 中注册或该 Task 属于哪个 Job。

可以通过设置 conf/flink-conf.yaml 中的以下键,来配置需要包含哪些上下文信息。这些键值的格式由常量(比如“taskmanager”)和变量(比如“<task_id>”)组成,其中变量会在运行时被替换掉:

  • metrics.scope.jm
    默认值:<host>.jobmanager
    应用于 Scope 为 JobManager 的所有指标。

  • metrics.scope.jm.job
    默认值:<host>.jobmanager.<job_name>
    应用于 Scope 为 JobManager 和作业的所有指标。

  • metrics.scope.tm
    默认值:<host>.taskmanager.<tm_id>
    应用于 Scope 为 TaskManager 的所有指标。

  • metrics.scope.tm.job
    默认值:<host>.taskmanager.<tm_id>.<job_name>
    应用于 Scope 为 TaskManager 和作业的所有指标。

  • metrics.scope.task
    默认值:<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    应用于 Scope 为 Task 的所有指标。

  • metrics.scope.operator
    默认值:<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    应用于 Scope 为 Operator 的所有指标。

  1. 变量的数量或顺序没有限制。
  2. 变量区分大小写。
  3. 算子指标的默认作用域将产生类似于 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的标识符
  4. 如果还想包含任务名称但省略 TaskManager 信息,则可以指定以下格式:
    metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

将产生类似于 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric 的标识符

  1. 对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用 <job_id> 或通过为作业和算子分配唯一名称来提供一定程度的唯一性的格式字符串。

所有变量列表

  • JobManager:<host>
  • TaskManager:<host><tm_id>
  • 作业:<job_id><作业名称>
  • 任务:<task_id><task_name><task_attempt_id><task_attempt_num><subtask_index>
  • 算子:<operator_id><operator_name><subtask_index>
    对于 Batch API,<operator_id> 始终等于 <task_id>

用户变量

用户变量可以通过调用 MetricGroup#addGroup(String key, String value) 来定义。会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponentsMetricGroup#getAllVariables() 返回。用户变量不能用于 Scope 定义中。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

Reference:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html

相关文章

  • Flink 指标(一)

    Flink 自带一个度量系统,允许收集和公开指标到外部系统。 注册指标 可以通过继承 RichFunction,在...

  • Flink 指标(二)

    报告(Reporter) 通过 conf/flink-conf.yaml 文件配置一个或多个 Reporters ...

  • Flink指标含义

    监控 State 和 Checkpoint 监控 Checkpoint 行为的最简单方法是通过 WebUI 界面,...

  • flink系统学习

    flink自身提供的2中metric指标监控的方式 图形界面,通过flink自带的webui来查看 restful...

  • flink Metrics及其使用

    flink metric用来对外暴露系统内部的一些运行指标,比如flink框架运行时的JVM相关配置,或者基于fl...

  • Flink的内置指标分析

    故障时间:15:17:00缺点:flink内置的度量,会在故障的时候输出故障时候的数值,然后恢复的时候,重新计算,...

  • Flink Metrics指标采集方案

    1、背景 本文讨论的都是基于Flink On K8s场景下,该场景下存在几个特点,一是存在线上业务系统资源复用,二...

  • Flink:注册Table Kafka Sink报错处理

    实现 实现Flink Job:读取阿里云LogService日志,统计事件后将指标写入搭建的Kafka中。JAVA...

  • Flink中如何实现一个自定义MetricReporter

    什么是 Metrics 在 flink 任务运行的过程中,用户通常想知道任务运行的一些基本指标,比如吞吐量、内存和...

  • 一文读懂Apache Flink技术

    一、Flink介绍 1.1 Flink基石 1.2 Flink API 1.3 Flink的用途 1.4 Flin...

网友评论

    本文标题:Flink 指标(一)

    本文链接:https://www.haomeiwen.com/subject/puxdoqtx.html