返回介绍

度量

发布于 2025-05-02 18:19:20 字数 38356 浏览 0 评论 0 收藏

Flink 公开了一个度量系统,允许收集和公开指标到外部系统。

注册指标

您可以通过调用从扩展 RichFunction 的任何用户函数访问度量标准系统 getRuntimeContext().getMetricGroup() 。此方法返回一个 MetricGroup 对象,您可以在该对象上创建和注册新指标。

度量类型

Flink 支持 CountersGaugesHistogramsMeters

计数器

A Counter 用于计算某些东西。可以使用 inc()/inc(long n) 或来 Reduce 当前值 dec()/dec(long n) 。您可以创建并注册 Counter 调用 counter(String name)MetricGroup

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
  this.counter = getRuntimeContext()
    .getMetricGroup()
    .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
  this.counter.inc();
  return value;
  }
}
class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
  counter = getRuntimeContext()
    .getMetricGroup()
    .counter("myCounter")
  }

  override def map(value: String): String = {
  counter.inc()
  value
  }
}

或者,您也可以使用自己的 Counter 实现:

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
  this.counter = getRuntimeContext()
    .getMetricGroup()
    .counter("myCustomCounter", new CustomCounter());
  }

  @Override
  public String map(String value) throws Exception {
  this.counter.inc();
  return value;
  }
}
class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
  counter = getRuntimeContext()
    .getMetricGroup()
    .counter("myCustomCounter", new CustomCounter())
  }

  override def map(value: String): String = {
  counter.inc()
  value
  }
}

测量

A 根据需要 Gauge 提供任何类型的值。为了使用 a, Gauge 您必须首先创建一个实现该 org.apache.flink.metrics.Gauge 接口的类。返回值的类型没有限制。你可以通过调用注册一个计 gauge(String name, Gauge gauge)MetricGroup

public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
  getRuntimeContext()
    .getMetricGroup()
    .gauge("MyGauge", new Gauge<Integer>() {
    @Override
    public Integer getValue() {
      return valueToExpose;
    }
    });
  }

  @Override
  public String map(String value) throws Exception {
  valueToExpose++;
  return value;
  }
}
new class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
  getRuntimeContext()
    .getMetricGroup()
    .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
  valueToExpose += 1
  value
  }
}

请注意,报告会将公开的对象转换为 a String ,这意味着需要进行有意义的 toString() 实现。

直方图

A Histogram 衡量长值的分布。你可以通过调用注册一个 histogram(String name, Histogram histogram) 上一个 MetricGroup

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
  this.histogram = getRuntimeContext()
    .getMetricGroup()
    .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
  this.histogram.update(value);
  return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var histogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
  histogram = getRuntimeContext()
    .getMetricGroup()
    .histogram("myHistogram", new MyHistogram())
  }

  override def map(value: Long): Long = {
  histogram.update(value)
  value
  }
}

Flink 没有提供默认实现 Histogram ,但提供了一个允许使用 Codahale / DropWizard 直方图的 Wrapper 。要使用此打包,请在以下内容中添加以下依赖项 pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>1.7-SNAPSHOT</version>
</dependency>

然后你可以像这样注册一个 Codahale / DropWizard 直方图:

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
  com.codahale.metrics.Histogram dropwizardHistogram =
    new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

  this.histogram = getRuntimeContext()
    .getMetricGroup()
    .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
  }

  @Override
  public Long map(Long value) throws Exception {
  this.histogram.update(value);
  return value;
  }
}
class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(config: Configuration): Unit = {
  com.codahale.metrics.Histogram dropwizardHistogram =
    new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))

  histogram = getRuntimeContext()
    .getMetricGroup()
    .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }

  override def map(value: Long): Long = {
  histogram.update(value)
  value
  }
}

仪表

A Meter 衡量平均吞吐量。可以使用该 markEvent() 方法注册事件的发生。可以使用 markEvent(long n) 方法注册同时发生多个事件。你可以通过调用注册一个仪表 meter(String name, Meter meter)MetricGroup

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
  this.meter = getRuntimeContext()
    .getMetricGroup()
    .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
  this.meter.markEvent();
  return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
  meter = getRuntimeContext()
    .getMetricGroup()
    .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
  meter.markEvent()
  value
  }
}

Flink 提供了一个允许使用 Codahale / DropWizard 表的 打包器 。要使用此打包,请在以下内容中添加以下依赖项 pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>1.7-SNAPSHOT</version>
</dependency>

