返回介绍

监控

发布于 2025-04-30 21:00:07 字数 12407 浏览 0 评论 0 收藏 0

Flume 中的监控仍在进行中。变化可能经常发生。几个 Flume 组件向 JMX 平台 MBean 服务器报告度量标准。可以使用 Jconsole 查询这些指标。

可用的组件指标

下表显示了可用于组件的度量标准。每个组件仅维护一组度量,由“x”表示,未维护的值显示默认值,即 0.这些表告诉您可以在何处获得有意义的数据。度量标准的名称应该足够描述,有关更多信息,您必须深入了解组件的源代码。

来源 1

 Avro执行HTTPJMSKafkaMultiportSyslogTCPScribe
AppendAcceptedCountx
AppendBatchAcceptedCountxxx
AppendBatchReceivedCountxxx
AppendReceivedCountx
ChannelWriteFailxxxxxx
EventAcceptedCountxxxxxxx
EventReadFailxxxxx
EventReceivedCountxxxxxxx
GenericProcessingFailxx
KafkaCommitTimerx
KafkaEmptyCountx
KafkaEventGetTimerx
OpenConnectionCountx

来源 2

 SequenceGeneratorSpoolDirectorySyslogTcpSyslogUDPTaildirThrift
AppendAcceptedCountx
AppendBatchAcceptedCountxxxx
AppendBatchReceivedCountxxx
AppendReceivedCountx
ChannelWriteFailxxxxxx
EventAcceptedCountxxxxxx
EventReadFailxxxx
EventReceivedCountxxxxx
GenericProcessingFailxx
KafkaCommitTimer
KafkaEmptyCount
KafkaEventGetTimer
OpenConnectionCount

水槽 1

 Avro / ThriftAsyncHBaseElasticSearchHBaseHBase2
BatchCompleteCountxxxxx
BatchEmptyCountxxxxx
BatchUnderflowCountxxxxx
ChannelReadFailxx
ConnectionClosedCountxxxxx
ConnectionCreatedCountxxxxx
ConnectionFailedCountxxxxx
EventDrainAttemptCountxxxxx
EventDrainSuccessCountxxxxx
EventWriteFailxx
KafkaEventSendTimer
RollbackCount

水槽 2

 HDFSEventHiveHttpKafkaMorphlineRollingFile
BatchCompleteCountxxx
BatchEmptyCountxxxx
BatchUnderflowCountxxxx
ChannelReadFailxxxxxx
ConnectionClosedCountxxx
ConnectionCreatedCountxxx
ConnectionFailedCountxxx
EventDrainAttemptCountxxxxx
EventDrainSuccessCountxxxxxx
EventWriteFailxxxxxx
KafkaEventSendTimerx
RollbackCountx

Channels

 文件Kafka内存PseudoTxnMemorySpillableMemory
ChannelCapacityxxx
ChannelSizexxxx
CheckpointBackupWriteErrorCountx
CheckpointWriteErrorCountx
EventPutAttemptCountxxxxx
EventPutErrorCountx
EventPutSuccessCountxxxxx
EventTakeAttemptCountxxxxx
EventTakeErrorCountx
EventTakeSuccessCountxxxxx
KafkaCommitTimerx
KafkaEventGetTimerx
KafkaEventSendTimerx
打开x
RollbackCounterx
不 Healthx

JMX 报告

可以通过使用 flume-env.sh 在 JAVA_OPTS 环境变量中指定 JMX 参数来启用 JMX 报告,如
export JAVA_OPTS =“ - Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port = 5445 -Dcom.sun.management.jmxremote.authenticate = false -Dcom.sun.management.jmxremote.ssl = false”

注意:上面的示例禁用安全性。要启用安全性,请参阅 http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html

Ganglia 报道

Flume 还可以将这些指标报告给 Ganglia 3 或 Ganglia 3.1 元节点。要向 Ganglia 报告指标,必须使用此支持启动水槽代理。必须通过传递以下参数来启动 Flume 代理,因为系统属性前缀为 flume.monitoring.
,可以在 flume-env.sh 中指定:

属性名称默认说明
type-组件类型名称必须为 ganglia
hosts-以逗号分隔的 hostname:port Ganglia 服务器列表
pollFrequency60连续向 Ganglia 服务器报告之间的时间(以秒为单位)
isGanglia3falseGanglia 服务器版本为 3.默认情况下,Flume 以 Ganglia 3.1 格式发送

我们可以通过 Ganglia 支持启动 Flume,如下所示:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

JSON 报告

Flume 还可以以 JSON 格式报告指标。为了以 JSON 格式启用报告,Flume 在可配置端口上托管 Web 服务器。 Flume 以以下 JSON 格式报告指标:

{
"typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
"typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
}

这是一个例子:

{
"CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
                      "Type":"CHANNEL",
                      "StopTime":"0",
                      "EventPutAttemptCount":"468086",
                      "ChannelSize":"233428",
                      "StartTime":"1344882233070",
                      "EventTakeSuccessCount":"458200",
                      "ChannelCapacity":"600000",
                      "EventTakeAttemptCount":"458288"},
"CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
                   "Type":"CHANNEL",
                   "StopTime":"0",
                   "EventPutAttemptCount":"22948908",
                   "ChannelSize":"5",
                   "StartTime":"1344882209413",
                   "EventTakeSuccessCount":"22948900",
                   "ChannelCapacity":"100",
                   "EventTakeAttemptCount":"22948908"}
}
属性名称默认说明
type-组件类型名称必须为 http
port41414启动服务器的端口。

我们可以使用 JSON 报告支持启动 Flume,如下所示:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

然后会在 http://<hostname>:<port>/metrics 网页上提供指标。自定义组件可以报告上面 Ganglia 部分中提到的指标。

自定义报告

可以通过编写执行报告的服务器向其他系统报告指标。任何报告类都必须实现该接口, org.apache.flume.instrumentation.MonitorService
。这样的类可以与 GangliaServer 用于报告的方式相同。他们可以轮询平台 mbean 服务器以轮询 mbeans 以获取指标。例如,如果 HTTP 监视服务名为 HTTPReporting
可以使用如下:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
属性名称默认说明
type-组件类型名称必须为 FQCN

报告自定义组件的指标

任何自定义水槽组件都应该继承 org.apache.flume.instrumentation.MonitoredCounterGroup
类。然后,该类应为其公开的每个度量标准提供 getter 方法。请参阅下面的代码。 MonitoredCounterGroup 需要一个属性列表,其度量由此类公开。截至目前,此类仅支持将度量标准公开为长值。

public class SinkCounter extends MonitoredCounterGroup implements
    SinkCounterMBean {

  private static final String COUNTER_CONNECTION_CREATED =
    "sink.connection.creation.count";

  private static final String COUNTER_CONNECTION_CLOSED =
    "sink.connection.closed.count";

  private static final String COUNTER_CONNECTION_FAILED =
    "sink.connection.failed.count";

  private static final String COUNTER_BATCH_EMPTY =
    "sink.batch.empty";

  private static final String COUNTER_BATCH_UNDERFLOW =
      "sink.batch.underflow";

  private static final String COUNTER_BATCH_COMPLETE =
    "sink.batch.complete";

  private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
    "sink.event.drain.attempt";

  private static final String COUNTER_EVENT_DRAIN_SUCCESS =
    "sink.event.drain.sucess";

  private static final String[] ATTRIBUTES = {
    COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
    COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
    COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
  };


  public SinkCounter(String name) {
    super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
  }

  @Override
  public long getConnectionCreatedCount() {
    return get(COUNTER_CONNECTION_CREATED);
  }

  public long incrementConnectionCreatedCount() {
    return increment(COUNTER_CONNECTION_CREATED);
  }

}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。