Flink Metrics

作者: Alex90 | 来源:发表于2021-06-16 14:24 被阅读0次

    主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ops/metrics/

    Flink 提供了 Metric 系统,允许收集 Metric 并暴露给外部系统。

    注册 Metrics

    可以通过任何继承了 RichFunction 的函数访问 Metric 系统。调用 getRuntionContext().getMetricGroup() 方法,该方法返回一个 MetricGroup 对象,可以创建并注册 Metric。

    Metric 类型

    Counter

    Counter 用来计数。当前值可以使用 inc()/inc(long n)dec()/dec(long n) 进行增减。

    // 实现 RichMapFunction 接口
    public class MyMapper extends RichMapFunction<String, String> {
      private transient Counter counter;
    
      @Override
      public void open(Configuration config) {
        // 定义一个 Counter Metric
        this.counter = getRuntimeContext()
          .getMetricGroup()
          .counter("myCounter");
      }
    
      @Override
      public String map(String value) throws Exception {
        // Counter 增加 1
        this.counter.inc();
        return value;
      }
    }
    

    Gauge

    Gauge 根据需要提供任何类型的值。需要先创建一个实现 org.apache.flink.metrics.Gauge 的类,返回值的类形没有限制。

    Report 程序在暴露数据给外部系统时,会把对象转换为字符串,这意味着需要一个有意义的 toString() 实现。

    public class MyMapper extends RichMapFunction<String, String> {
      private transient int valueToExpose = 0;
    
      @Override
      public void open(Configuration config) {
        getRuntimeContext()
          .getMetricGroup()
          .gauge("MyGauge", new Gauge<Integer>() {
            // 实现 org.apache.flink.metrics.Gauge 接口
            @Override
            public Integer getValue() {
              return valueToExpose;
            }
          });
      }
    
      @Override
      public String map(String value) throws Exception {
        valueToExpose++;
        return value;
      }
    }
    

    Histogram

    Histogram 统计值的分布。

    public class MyMapper extends RichMapFunction<Long, Long> {
      private transient Histogram histogram;
    
      @Override
      public void open(Configuration config) {
        this.histogram = getRuntimeContext()
          .getMetricGroup()
          .histogram("myHistogram", new MyHistogram());
      }
    
      @Override
      public Long map(Long value) throws Exception {
        // 加入一个新值
        this.histogram.update(value);
        return value;
      }
    }
    

    Flink 没有提供 Histogram 的默认实现,可以添加依赖使用 DropwizardHistogramWrapper 实现

    <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-metrics-dropwizard</artifactId>
          <version>1.13.0</version>
    </dependency>
    

    Meter

    Meter 用来统计平均吞吐量。

    public class MyMapper extends RichMapFunction<Long, Long> {
      private transient Meter meter;
    
      @Override
      public void open(Configuration config) {
        this.meter = getRuntimeContext()
          .getMetricGroup()
          .meter("myMeter", new MyMeter());
      }
    
      @Override
      public Long map(Long value) throws Exception {
        // 注册事件
        // markEvent(long n) 可以注册同时发生多个时间
        this.meter.markEvent();
        return value;
      }
    }
    

    同样添加 flink-metrics-dropwizard 依赖,可以使用 DropwizardMeterWrapper 实现

    Scope

    每个 Metric 都会分配一个标识符和一组键值对,用来报告 Metric。

    标识符基于3个组成部分:注册时的用户定义名称、可选的用户定义 Scope 和系统提供的 Scope。例如,如果 A.B 是系统 Scope,C.D 是用户 Scope,E 是名称,那么标识符将是 A.B.C.D.E。

    可以通过在 conf/flink-conf.yaml 中设置 metrics.scope.delimiter 键来配置用于标识符的分隔符(默认值:.)。

    User Scope

    定义 User Scope 的方法: 调用 MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)MetricGroup#addGroup(String key, String value)。这些方法会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents 的返回值。

    // 创建 Metric 时指定 Scope
    counter = getRuntimeContext()
      .getMetricGroup()
      .addGroup("MyMetrics")
      .counter("myCounter");
    
    counter = getRuntimeContext()
      .getMetricGroup()
      .addGroup("MyMetricsKey", "MyMetricsValue")
      .counter("myCounter");
    

    System Scope

    System Scope 包含 Metric 的上下文信息,例如注册在哪个 Task(<task_name>)或属于哪个 Job(<job_name>)。

    应该包含哪些上下文信息可以通过 conf/flink-conf.yaml 配置。

    • metrics.scope.jm

      • 默认值:<host>.jobmanager
      • JobManager 的所有 Metric
    • metrics.scope.jm.job

      • 默认值:<host>.jobmanager.<job_name>
      • JobManager 和 Job 的所有 Metric
    • metrics.scope.tm

      • 默认值:<host>.taskmanager.<tm_id>
      • TaskManager 的所有 Metric
    • metrics.scope.tm.job

      • 默认值:<host>.taskmanager.<tm_id><job_name>
      • TaskManager 和 Job 的所有 Metric
    • metrics.scope.task

      • 默认值:<host>.taskmanager.<tm_id><job_name><task_name><subtask_index>
      • Task 的所有 Metric
    • metrics.scope.operator

      • 默认值:<host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>
      • Operator 的所有 Metric

    <host> | <job_name> | <tm_id> | <task_name> | <operator_name> | <subtask_index> 可以作为变量使用。变量的数量或顺序没有限制,区分大小写。

    例如:Operator Metric 的默认 Scope 格式为 <host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>,生成的标识符类似 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的形式;如果希望包含 Task 名称,并且忽略 TaskManager 信息,可以设置 metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>,生成的标识符会变成 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric

    建议添加带有 ID 的变量(如:<job_id>)保证唯一性,避免出现命名冲突的问题。所有可以使用的变量:

    • JobManager: <host>

    • TaskManager: <host>, <tm_id>

    • Job: <job_id>, <job_name>

    • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>

    • Operator: <operator_id>, <operator_name>, <subtask_index>

    Reporter

    Flink 允许向外部系统报告 Metric。

    通过在 conf/flink-conf.yaml 中配置一个或多个 Reporter,可以将 Metric 暴露给外部系统。这些 Reporter 在启动时实例化。

    • metrics.reporter.<name>.<config>:Reporter 名称
    • metrics.reporter.<name>.class:Reporter 实现类
    • metrics.reporter.<name>.factory.class:Reporter 工厂类
    • metrics.reporter.<name>.interval:Reporter 调用间隔
    • metrics.reporter.<name>.scope.delimiter:Scope 标识符的分隔符(默认使用 metrics.scope.delimiter
    • metrics.reporter.<name>.scope.variables.excludes:可选项,以 “;” 分隔的变量列表,可以忽略这些变量
    • metrics.reporters:可选项,以 “,” 分隔的 Reporter 名称列表,表示应用哪些 Reporter,默认会包含所有配置的 Reporter。

    Reporter 必须至少配置 classfactory.class 属性(使用哪个取决于 Reporter 的实现)。

    配置 Reporter 示例

    metrics.reporters: my_jmx_reporter,my_other_reporter
    
    metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
    metrics.reporter.my_jmx_reporter.port: 9020-9040
    metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num
    
    metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
    metrics.reporter.my_other_reporter.host: 192.168.1.1
    metrics.reporter.my_other_reporter.port: 10000
    

    自定义 Reporter:

    • 实现 org.apache.flink.metrics.reporter.MetricReporter 接口
    • 如果要定时发送报告,实现 Scheduled 接口

    下面列出了一些支持的 Reporter

    JMX

    org.apache.flink.metrics.jmx.JMXReporter

    参数:

    • port - JMX 监听端口,建议使用范围:9250-9260。实际端口将显示在相关 Job 或 Task Manager 日志中。
    metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory 
    metrics.reporter.jmx.port: 8789
    

    通过 JMX 公开的 Metric 由一个 domain 和一组 key 属性组成标识。domain 总是以 org.apache.flink 开始,接一个通用 metric 标识(与一般的 metric 标识不同,不受 scope 格式的影响,不包含任何变量),例如:org.apache.flink.job.task.numBytesOut。

    key 属性列表包含与给定 Metric 关联的所有变量的值(不受 scope 格式影响)。例如:host=localhost,job_name=MyJob,task_name=MyTask

    Prometheus

    org.apache.flink.metrics.prometheus.PrometheusReporter

    参数:

    • port - Prometheus exporter 侦听的端口,默认为 9249,建议使用范围:9250-9260。
    • filterLabelValueCharacters - 可选项,过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    

    Flink Metric 类型和 Prometheus Metric 类型映射

    Flink Prometheus Note
    Counter Gauge Prometheus Counters 不能递减
    Gauge Gauge 只支持数值和布尔
    Histogram Summary 分位数支持 .5, .75, .95, .98, .99, .999
    Meter Gauge

    PrometheusPushGateway

    org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter

    参数

    Key Default Type Description
    deleteOnShutdown true Boolean 在关闭时,是否删除 PushGateway 中的 Metric。
    filterLabelValueCharacters true Boolean 是否过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。
    groupingKey (none) String 指定 grouping key。格式:lable_name=label_value;lable_name=label_value;
    host (none) String PushGateway 服务地址
    jobName (none) String 指定作业,推送 metric
    port -1 Integer PushGateway 服务端口
    randomJobNameSuffix true Boolean 作业名称添加随机后缀

    PrometheusPushGatewayReporter 将 Metric 推到 Pushgateway

    metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    metrics.reporter.promgateway.host: localhost
    metrics.reporter.promgateway.port: 9091
    metrics.reporter.promgateway.jobName: myJob
    metrics.reporter.promgateway.randomJobNameSuffix: true
    metrics.reporter.promgateway.deleteOnShutdown: false
    metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
    metrics.reporter.promgateway.interval: 60 SECONDS
    

    系统 Metrics

    默认情况下,Flink 收集的指标

    CPU

    CPU

    Memory

    Memory

    Threads

    Scope 中缀 Metrics 描述 类型
    Job-/TaskManager Status.JVM.Threads Count 活动线程的总数 Gauge

    GC

    GC

    ClassLoader

    ClassLoader

    Default Shuffle Service

    代替 Network/IO 部分 Metrics

    Shuffle

    Cluster

    Cluster

    Availability

    如果启用了 Reactive Mode(1.13 MVP 特性),这些 Metric(除 numRestarts)不能正常工作。

    Availability

    Checkpointing

    如果启用了 Reactive Mode(1.13 MVP 特性),Job Scope 的 Metric 不能正常工作。

    Checkpoint

    IO

    IO

    Connectors

    Kafka Connector

    Scope Metrics 变量 描述 类型
    Operator commitsSucceeded n/a 成功提交到 kafka 的 offset 总数。 <br />如果启动了 offset commit 并且开启 checkpointing Counter
    Operator commitsFailed n/a 没有成功提交到 Kafka 的 offset 总数。 <br />如果启动了 offset commit 并且开启 checkpointing Counter
    Operator committedOffsets topic, partition 对于每个分区,最后一次成功提交到 Kafka 的offset。 <br />可以指定 topic 和 partition Gauge
    Operator currentOffsets topic, partition 对于每个分区,当前读取的 offset。 <br />可以指定 topic 和 partition Gauge

    HBase Connector

    Scope Metrics User Variables Description Type
    Operator lookupCacheHitRate n/a Lookup 缓存命中率 Gauge

    延迟跟踪

    Flink 允许跟踪在系统中传输的记录的延迟。默认情况下禁用此功能。要启用延迟跟踪,必须在 Flink 配置(conf/flink-conf.yaml)或 ExecutionConfig 中将 latencyTrackingInterval 设置为正数。

    Source 会定期(latencyTrackingInterval)发出一个特殊的记录,称为 LatencyMarker。记录包含一个时间戳,该时间戳从记录在源处发出时算起。LatencyMarker 不能超过(overtake)正常记录,因此如果正常记录在 Operator 前排队,将增加标记跟踪的延迟。

    延迟监控的粒度,分为以下3档:

    • single:每个算子单独统计延迟;

    • operator(默认值):每个下游算子都统计自己与 Source 算子之间的延迟;

    • subtask:每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。

    需要注意:

    • LatencyMarker 记录的时间戳最终是靠 System.currentTimeMillis() 方法获取本地时间,要保证 Flink 集群内所有节点的时区、时间是同步的,可以用 NTP 等工具来配置。
    • 启用延迟 metric 会影响集群的性能(特别是 subtask 粒度)。官方建议仅用于调试目的。

    REST API Integration

    Metrics 可以通过 REST API 查询。下面列出一些可用的 Endpoint 和 JSON 返回格式。

    Base URL:http://hostname:8081/jobmanager/metrics

    查询 Metric 未聚合值

    • /jobmanager/metrics
    • /taskmanagers/<taskmanagerid>/metrics
    • /jobs/<jobid>/metrics
    • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

    查询 Metric 聚合值

    • /taskmanagers/metrics
    • /jobs/metrics
    • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics

    查询 Metric 部分值的聚合值

    • /taskmanagers/metrics?taskmanagers=A,B,C
    • /jobs/metrics?jobs=D,E,F
    • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3

    特殊字符需要转义(符合 URL 标准)

    查看 Metric 列表

    GET /jobmanager/metrics

    [
      {
        "id": "metric1"
      },
      {
        "id": "metric2"
      }
    ]
    

    请求特定 Metric 的值(未聚合)

    GET taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2

    [
      {
        "id": "metric1",
        "value": "34"
      },
      {
        "id": "metric2",
        "value": "2"
      }
    ]
    

    请求特定 Metric 的聚合值

    GET /taskmanagers/metrics?get=metric1,metric2

    [
      {
        "id": "metric1",
        "min": 1,
        "max": 34,
        "avg": 15,
        "sum": 45
      },
      {
        "id": "metric2",
        "min": 2,
        "max": 14,
        "avg": 7,
        "sum": 16
      }
    ]
    

    请求特定 Metric 的特定值的聚合值

    GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

    [
      {
        "id": "metric1",
        "min": 1,
        "max": 34,
      },
      {
        "id": "metric2",
        "min": 2,
        "max": 14,
      }
    ]
    

    Dashboard Integration

    为 Task 或 Operator 收集的 Metric 也可以在仪表板中可视化。在作业的主页面上,选择 Metrics 选项卡。在 Graph 中选择一个任务后,可以使用 Add Metric 下拉菜单选择要显示的 Metric。

    • Task metrics 列表样式 <subtask_index>.<metric_name>
    • Operator metrics 列表样式 <subtask_index>.<operator_name>.<metric_name>

    每个 Metric 可以被可视化为一个单独的图形,x轴表示时间,y轴表示测量值。图表每10秒自动更新一次。

    相关文章

      网友评论

        本文标题:Flink Metrics

        本文链接:https://www.haomeiwen.com/subject/gsoxyltx.html