然后你可以像这样注册一个 Codahale / DropWizard 仪表:

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
  com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

  this.meter = getRuntimeContext()
    .getMetricGroup()
    .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
  }

  @Override
  public Long map(Long value) throws Exception {
  this.meter.markEvent();
  return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
  com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()

  meter = getRuntimeContext()
    .getMetricGroup()
    .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }

  override def map(value: Long): Long = {
  meter.markEvent()
  value
  }
}

范围

为每个度量标准分配一个标识符和一组键值对,在该键值对下将报告度量标准。

标识符基于 3 个组件:注册度量标准时的用户定义名称,可选的用户定义范围和系统提供的范围。例如,如果 A.B 是系统范围, C.D 用户范围和 E 名称,则度量标识符将是 A.B.C.D.E

您可以 . 通过设置 metrics.scope.delimiter Keys 来配置要用于标识符的分隔符(默认值:) conf/flink-conf.yaml

用户范围

你可以通过调用定义用户范围 MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)Metric#addGroup(String key, String value) 。这些方法影响什么 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents 返回。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter")

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

系统范围

系统范围包含有关度量标准的上下文信息,例如,它在哪个任务中注册或该任务属于哪个作业。

可以通过设置以下键来配置应包含哪些上下文信息 conf/flink-conf.yaml 。这些键中的每一个都期望一个格式字符串可能包含常量(例如“taskmanager”)和变量(例如“<task_id>”),它们将在运行时被替换。

  • metrics.scope.jm
    • 默认值:<host> .jobmanager
    • 应用于作用域 JobManager 的所有指标。
  • metrics.scope.jm.job
    • 默认值:<host> .jobmanager。<job_name>
    • 应用于作用于 JobManager 和作业的所有度量标准。
  • metrics.scope.tm
    • 默认值:<host> .taskmanager。<tm_id>
    • 应用于作用于 TaskManager 的所有度量标准。
  • metrics.scope.tm.job
    • 默认值:<host> .taskmanager。<tm_id>。<job_name>
    • 应用于作用于 TaskManager 和作业的所有度量标准。
  • metrics.scope.task
    • 默认值:<host> .taskmanager。<tm_id>。<job_name>。<task_name>。<subtask_index>
    • 应用于作用于任务的所有指标。
  • metrics.scope.operator
    • 默认值:<host> .taskmanager。<tm_id>。<job_name>。<operator_name>。<subtask_index>
    • 应用于作用于算子的所有指标。

变量的数量或顺序没有限制。变量区分大小写。

算子指标的默认范围将产生类似于的标识符 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric

如果您还想包含任务名称但省略 TaskManager 信息,则可以指定以下格式:

metrics.scope.operator: &lt;host&gt;.&lt;job_name&gt;.&lt;task_name&gt;.&lt;operator_name&gt;.&lt;subtask_index&gt;

这可以创建标识符 localhost.MyJob.MySource_-&gt;_MyOperator.MyOperator.0.MyMetric

请注意,对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用通过包含 ID(例如<job_id>)或通过为作业和 算子分配唯一名称来提供一定程度的唯一性的格式字符串。

所有变量列表

  • JobManager:<host>
  • TaskManager:<host>,<tm_id>
  • 作业:<job_id>,<作业名称>
  • 任务:<task_id>,<task_name>,<task_attempt_id>,<task_attempt_num>,<subtask_index>
  • 算子:<operator_id>,<operator_name>,<subtask_index>

要点: 对于 Batch API,<operator_id>始终等于<task_id>。

用户变量

您可以通过调用来定义用户变量 MetricGroup#addGroup(String key, String value) 。这种方法会影响什么 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponentsMetricGroup#getAllVariables() 返回。

重要提示: 用户变量不能用于范围格式。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

报告

通过配置一个或多个报告,可以将度量标准暴露给外部系统 conf/flink-conf.yaml 。这些报告将在每个工作和 TaskManager 启动时进行实例化。

  • metrics.reporter.&lt;name&gt;.&lt;config&gt;&lt;config&gt; 报告的通用设置命名 &lt;name&gt;
  • metrics.reporter.&lt;name&gt;.class :报告类用于为报告命名 &lt;name&gt;
  • metrics.reporter.&lt;name&gt;.interval :报告间隔用于报告的名字 &lt;name&gt;
  • metrics.reporter.&lt;name&gt;.scope.delimiter :用于名称的报告者的标识符(默认值使用 metrics.scope.delimiter )的分隔符 &lt;name&gt;
  • metrics.reporters :(可选)以逗号分隔的包含报告名称列表。默认情况下,将使用所有已配置的报告。

