Monitoring
Monitoring in Flume is still a work in progress, and changes can happen frequently. Several Flume components report metrics to the JMX platform MBean server. These metrics can be queried using Jconsole.
Available Component Metrics
The tables below show the metrics available for each component. Each component maintains only the set of metrics marked with an "x"; unmaintained values show the default, which is 0. These tables tell you where you can expect meaningful data. The metric names should be descriptive enough; for more information you have to dig into the source code of the components.
Sources 1
Avro | Exec | HTTP | JMS | Kafka | MultiportSyslogTCP | Scribe | |
---|---|---|---|---|---|---|---|
AppendAcceptedCount | x | ||||||
AppendBatchAcceptedCount | x | x | x | ||||
AppendBatchReceivedCount | x | x | x | ||||
AppendReceivedCount | x | ||||||
ChannelWriteFail | x | x | x | x | x | x | |
EventAcceptedCount | x | x | x | x | x | x | x |
EventReadFail | x | x | x | x | x | ||
EventReceivedCount | x | x | x | x | x | x | x |
GenericProcessingFail | x | x | |||||
KafkaCommitTimer | x | ||||||
KafkaEmptyCount | x | ||||||
KafkaEventGetTimer | x | ||||||
OpenConnectionCount | x |
Sources 2
SequenceGenerator | SpoolDirectory | SyslogTcp | SyslogUDP | Taildir | Thrift | |
---|---|---|---|---|---|---|
AppendAcceptedCount | x | |||||
AppendBatchAcceptedCount | x | x | x | x | ||
AppendBatchReceivedCount | x | x | x | |||
AppendReceivedCount | x | |||||
ChannelWriteFail | x | x | x | x | x | x |
EventAcceptedCount | x | x | x | x | x | x |
EventReadFail | x | x | x | x | ||
EventReceivedCount | x | x | x | x | x | |
GenericProcessingFail | x | x | ||||
KafkaCommitTimer | ||||||
KafkaEmptyCount | ||||||
KafkaEventGetTimer | ||||||
OpenConnectionCount |
Sinks 1
Avro / Thrift | AsyncHBase | ElasticSearch | HBase | HBase2 | |
---|---|---|---|---|---|
BatchCompleteCount | x | x | x | x | x |
BatchEmptyCount | x | x | x | x | x |
BatchUnderflowCount | x | x | x | x | x |
ChannelReadFail | x | x | |||
ConnectionClosedCount | x | x | x | x | x |
ConnectionCreatedCount | x | x | x | x | x |
ConnectionFailedCount | x | x | x | x | x |
EventDrainAttemptCount | x | x | x | x | x |
EventDrainSuccessCount | x | x | x | x | x |
EventWriteFail | x | x | |||
KafkaEventSendTimer | |||||
RollbackCount |
Sinks 2
HDFSEvent | Hive | Http | Kafka | Morphline | RollingFile | |
---|---|---|---|---|---|---|
BatchCompleteCount | x | x | x | |||
BatchEmptyCount | x | x | x | x | ||
BatchUnderflowCount | x | x | x | x | ||
ChannelReadFail | x | x | x | x | x | x |
ConnectionClosedCount | x | x | x | |||
ConnectionCreatedCount | x | x | x | |||
ConnectionFailedCount | x | x | x | |||
EventDrainAttemptCount | x | x | x | x | x | |
EventDrainSuccessCount | x | x | x | x | x | x |
EventWriteFail | x | x | x | x | x | x |
KafkaEventSendTimer | x | |||||
RollbackCount | x |
Channels
File | Kafka | Memory | PseudoTxnMemory | SpillableMemory |
---|---|---|---|---|---|
ChannelCapacity | x | x | x | ||
ChannelSize | x | x | x | x | |
CheckpointBackupWriteErrorCount | x | ||||
CheckpointWriteErrorCount | x | ||||
EventPutAttemptCount | x | x | x | x | x |
EventPutErrorCount | x | ||||
EventPutSuccessCount | x | x | x | x | x |
EventTakeAttemptCount | x | x | x | x | x |
EventTakeErrorCount | x | ||||
EventTakeSuccessCount | x | x | x | x | x |
KafkaCommitTimer | x | ||||
KafkaEventGetTimer | x | ||||
KafkaEventSendTimer | x | ||||
Open | x | ||||
RollbackCounter | x | ||||
Unhealthy | x |
JMX Reporting
JMX reporting can be enabled by specifying JMX parameters in the JAVA_OPTS environment variable using flume-env.sh, like:
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
NOTE: The sample above disables security. To enable security, please refer to http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
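Once the agent JVM is running with these flags, the metrics can also be read programmatically from the platform MBean server instead of Jconsole. The sketch below, using only the JDK, lists the MBeans that Flume components register under the org.apache.flume domain; the class name `FlumeJmxLister` is illustrative, not part of Flume.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Sketch: list Flume metric MBeans from the platform MBean server.
// Run inside the agent JVM (or against a remote JMX connection) each
// Flume component appears under the org.apache.flume domain, e.g.
// org.apache.flume.channel:type=memChannel.
public class FlumeJmxLister {

    public static Set<ObjectName> flumeMBeans() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Domain pattern matching every org.apache.flume* MBean
        return mbs.queryNames(new ObjectName("org.apache.flume*:*"), null);
    }

    public static void main(String[] args) throws Exception {
        for (ObjectName name : flumeMBeans()) {
            System.out.println(name);
        }
    }
}
```

In a JVM that is not running Flume the query simply returns an empty set, so the sketch is safe to run anywhere.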
Ganglia Reporting
Flume can also report these metrics to Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a Flume agent must be started with this support. The agent has to be started by passing in the following parameters as system properties prefixed by flume.monitoring., which can be specified in flume-env.sh:
Property Name | Default | Description |
---|---|---|
type | - | The component type name has to be ganglia |
hosts | - | Comma-separated list of hostname:port of Ganglia servers |
pollFrequency | 60 | Time, in seconds, between consecutive reports to the Ganglia server |
isGanglia3 | false | Ganglia server version is 3. By default, Flume sends in Ganglia 3.1 format |
We can start Flume with Ganglia support as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
JSON Reporting
Flume can also report metrics in JSON format. To enable reporting in JSON format, Flume hosts a web server on a configurable port. Flume reports metrics in the following JSON format:
{
"typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
"typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
}
Here is an example:
{
"CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
"Type":"CHANNEL",
"StopTime":"0",
"EventPutAttemptCount":"468086",
"ChannelSize":"233428",
"StartTime":"1344882233070",
"EventTakeSuccessCount":"458200",
"ChannelCapacity":"600000",
"EventTakeAttemptCount":"458288"},
"CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
"Type":"CHANNEL",
"StopTime":"0",
"EventPutAttemptCount":"22948908",
"ChannelSize":"5",
"StartTime":"1344882209413",
"EventTakeSuccessCount":"22948900",
"ChannelCapacity":"100",
"EventTakeAttemptCount":"22948908"}
}
Property Name | Default | Description |
---|---|---|
type | - | The component type name has to be http |
port | 41414 | The port to start the server on. |
We can start Flume with JSON reporting support as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
Metrics will then be available at http://&lt;hostname&gt;:&lt;port&gt;/metrics. Custom components can report the metrics mentioned in the Ganglia section above.
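A monitoring script or dashboard can consume this page with plain HTTP. The sketch below fetches the /metrics page and pulls out a single counter; since the values in this format are flat JSON objects of strings, a small regex extractor is enough for illustration (a real consumer should use a JSON library). The class name `FlumeHttpMetrics` is illustrative.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: read the page exposed by -Dflume.monitoring.type=http and
// extract one metric for one component. Only handles the flat,
// string-valued JSON format shown above.
public class FlumeHttpMetrics {

    // Fetch http://<host>:<port>/metrics as a string.
    public static String fetch(String host, int port) throws IOException {
        URL url = new URL("http://" + host + ":" + port + "/metrics");
        try (InputStream in = url.openStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    // Extract one metric value, e.g.
    // metric(json, "CHANNEL.fileChannel", "ChannelSize") -> "233428"
    public static String metric(String json, String component, String name) {
        Pattern comp = Pattern.compile(
            Pattern.quote("\"" + component + "\"") + "\\s*:\\s*\\{([^}]*)\\}");
        Matcher m = comp.matcher(json);
        if (!m.find()) return null;
        Matcher v = Pattern.compile(
            Pattern.quote("\"" + name + "\"") + "\\s*:\\s*\"([^\"]*)\"")
            .matcher(m.group(1));
        return v.find() ? v.group(1) : null;
    }
}
```

For example, against the sample output above, `metric(json, "CHANNEL.fileChannel", "EventPutSuccessCount")` would return "468085".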
Custom Reporting
It is possible to report metrics to other systems by writing servers that do the reporting. Any reporting class has to implement the org.apache.flume.instrumentation.MonitorService interface. Such a class can be used in the same way the GangliaServer is used for reporting: it can poll the platform MBean server for the MBeans that expose the metrics. For example, an HTTP monitoring service called HTTPReporting can be used as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Property Name | Default | Description |
---|---|---|
type | - | The component type name has to be the FQCN |
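The polling step such a reporter performs can be sketched with just the JDK. The class below takes a snapshot of every attribute of every org.apache.flume MBean; a real reporter would implement org.apache.flume.instrumentation.MonitorService and run this on a timer, but both the class name `SimpleReporter` and the scheduling are illustrative here.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Sketch of the MBean-polling step of a custom reporter. The real
// class would implement org.apache.flume.instrumentation.MonitorService
// and forward these values to the external system on each poll.
public class SimpleReporter {

    // Snapshot every attribute of every org.apache.flume MBean,
    // keyed as "<object-name>.<attribute>".
    public static Map<String, Object> pollOnce() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        Map<String, Object> out = new TreeMap<>();
        for (ObjectName name :
                mbs.queryNames(new ObjectName("org.apache.flume*:*"), null)) {
            for (MBeanAttributeInfo attr : mbs.getMBeanInfo(name).getAttributes()) {
                out.put(name + "." + attr.getName(),
                        mbs.getAttribute(name, attr.getName()));
            }
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        pollOnce().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```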
Reporting metrics from custom components
Any custom Flume component should inherit from the org.apache.flume.instrumentation.MonitoredCounterGroup class. The class should then provide getter methods for each of the metrics it exposes. See the code below. The MonitoredCounterGroup expects a list of attributes whose metrics are exposed by this class. As of now, this class only supports exposing metrics as long values.
public class SinkCounter extends MonitoredCounterGroup implements
    SinkCounterMBean {

  private static final String COUNTER_CONNECTION_CREATED =
      "sink.connection.creation.count";

  private static final String COUNTER_CONNECTION_CLOSED =
      "sink.connection.closed.count";

  private static final String COUNTER_CONNECTION_FAILED =
      "sink.connection.failed.count";

  private static final String COUNTER_BATCH_EMPTY =
      "sink.batch.empty";

  private static final String COUNTER_BATCH_UNDERFLOW =
      "sink.batch.underflow";

  private static final String COUNTER_BATCH_COMPLETE =
      "sink.batch.complete";

  private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
      "sink.event.drain.attempt";

  private static final String COUNTER_EVENT_DRAIN_SUCCESS =
      "sink.event.drain.sucess";

  private static final String[] ATTRIBUTES = {
    COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
    COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
    COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
  };

  public SinkCounter(String name) {
    super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
  }

  @Override
  public long getConnectionCreatedCount() {
    return get(COUNTER_CONNECTION_CREATED);
  }

  public long incrementConnectionCreatedCount() {
    return increment(COUNTER_CONNECTION_CREATED);
  }
}