所有报告必须至少拥有该 class 财产,其中一些允许指定报告 interval 。下面,我们将列出针对每位报告的更多设置。

示例报表配置,指定多个报告:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

重要说明: 启动 Flink 时,通过将其放在/ lib 文件夹中,可以访问包含报告者的 jar。

您可以 Reporter 通过实现 org.apache.flink.metrics.reporter.MetricReporter 接口编写自己的。如果 Reporter 应定期发送报告,您还必须实现该 Scheduled 接口。

以下部分列出了受支持的报告。

JMX(org.apache.flink.metrics.jmx.JMXReporter)

您不必包含其他依赖项,因为默认情况下 JMX 报告器可用但未激活。

参数:

  • port - (可选)JMX 侦听连接的端口。为了能够在一个主机上运行多个报告实例(例如,当一个 TaskManager 与 JobManager 共同使用时),建议使用类似的端口范围 9250-9260 。指定范围时,实际端口将显示在相关作业或 TaskManager 日志中。如果设置此设置,Flink 将为给定的端口/范围启动额外的 JMX 连接器。度量标准始终在默认的本地 JMX 界面上可用。

配置示例:

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

通过 JMX 公开的度量标准由域和键属性列表标识,这些键属性一起形成对象名称。

域始终以 org.apache.flink 广义度量标识符开头。与通常的标识符相反,它不受作用域格式的影响,不包含任何变量,并且在作业中保持不变。这种域的一个例子是 org.apache.flink.job.task.numBytesOut

键属性列表包含与给定度量关联的所有变量的值,无论配置的范围格式如何。这样一个列表的一个例子是 host=localhost,job_name=MyJob,task_name=MyTask

因此,域标识度量标准类,而关键属性列表标识该度量标准的一个(或多个)实例。

Ganglia(org.apache.flink.metrics.ganglia.GangliaReporter)

要使用此报告,您必须复制 /opt/flink-metrics-ganglia-1.7-SNAPSHOT.jar/lib Flink 发行版的文件夹中。

参数:

  • host -下配置的的 gmond 主机地址 udp_recv_channel.bindgmond.conf
  • port -下配置的端口的 gmond udp_recv_channel.portgmond.conf
  • tmax - 应保存旧度量标准的软限制
  • dmax - 应保存旧指标多长时间的硬限制
  • ttl - 传输的 UDP 数据包的生存时间
  • addressingMode - 要使用的 UDP 寻址模式(UNICAST / MULTICAST)

配置示例:

metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite(org.apache.flink.metrics.graphite.GraphiteReporter)

要使用此报告,您必须复制 /opt/flink-metrics-graphite-1.7-SNAPSHOT.jar/lib Flink 发行版的文件夹中。

参数:

  • host - Graphite 服务器主机
  • port - Graphite 服务器端口
  • protocol - 使用协议(TCP / UDP)

配置示例:

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)

要使用此报告,您必须复制 /opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar/lib Flink 发行版的文件夹中。

参数:

  • port - (可选)Prometheus 导出器侦听的端口,默认为 9249 。为了能够在一个主机上运行多个报告实例(例如,当一个 TaskManager 与 JobManager 共同使用时),建议使用类似的端口范围 9250-9260

配置示例:

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink 度量标准类型映射到 Prometheus 度量标准类型,如下所示:

FlinkPrometheus注意
计数器测量Prometheus 计数器不能 Reduce。
测量测量仅支持数字和布尔值。
直方图概要分位数.5,.75,.95,.98,.99 和.999
仪表测量仪表输出仪表的速率。

所有 Flink 度量变量(请参阅 所有变量列表 )都将作为标签导出到 Prometheus。

PrometheusPushGateway(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)

要使用此报告,您必须复制 /opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar/lib Flink 发行版的文件夹中。

参数:

默认描述
deleteOnShutdowntrue指定是否在关闭时从 PushGateway 中删除指标。
Host(none)PushGateway 服务器主机。
jobName(none)将推送指标的作业名称
port-1PushGateway 服务器端口。
randomJobNameSuffixtrue指定是否应将随机后缀附加到作业名称。

配置示例:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false

PrometheusPushGatewayReporter 将指标推送到 Pushgateway ,可由 Prometheus 抓取

有关用例,请参阅 Prometheus 文档

StatsD(org.apache.flink.metrics.statsd.StatsDReporter)

要使用此报告,您必须复制 /opt/flink-metrics-statsd-1.7-SNAPSHOT.jar/lib Flink 发行版的文件夹中。

参数:

  • host - StatsD 服务器主机
  • port - StatsD 服务器端口

配置示例:

metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

Datadog(org.apache.flink.metrics.datadog.DatadogHttpReporter)

要使用此报告,您必须复制 /opt/flink-metrics-datadog-1.7-SNAPSHOT.jar/lib Flink 发行版的文件夹中。

注意 Flink 指标,如任何变量 &lt;host&gt;&lt;job_name&gt;&lt;tm_id&gt;&lt;subtask_index&gt;&lt;task_name&gt; ,和 &lt;operator_name&gt; ,将被发送到 Datadog 的标签。标签看起来像 host:localhostjob_name:myjobname

参数:

  • apikey - Datadog APIKeys
  • tags - (可选)发送到 Datadog 时将应用于度量标准的全局标记。标签应仅以逗号分隔

配置示例:

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod

Slf4j(org.apache.flink.metrics.slf4j.Slf4jReporter)

要使用此报告,您必须复制 /opt/flink-metrics-slf4j-1.7-SNAPSHOT.jar/lib Flink 发行版的文件夹中。

配置示例:

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS

系统指标

默认情况下,Flink 会收集几个指标,这些指标可以提供有关当前状态的深入见解。本节是所有这些指标的参考。

下表通常包含 5 列:

  • “范围”列描述了用于生成系统范围的范围格式。例如,如果单元格包含“Operator”,则使用“metrics.scope.operator”的范围格式。如果单元格包含多个值(以斜杠分隔),则会针对不同的实体多次报告度量标准,例如作业和 TaskManager。
  • (可选)“Infix”列描述了哪个中缀附加到系统范围。
  • “度量标准”列列出了为给定范围和中缀注册的所有度量标准的名称。
  • “描述”列提供有关给定度量正在测量的信息。
  • “类型”列描述了用于测量的度量类型。

请注意,中缀/指标名称列中的所有点仍受“metrics.delimiter”设置的约束。

因此,为了推断度量标识符:

  1. 根据“范围”列获取范围格式
  2. 如果存在,则将值附加到“中缀”列中,并考虑“metrics.delimiter”设置
  3. 附加指标名称。

中央处理器

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.CPU加载JVM 最近的 CPU 使用情况。测量
时间JVM 使用的 CPU 时间。测量

内存

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.MemoryHeap.Used当前使用的堆内存量(以字节为单位)。测量
Heap.Committed保证可供 JVM 使用的堆内存量(以字节为单位)。测量
Heap.Max可用于内存管理的最大堆内存量(以字节为单位)。测量
NonHeap.Used当前使用的非堆内存量(以字节为单位)。测量
NonHeap.Committed保证 JVM 可用的非堆内存量(以字节为单位)。测量
NonHeap.Max可用于内存管理的最大非堆内存量(以字节为单位)。测量
Direct.Count直接缓冲池中的缓冲区数。测量
Direct.MemoryUsedJVM 用于直接缓冲池的内存量(以字节为单位)。测量
Direct.TotalCapacity直接缓冲池中所有缓冲区的总容量(以字节为单位)。测量
Mapped.Count映射缓冲池中的缓冲区数。测量
Mapped.MemoryUsedJVM 用于映射缓冲池的内存量(以字节为单位)。测量
Mapped.TotalCapacity映射缓冲池中的缓冲区数(以字节为单位)。测量

线程

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.Threads计数活动线程总数。测量

垃圾收集

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.GarbageCollector<GarbageCollector> .Count 之间已发生的集合总数。测量
<GarbageCollector>。时间执行垃圾收集所花费的总时间。测量

类加载器

范围中缀度量描述类型
Job-/TaskManagerStatus.JVM.ClassLoaderClassesLoaded自 JVM 启动以来加载的类总数。测量
ClassesUnloaded自 JVM 启动以来卸载的类总数。测量

网络

范围中缀度量描述类型
TaskManagerStatus.NetworkAvailableMemorySegments未使用的内存段数。测量
TotalMemorySegments分配的内存段数。测量
TaskbuffersinputQueueLength排队的输入缓冲区数。测量
outputQueueLength排队输出缓冲区的数量。测量
inPoolUsage估计输入缓冲区的使用情况。测量
outPoolUsage估计输出缓冲区的使用情况。测量
Network.<Input|Output>.<gate>(only available if taskmanager.net.detailed-metrics config option is set)totalQueueLen所有输入/输出通道中排队缓冲区的总数。测量
minQueueLen所有输入/输出通道中的最小排队缓冲区数。测量
maxQueueLen所有输入/输出通道中的最大排队缓冲区数。测量
avgQueueLen所有输入/输出通道中的平均缓冲区数。测量

集群

范围度量描述类型
JobManagernumRegisteredTaskManagers注册任务管理员的数量。测量
numRunningJobs正在运行的作业数量。测量
taskSlotsAvailable可用任务槽的数量。测量
taskSlotsTotal任务槽的总数。测量

可用性

范围度量描述类型
Job (only available on JobManager)restartingTime重新启动作业所花费的时间,或当前重新启动的持续时间(以毫秒为单位)。测量
uptime作业运行的时间不间断。对于已完成的作业,返回-1(以毫秒为单位)。测量
downtime对于当前处于故障/恢复状态的作业,在此中断期间经过的时间。对于正在运行的作业返回 0,对于已完成的作业返回-1(以毫秒为单位)。测量
fullRestarts自提交此作业以来完全重新启动的总次数。测量

检查点

范围度量描述类型
Job (only available on JobManager)lastCheckpointDuration完成最后一个检查点所花费的时间(以毫秒为单位)。测量
lastCheckpointSize最后一个检查点的总大小(以字节为单位)。测量
lastCheckpointExternalPath存储最后一个外部检查点的路径。测量
lastCheckpointRestoreTimestamp在协调器上恢复最后一个检查点时的时间戳(以毫秒为单位)。测量
lastCheckpointAlignmentBuffered在最后一个检查点的所有子任务上进行对齐期间的缓冲字节数(以字节为单位)。测量
numberOfInProgressCheckpoints进行中检查点的数量。测量
numberOfCompletedCheckpoints成功完成检查点的数量。测量
numberOfFailedCheckpoints失败检查点的数量。测量
totalNumberOfCheckpoints总检查点的数量(正在进行,已完成,失败)。测量
TaskcheckpointAlignmentTime最后一次屏障对齐完成所花费的时间(以纳秒为单位),或当前对齐到目前为止所用的时间(以纳秒为单位)。测量

IO

范围度量描述类型
Job (only available on TaskManager)<SOURCE_ID> <source_subtask_index> <operator_id> <operator_subtask_index> .latency从给定源子任务到算子子任务的延迟分布(以毫秒为单位)。直方图
任务numBytesInLocal此任务从本地源读取的总字节数。计数器
numBytesInLocalPerSecond此任务每秒从本地源读取的字节数。仪表
numBytesInRemote此任务从远程源读取的总字节数。计数器
numBytesInRemotePerSecond此任务每秒从远程源读取的字节数。仪表
numBuffersInLocal此任务从本地源读取的网络缓冲区总数。计数器
numBuffersInLocalPerSecond此任务每秒从本地源读取的网络缓冲区数。仪表
numBuffersInRemote此任务从远程源读取的网络缓冲区总数。计数器
numBuffersInRemotePerSecond此任务每秒从远程源读取的网络缓冲区数。仪表
numBytesOut此任务已发出的总字节数。计数器
numBytesOutPerSecond此任务每秒发出的字节数。仪表
numBuffersOut此任务已发出的网络缓冲区总数。计数器
numBuffersOutPerSecond此任务每秒发出的网络缓冲区数。仪表
任务/算子numRecordsIn此 算子/任务已收到的记录总数。计数器
numRecordsInPerSecond此 算子/任务每秒接收的记录数。仪表
numRecordsOut此 算子/任务已发出的记录总数。计数器
numRecordsOutPerSecond此 算子/任务每秒发送的记录数。仪表
numLateRecordsDropped此算子/任务因迟到而丢失的记录数。计数器
currentInputWatermark此 算子/任务收到的最后一个水印(以毫秒为单位)。 注意: 对于具有 2 个输入的算子/任务,这是最后收到的水印的最小值。测量
算子currentInput1Watermark此 算子在其第一个输入(毫秒)中收到的最后一个水印。 注意: 仅适用于具有 2 个输入的算子。测量
currentInput2Watermark此 算子在其第二个输入中接收的最后一个水印(以毫秒为单位)。 注意: 仅适用于具有 2 个输入的算子。测量
currentOutputWatermark此 算子发出的最后一个水印(以毫秒为单位)。测量
numSplitsProcessed此数据源已处理的 InputSplits 总数(如果 算子是数据源)。测量

连接器

Kafka 连接器

范围度量用户变量描述类型
算子commitsSucceededN / A如果启用了偏移提交并且启用了检查点,则成功向 Kafka 提交的偏移提交总数。计数器
算子commitsFailedN / A如果启用了偏移提交并且启用了检查点,则 Kafka 的偏移提交失败总数。请注意,将偏移量提交回 Kafka 只是暴露消费者进度的一种方法,因此提交失败不会影响 Flink 的检查点分区偏移的完整性。计数器
算子committedOffsetsTopic,分区对于每个分区,最后成功提交到 Kafka 的偏移量。可以通过主题名称和分区 ID 指定特定分区的度量标准。测量
算子currentOffsetsTopic,分区消费者对每个分区的当前读取偏移量。可以通过主题名称和分区 ID 指定特定分区的度量标准。测量

Kinesis 连接器

范围度量用户变量描述类型
算子millisBehindLateststream,shardId对于每个 Kinesis 分片,消费者在流的头部后面的毫秒数,表示消费者当前时间落后多少。可以通过流名称和分片标识指定特定分片的度量标准。值为 0 表示记录处理被捕获,此时没有要处理的新记录。值-1 表示该度量标准尚未报告。测量
算子sleepTimeMillisstream,shardId消费者在从 Kinesis 获取记录之前花费的毫秒数。可以通过流名称和分片标识指定特定分片的度量标准。测量
算子maxNumberOfRecordsPerFetchstream,shardId消费者在单个 getRecords 调用 Kinesis 时请求的最大记录数。如果 ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS 设置为 true,则自适应地计算此值以最大化 Kinesis 的 2 Mbps 读取限制。测量
算子numberOfAggregatedRecordsPerFetchstream,shardId消费者在单个 getRecords 调用 Kinesis 时获取的聚合 Kinesis 记录数。测量
算子numberOfDeggregatedRecordsPerFetchstream,shardId消费者在单个 getRecords 调用 Kinesis 时获取的分解 Kinesis 记录的数量。测量
算子averageRecordSizeBytesstream,shardIdKinesis 记录的平均大小(以字节为单位),由消费者在单个 getRecords 调用中获取。测量
算子runLoopTimeNanosstream,shardId消费者在运行循环中花费的实际时间(以纳秒为单位)。测量
算子loopFrequencyHzstream,shardId一秒钟内调用 getRecords 的次数。测量
算子bytesRequestedPerFetchstream,shardId在一次调用 getRecords 中请求的字节数(2 Mbps / loopFrequencyHz)。测量

系统资源

默认情况下禁用系统资源报告。当 metrics.system-resource 启用下面列出的指标将是可利用的作业-与 TaskManager。系统资源度量标准会定期更新,并显示已配置间隔( metrics.system-resource-probing-interval )的平均值。

系统资源报告要求在类路径上存在可选的依赖项(例如,放在 Flink 的 lib 目录中):

  • com.github.oshi:oshi-core:3.4.0 (根据 EPL 1.0 许可证授权)

包括它的传递依赖:

  • net.java.dev.jna:jna-platform:jar:4.2.2
  • net.java.dev.jna:jna:jar:4.2.2

这方面的失败将被报告为启动期间 NoClassDefFoundError 记录的警告消息 SystemResourcesMetricsInitializer

系统 CPU

范围中缀度量描述
Job-/TaskManagerSystem.CPU用法机器上 CPU 使用率的总体百分比。
机器上 CPU 空闲使用率的百分比。
SYS计算机上系统 CPU 使用率的百分比。
用户计算机上用户 CPU 使用率的百分比。
IOWAIT计算机上 IOWait CPU 使用率的百分比。
IRQ机器上 Irq CPU 使用率的百分比。
软中断计算机上 SoftIrq CPU 使用率的百分比。
尼斯在机器上使用 Nice Idle 的百分比。
Load1min平均 CPU 负载超过 1 分钟
Load5min平均 CPU 负载超过 5 分钟
Load15min平均 CPU 负载超过 15 分钟
UsageCPU *每个处理器的 CPU 使用率百分比

系统内存

范围中缀度量描述
Job-/TaskManagerSystem.Memory可得到可用内存字节数
总内存(字节)
System.Swap用过的使用的交换字节
总交换字节数

系统网络

范围中缀度量描述
Job-/TaskManagerSystem.Network.INTERFACE_NAMEReceiveRate平均接收速率,以每秒字节数为单位
SendRate平均发送速率,以字节/秒为单位

延迟跟踪

Flink 允许跟踪通过系统传输的记录的延迟。默认情况下禁用此函数。为了使延迟跟踪你必须设置 latencyTrackingInterval 在无论是正数 Flink 配置ExecutionConfig

latencyTrackingInterval ,源将定期发出一个特殊的记录,称为 LatencyMarker 。标记包含从源发出记录时的时间戳。延迟标记不能超过常规用户记录,因此如果记录在算子面前排队,则会增加标记跟踪的延迟。

请注意,延迟标记不会考虑用户记录在算子中绕过它们的时间。特别是标记不考虑记录在窗口缓冲区中花费的时间。只有当算子无法接受新记录,因此他们排队时,使用标记测量的延迟才会反映出来。

所有中间 算子都会保存 n 每个源的最后一个延迟列表,以计算延迟分布。接收器算子保存每个源的列表,以及每个并行源实例,以允许检测由各个机器引起的延迟问题。

目前,Flink 假定群集中所有计算机的时钟都是同步的。我们建议设置自动时钟同步服务(如 NTP)以避免错误的延迟结果。

警告启用延迟指标可能会显着影响群集的性能。强烈建议仅将它们用于调试目的。

REST API 集成

可以通过 Monitoring REST API 查询度量标准。

下面是可用端点列表,带有示例 JSON 响应。所有端点都是样本表单 http://hostname:8081/jobmanager/metrics ,下面我们仅列出 URL 的 路径 部分。

尖括号中的值是变量,例如 http://hostname:8081/jobs/&lt;jobid&gt;/metrics ,必须请求变量 http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics

请求特定实体的指标:

  • /jobmanager/metrics
  • /taskmanagers/&lt;taskmanagerid&gt;/metrics
  • /jobs/&lt;jobid&gt;/metrics
  • /jobs/&lt;jobid&gt;/vertices/&lt;vertexid&gt;/subtasks/&lt;subtaskindex&gt;

请求在相应类型的所有实体之间聚合的指标:

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs/&lt;jobid&gt;/vertices/&lt;vertexid&gt;/subtasks/metrics

请求在相应类型的所有实体的子集上聚合的度量标准:

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs/&lt;jobid&gt;/vertices/&lt;vertexid&gt;/subtasks/metrics?subtask=1,2,3

请求可用指标列表:

GET /jobmanager/metrics

[  {  "id":  "metric1"  },  {  "id":  "metric2"  }  ]

请求特定(未聚合)指标的值:

GET taskmanagers/ABCDE/metrics?get=metric1,metric2

[  {  "id":  "metric1",  "value":  "34"  },  {  "id":  "metric2",  "value":  "2"  }  ]

请求特定指标的汇总值:

GET /taskmanagers/metrics?get=metric1,metric2

[  {  "id":  "metric1",  "min":  1,  "max":  34,  "avg":  15,  "sum":  45  },  {  "id":  "metric2",  "min":  2,  "max":  14,  "avg":  7,  "sum":  16  }  ]

请求特定指标的特定聚合值:

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

[  {  "id":  "metric1",  "min":  1,  "max":  34,  },  {  "id":  "metric2",  "min":  2,  "max":  14,  }  ]

仪表板集成

为每个任务或算子收集的度量标准也可以在仪表板中显示。在作业的主页面上,选择 Metrics 选项卡。选择顶部图表中的一个任务后,您可以使用 Add Metric 下拉菜单选择要显示的指标。

  • 任务指标列为 &lt;subtask_index&gt;.&lt;metric_name&gt;
  • 算子指标列为 &lt;subtask_index&gt;.&lt;operator_name&gt;.&lt;metric_name&gt;

每个度量将可视化为单独的图形,x 轴表示时间,y 轴表示测量值。所有图表每 10 秒自动更新一次,并在导航到另一页时继续更新。

可视化指标的数量没有限制; 但是只能显示数字指标。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。