配置
如前面部分所述,Flume 代理程序配置是从类似于具有分层属性设置的 Java 属性文件格式的文件中读取的。
定义流程
要在单个代理中定义流,您需要通过通道链接源和接收器。您需要列出给定代理的源,接收器和通道,然后将源和接收器指向通道。源实例可以指定多个通道,但接收器实例只能指定一个通道。格式如下:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
例如,名为 agent_foo 的代理正在从外部 avro 客户端读取数据并通过内存通道将其发送到 HDFS。配置文件 weblog.config 可能如下所示:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
这将使事件流动起来 avro-AppSrv-source 到 hdfs-Cluster1-sink 通过内存通道 mem-channel-1。当使用 weblog.config 作为其配置文件启动代理程序时,它将实例化该流程。
配置各个组件
定义流后,您需要设置每个源,接收器和通道的属性。这是以相同的分层命名空间方式完成的,您可以在其中设置组件类型以及特定于每个组件的属性的其他值:
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>
需要为 Flume 的每个组件设置属性“type”,以了解它需要什么类型的对象。每个源,接收器和通道类型都有自己的一组属性,使其能够按预期运行。所有这些都需要根据需要进行设置。在前面的示例中,我们有一个从 avro-AppSrv-source 到 hdfs-Cluster1-sink 的流程通过内存通道 mem-channel-1。这是一个显示每个组件配置的示例:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
在代理中添加多个流
单个 Flume 代理可以包含多个独立流。您可以在配置中列出多个源,接收器和通道。可以链接这些组件以形成多个流:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
然后,您可以将源和接收器链接到通道(用于接收器)的相应通道(用于源),以设置两个不同的流。例如,如果您需要在代理中设置两个流,一个从外部 avro 客户端到外部 HDFS,另一个从尾部输出到 avro 接收器,那么这是一个配置来执行此操作:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
配置多代理程序流
要设置多层流,您需要有第一跳的 avro / thrift 接收器指向下一跳的 avro / thrift 源。这将导致第一个 Flume 代理将事件转发到下一个 Flume 代理。例如,如果您使用 avro 客户端定期向本地 Flume 代理发送文件(每个事件 1 个文件),则此本地代理可以将其转发到另一个已安装存储的代理。
Weblog 代理配置:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000
# configure other pieces
#...
HDFS 代理配置:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
# configure other pieces
#...
在这里,我们将 weblog 代理的 avro-forward-sink 链接到 hdfs 代理的 avro-collection-source。这将导致来自外部应用程序服务器源的事件最终存储在 HDFS 中。
扇出流动
如前一节所述,Flume 支持扇出从一个源到多个通道的流量。扇出,复制和多路复用有两种模式。在复制流程中,事件将发送到所有已配置的通道。在多路复用的情况下,事件仅被发送到合格信道的子集。为了散开流量,需要指定源的通道列表以及扇出它的策略。这是通过添加可以复制或多路复用的通道“选择器”来完成的。如果它是多路复用器,则进一步指定选择规则。如果您没有指定选择器,那么默认情况下它会复制:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
多路复用选择具有另一组属性以分流流。这需要指定事件属性到通道集的映射。选择器检查事件头中的每个已配置属性。如果它与指定的值匹配,则该事件将发送到映射到该值的所有通道。如果没有匹配项,则将事件发送到配置为默认值的通道集:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
映射允许为每个值重叠通道。
以下示例具有多路复用到两个路径的单个流。名为 agent_foo 的代理具有单个 avro 源和两个链接到两个接收器的通道:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
选择器检查名为“State”的标头。如果该值为“CA”,则将其发送到 mem-channel-1,如果其为“AZ”,则将其发送到文件通道-2,或者如果其为“NY”则为两者。如果“状态” Headers 未设置或与三者中的任何一个都不匹配,则它将转到 mem-channel-1,其被指定为“default”。
选择器还支持可选通道。要为标头指定可选通道,可通过以下方式使用 config 参数“optional”:
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
选择器将首先尝试写入所需的通道,如果其中一个通道无法使用事件,则会使事务失败。该事务在 all Channels 上重新尝试。一旦所有必需的通道消耗了事件,则选择器将尝试写入可选通道。任何可选通道使用该事件的失败都会被忽略而不会重试。
如果可选信道与特定报头的所需信道之间存在重叠,则认为该信道是必需的,并且信道中的故障将导致重试所有必需信道集。例如,在上面的示例中,对于 Headers “CA”,mem-channel-1 被认为是必需的通道,即使它被标记为必需和可选,并且写入此通道的失败将导致该事件 all 重审为选择器配置的通道。
请注意,如果标头没有任何所需的通道,则该事件将被写入默认通道,并将尝试写入该标头的可选通道。如果未指定所需的通道,则指定可选通道仍会将事件写入默认通道。如果没有将通道指定为默认通道且没有必需通道,则选择器将尝试将事件写入可选通道。在这种情况下,任何失败都会被忽略。
SSL / TLS 支持
多个 Flume 组件支持 SSL / TLS 协议,以便安全地与其他系统通信。
组件 | SSL 服务器或客户端 |
---|---|
Avro 来源 | 服务器 |
Avro Sink | client |
Thrift Source | server |
Thrift Sink | client |
Kafka 来源 | 客户 |
Kafka Channel | client |
Kafka Sink | client |
HTTP 源 | 服务器 |
JMS 来源 | 客户 |
Syslog TCP 源 | 服务器 |
Multiport Syslog TCP 源 | 服务器 |
SSL 兼容组件具有若干配置参数来设置 SSL,例如启用 SSL 标志,密钥库/信任库参数(位置,密码,类型)和其他 SSL 参数(例如禁用的协议)。
始终在代理配置文件的组件级别指定为组件启用 SSL。因此,某些组件可能配置为使用 SSL,而其他组件则不配置(即使具有相同的组件类型)。
密钥库/信任库设置可以在组件级别或全局指定。
在组件级别设置的情况下,通过组件特定参数在代理配置文件中配置密钥库/信任库。此方法的优点是组件可以使用不同的密钥库(如果需要)。缺点是必须为代理配置文件中的每个组件复制密钥库参数。组件级别设置是可选的,但如果已定义,则其优先级高于全局参数。
使用全局设置,只需定义一次密钥库/信任库参数,并对所有组件使用相同的设置,这意味着更少和更集中的配置。
可以通过系统属性或通过环境变量来配置全局设置。
系统属性 | 环境变量 | 描述 |
---|---|---|
javax.net.ssl.keyStore | FLUME_SSL_KEYSTORE_PATH | 密钥库位置 |
javax.net.ssl.keyStorePassword | FLUME_SSL_KEYSTORE_PASSWORD | 密钥库密码 |
javax.net.ssl.keyStoreType | FLUME_SSL_KEYSTORE_TYPE | 密钥库类型(默认为 JKS) |
javax.net.ssl.trustStore | FLUME_SSL_TRUSTSTORE_PATH | 信任库位置 |
javax.net.ssl.trustStorePassword | FLUME_SSL_TRUSTSTORE_PASSWORD | Truststore 密码 |
javax.net.ssl.trustStoreType | FLUME_SSL_TRUSTSTORE_TYPE | 信任库类型(默认为 JKS) |
flume.ssl.include.protocols | FLUME_SSL_INCLUDE_PROTOCOLS | 计算启用的协议时要包含的协议。逗号(,)分隔列表。如果提供,排除的协议将从此列表中排除。 |
flume.ssl.exclude.protocols | FLUME_SSL_EXCLUDE_PROTOCOLS | 计算启用的协议时要排除的协议。逗号(,)分隔列表。 |
flume.ssl.include.cipherSuites | FLUME_SSL_INCLUDE_CIPHERSUITES | 在计算启用的密码套件时要包含的密码套件。逗号(,)分隔列表。如果提供,排除的密码套件将被排除在此列表之外。 |
flume.ssl.exclude.cipherSuites | FLUME_SSL_EXCLUDE_CIPHERSUITES | 在计算启用的密码套件时要排除的密码套件。逗号(,)分隔列表。 |
SSL 系统属性可以在命令行上传递,也可以通过设置 JAVA_OPTS
来传递
conf / flume-env.sh 中的环境变量。 (尽管使用命令行是不可取的,因为包含密码的命令将保存到命令历史记录中。)
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"
Flume 使用 JSSE(Java 安全套接字扩展)中定义的系统属性,因此这是设置 SSL 的标准方法。另一方面,在系统属性中指定密码意味着可以在进程列表中看到密码。对于不可接受的情况,也可以在环境变量中定义参数。在这种情况下,Flume 在内部从相应的环境变量初始化 JSSE 系统属性。
SSL 环境变量可以在之前的 shell 环境中设置启动 Flume 或在 conf / flume-env.sh 中。 (尽管使用命令行是不可取的,因为包含密码的命令将保存到命令历史记录中。)
export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
export FLUME_SSL_KEYSTORE_PASSWORD=password
Please note:
必须在组件级别启用 SSL。仅指定全局 SSL 参数不会产生任何影响。
如果在多个级别指定了全局 SSL 参数,则优先级如下(从高到低):
代理程序配置中的组件参数
系统属性
环境变量
如果为组件启用了 SSL,但未以上述任何方式指定 SSL 参数,则
在密钥库的情况下:配置错误
_999_如果是 truststores:将使用默认信任库(jssecacerts
/cacerts
在 Oracle JDK 中)在所有情况下,可信任密码都是可选的。如果未指定,则在 JDK 打开信任库时,不会对信任库执行完整性检查。
源和接收批量大小和通道事务容量
源和接收器可以具有批量大小参数,该参数确定它们在一个批次中处理的最大事件数。这发生在具有称为事务容量的上限的通道事务中。批量大小必须小于渠道的 Transaction 容量。有一个明确的检查,以防止不兼容的设置。只要读取配置,就会进行此检查。
水槽来源
Avro 来源
侦听 Avro 端口并从外部 Avro 客户端流接收事件。当与另一个(上一跳)Flume 代理上的内置 Avro Sink 配对时,它可以创建分层集合拓扑。必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channels | - | ||
type | - | 组件类型名称,需要 avro | |
bind | - | 要收听的主机名或 IP 地址 | |
port | - | 要绑定到 | 的端口号 |
threads | - | 要生成的最大工作线程数 | |
selector.type | |||
选择器。* | |||
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* | |||
compression-type | none | 这可以是“none”或“deflate”。压缩类型必须与匹配 AvroSource | 的压缩类型匹配 |
ssl | false | 将此项设置为 true 以启用 SSL 加密。如果启用了 SSL,则还必须通过组件级参数(请参阅下文)或全局 SSL 参数(请参阅 SSL/TLS support 部分)指定“密钥库”和“密钥库密码”。 | |
keystore | - | 这是 Java 密钥库文件的路径。如果未在此处指定,则将使用全局密钥库(如果已定义,则配置错误)。 | |
keystore-password | - | Java 密钥库的密码。如果未在此处指定,则将使用全局密钥库密码(如果已定义,则配置错误)。 | |
keystore-type | JKS | Java 密钥库的类型。这可以是“JKS”或“PKCS12”。如果未在此处指定,则将使用全局密钥库类型(如果已定义,则默认为 JKS)。 | |
exclude-protocols | SSLv3 | 要排除的以空格分隔的 SSL / TLS 协议列表。除指定的协议外,将始终排除 SSLv3。 | |
include-protocols | - | 要包含的以空格分隔的 SSL / TLS 协议列表。启用的协议将是包含的协议,没有排除的协议。如果包含协议为空,则它包括每个支持的协议。 | |
exclude-cipher-suites | - | 要排除的以空格分隔的密码套件列表。 | |
include-cipher-suites | - | 要包含的以空格分隔的密码套件列表。启用的密码套件将是包含的密码套件,不包括排除的密码套件。如果 included-cipher-suites 为空,则包含每个支持的密码套件。 | |
ipFilter | false | 将此设置为 true 以启用 ipFiltering for netty | |
ipFilterRules | - | 使用此配置定义 N netty ipFilter 模式规则。 |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
ipFilterRules 的示例
ipFilterRules 定义由逗号分隔的 N 个 netty ipFilters 模式规则必须采用此格式。
<'allow'或 deny>:<'ip'或'name'代表计算机名>:<pattern>或 allow / deny:ip / name:pattern
示例:ipFilterRules = allow:ip:127。,allow:name:localhost,deny:ip:
请注意,匹配的第一个规则将适用,如下例所示,来自 localhost 上的客户端
这将允许 localhost 上的客户端拒绝来自任何其他 ip 的客户端 llow:name:localhost,deny:ip:'这将拒绝 localhost 上的客户端允许来自任何其他 ip 的客户端:name:localhost,allow:ip:鈥
节俭来源
侦听 Thrift 端口并从外部 Thrift 客户端流接收事件。当与另一个(上一跳)Flume 代理上的内置 ThriftSink 配对时,它可以创建分层集合拓扑。可以通过启用 kerberos 身份验证将 Thrift 源配置为以安全模式启动。 agent-principal 和 agent-keytab 是 Thrift 源用于向 kerberos KDC 进行身份验证的属性。必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
channels | 鈥 | |
type | 组件类型名称,需要 thrift | |
bind | 要收听的主机名或 IP 地址 | |
port | 端口号绑定到 | |
threads | 要生成的最大工作线程数 | |
selector.type | ||
选择器。* | ||
拦截器 | 空格隔离拦截器列表 | |
拦截器。* | ||
ssl | false | 将此项设置为 true 以启用 SSL 加密。如果启用了 SSL,则还必须通过组件级参数(参见下文)或全局 SSL 参数(请参阅 SSL/TLS support 部分)指定“ Store ”和“ Store 密码” |
keystore | 这是 Java 密钥库文件的路径。如果未在此处指定,则将使用全局密钥库(如果已定义,则配置错误)。 | |
keystore-password | Java 密钥库的密码。如果未在此处指定,则将使用全局密钥库密码(如果已定义,则配置错误)。 | |
keystore-type | JKS | Java 密钥库的类型。这可以是“KS”或“KCS12”。如果未在此处指定,则将使用全局密钥库类型(如果已定义,则默认为 JKS)。 |
exclude-protocols | SSLv3 | 要排除的以空格分隔的 SSL / TLS 协议列表。除指定的协议外,将始终排除 SSLv3。 |
include-protocols | 要包含的以空格分隔的 SSL / TLS 协议列表。启用的协议将是包含的协议,没有排除的协议。如果包含协议为空,则它包括每个支持的协议。 | |
exclude-cipher-suites | 要排除的以空格分隔的密码套件列表。 | |
include-cipher-suites | 鈥 | 包含空格分隔的密码套件列表。启用的密码套件将是包含的密码套件,不包括排除的密码套件。 |
kerberos | false | 设置为 true 以启用 kerberos 身份验证。在 kerberos 模式下,成功进行身份验证需要 agent-principal 和 agent-keytab。安全模式下的 Thrift 源仅接受来自已启用 kerberos 且已成功通过 kerberos KDC 验证的 Thrift 客户端的连接。 |
agent-principal | Thrift Source 用于向 kerberos KDC 进行身份验证的 kerberos 主体。 | |
agent-keytab | 芒聙聪 - | Thrift Source 与 agent-principal 结合使用的 keytab 位置,用于对 kerberos KDC 进行身份验证。 |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
执行来源
Exec 源在启动时运行给定的 Unix 命令,并期望该进程在标准输出上连续生成数据(stderr 被简单地丢弃,除非属性 logStdErr 设置为 true)。如果进程因任何原因退出,则源也会退出并且不会生成其他数据。这意味着 cat [named pipe]
等配置
或 tail -F [file]
如果 date
将产生预期的结果
可能不会 - 前两个命令产生数据流,而后者产生单个事件并退出。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channels | 鈥 | ||
type | 组件类型名称,需要 exec | ||
command | 执行 | 的命令 | |
shell | 用于运行命令的 shell 调用。例如/ bin / sh -c。仅适用于依赖于通配符,后退标记,管道等 shell 功能的命令。 | ||
restartThrottle | 10000 | 尝试重启前等待的时间(以毫秒为单位) | |
restart | false | 执行的 cmd 是否应该重新启动 | |
logStdErr | false | 是否应记录命令“stderr” | |
batchSize | 20 | 一次读取和发送到通道的最大行数 | |
batchTimeout | 3000 | 在向下游推送数据之前,如果未达到缓冲区大小,则等待的时间(以毫秒为单位) | |
selector.type | 复制 | 复制或多路复用 | |
selector。* | 取决于 selector.type 值 | ||
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* |
警告
ExecSource 和其他异步源的问题在于,如果无法将事件放入 Channel 中,则源无法保证客户端知道它。在这种情况下,数据将丢失。例如,最常请求的功能之一是 tail -F [file]
类似用例,其中应用程序写入磁盘上的日志文件,Flume 将文件作为尾部发送,将每一行作为事件发送。虽然这是可能的,但是有一个明显的问题;如果 Channels 填满并且 Flume 无法发送事件,会发生什么?由于某种原因,Flume 无法向编写日志文件的应用程序指示它需要保留日志或事件尚未发送。如果这没有意义,您只需要知道:当使用 ExecSource 等单向异步接口时,您的应用程序永远无法保证已收到数据!作为此警告的延伸 - 并且完全清楚 - 使用此源时,事件传递绝对没有保证。为了获得更强的可靠性保证,请考虑 Spooling Directory Source,Taildir Source 或通过 SDK 直接与 Flume 集成。
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
'shell'配置用于通过命令 shell(例如 Bash 或 Powershell)调用'命令'。 'command'作为参数传递给'shell'执行。这允许'命令'使用 shell 中的功能,例如通配符,后退标记,管道,循环,条件等。如果没有'shell'配置,将直接调用'command'。 'shell'的常用值:'/ bin / sh -c','/ bin / ksh -c','cmd / c','powershell -Command'等。
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS 来源
JMS Source 从 JMS 目标(例如队列或主题)读取消息。作为 JMS 应用程序,它应该与任何 JMS 提供程序一起使用,但仅使用 ActiveMQ 进行测试。 JMS 源提供可配置的批量大小,消息选择器,用户/传递和消息到水槽事件转换器。请注意,供应商提供的 JMS jar 应该包含在 Flume 类路径中,使用 plugins.d 目录(首选),命令行上的-classpath 或 flume-env.sh 中的 FLUME_CLASSPATH 变量。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要 jms |
initialContextFactory | - | 初始上下文工厂,例如:org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | - | 连接工厂的 JNDI 名称应显示为 |
providerURL | - | JMS 提供程序 URL |
destinationName | - | 目的地名称 |
destinationType | - | 目的地类型(队列或主题) |
messageSelector | - | 创建消费者时使用的消息选择器 |
userName | - | 目的地/提供者的用户名 |
passwordFile | - | 包含目标/提供商密码的文件 |
batchSize | 100 | 一批消耗的消息数量 |
converter.type | DEFAULT | 用于将消息转换为水槽事件的类。见下文。 |
converter。* | - | 转换器属性。 |
converter.charset | UTF-8 | 仅限默认转换器。在将 JMS TextMessages 转换为字节数组时使用的字符集。 |
createDurableSubscription | false | 是否创建持久订阅。持久订阅只能与 destinationType 主题一起使用。如果为 true,则必须指定“clientId”和“durableSubscriptionName”。 |
clientId | - | JMS 客户端标识符在创建后立即在 Connection 上设置。持久订阅必需。 |
durableSubscriptionName | - | 用于标识持久订阅的名称。持久订阅必需。 |
JMS 消息转换器
JMS 源允许可插拔转换器,尽管默认转换器可能适用于大多数用途。默认转换器能够将字节,文本和对象消息转换为 FlumeEvents。在所有情况下,消息中的属性都将作为 Headers 添加到 FlumeEvent 中。
- BytesMessage:
消息的字节被复制到 FlumeEvent 的主体。每封邮件无法转换超过 2GB 的数据。
- TextMessage:
消息文本转换为字节数组并复制到 FlumeEvent 的主体。默认转换器默认使用 UTF-8,但这是可配置的。
- ObjectMessage:
Object 被写入包含在 ObjectOutputStream 中的 ByteArrayOutputStream,并将生成的数组复制到 FlumeEvent 的主体。
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
SSL 和 JMS 源
JMS 客户端实现通常支持通过 JSSE(Java 安全套接字扩展)定义的某些 Java 系统属性来配置 SSL / TLS。为 Flume 的 JVM 指定这些系统属性,JMS 源(或更准确地说是 JMS 源使用的 JMS 客户端实现)可以通过 SSL 连接到 JMS 服务器(当然,仅当 JMS 服务器也已设置为使用 SSL 时)。它应该可以与任何 JMS 提供程序一起使用,并且已经过 ActiveMQ,IBM MQ 和 Oracle WebLogic 的测试。
以下部分仅介绍 Flume 方面所需的 SSL 配置步骤。您可以在 Flume Wiki 上找到有关不同 JMS 提供程序的服务器端设置以及完整工作配置示例的更详细说明。
SSL transport / server authentication:
如果 JMS 服务器使用自签名证书或其证书由不受信任的 CA(例如公司自己的 CA)签名,则需要设置信任库(包含正确的证书)并传递给 Flume。它可以通过全局 SSL 参数完成。有关全局 SSL 设置的更多详细信息,请参阅 SSL/TLS support 部分。
使用 SSL 时,某些 JMS 提供程序需要 SSL 特定的 JNDI 初始上下文工厂和/或提供程序 URL 设置(例如,ActiveMQ 使用 ssl:// URL 前缀而不是 tcp://)。在这种情况下,源属性( initialContextFactory
和/或 providerURL
)必须在代理配置文件中进行调整。
Client certificate authentication (two-way SSL):
JMS Source 可以通过客户端证书身份验证而不是通常的用户/密码登录来对 JMS 服务器进行身份验证(使用 SSL 并且 JMS 服务器配置为接受此类身份验证时)。
包含用于身份验证的 Flume 密钥的密钥库需要再次通过全局 SSL 参数进行配置。有关全局 SSL 设置的更多详细信息,请参阅 SSL/TLS support 部分。
密钥库应该只包含一个密钥(如果存在多个密钥,则将使用第一个密钥)。密钥密码必须与密钥库密码相同。
在客户端证书身份验证的情况下,不需要指定 userName
/ passwordFile
Flume 代理程序配置文件中 JMS 源的属性。
Please note:
与其他组件不同,JMS Source 没有组件级别的配置参数。也没有启用 SSL 标志。 SSL 设置由 JNDI / Provider URL 设置(最终是 JMS 服务器设置)以及 truststore / keystore 的存在/不存在控制。
假脱机目录来源
此源允许您通过将要摄取的文件放入磁盘上的“假脱机”目录来摄取数据。此源将查看新文件的指定目录,并将在新文件出现时解析事件。事件解析逻辑是可插入的。在给定文件完全读入通道后,默认情况下通过重命名文件来指示完成,或者可以删除它或使用 trackerDir 来跟踪已处理的文件。
与 Exec 源不同,即使 Flume 重新启动或被杀死,此源也是可靠的并且不会遗漏数据。作为这种可靠性的交换,只有不可变的,唯一命名的文件必须被放入假脱机目录中。 Flume 试图检测这些问题,如果违反则会大声失败:
如果在放入假脱机目录后写入文件,Flume 会在其日志文件中输出错误并停止处理。
如果以后重复使用文件名,Flume 会在其日志文件中输出错误并停止处理。
为避免上述问题,在将文件名移动到假脱机目录中时,添加唯一标识符(例如时间戳)可能很有用。
尽管该源的可靠性保证,但仍存在如果发生某些下游故障则可能重复事件的情况。这与其他 Flume 组件提供的保证一致。
属性名称 | 默认 | 说明 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要 spooldir 。 |
spoolDir | - | 从中读取文件的目录。 |
fileSuffix | .COMPLETED | 附加到完全摄取的文件的后缀 |
deletePolicy | never | 何时删除已完成的文件: never 或 immediate |
fileHeader | false | 是否添加存储绝对路径文件名的标头。 |
fileHeaderKey | file | 将绝对路径文件名追加到事件 Headers 时使用的 Headers 键。 |
basenameHeader | false | 是否添加存储文件基本名称的标头。 |
basenameHeaderKey | basename | Headers 将文件的基名附加到事件 Headers 时使用的 Headers 。 |
includePattern | ^。* $ | 正则表达式,指定要包含的文件。它可以与 ignorePattern 一起使用。如果文件与 ignorePattern 和 includePattern 正则表达式匹配,则忽略该文件。 |
ignorePattern | ^ $ | 正则表达式,指定要忽略的文件(跳过)。它可以与 includePattern 一起使用。如果文件与 ignorePattern 和 includePattern 正则表达式匹配,则忽略该文件。 |
trackerDir | .flumespool | 用于存储与文件处理相关的元数据的目录。如果此路径不是绝对路径,则将其解释为相对于 spoolDir。 |
trackingPolicy | rename | 跟踪策略定义如何跟踪文件处理。它可以是“重命名”或“tracker_dir”。此参数仅在 deletePolicy 为“never”时有效。 “重命名” - 处理完文件后,会根据 fileSuffix 参数重命名。 “tracker_dir” - 不重命名文件,但会在 trackerDir 中创建新的空文件。新的跟踪器文件名源自摄取的文件名和 fileSuffix。 |
consumeOrder | 最旧 | 假冒伪劣目录中的文件将被消耗 oldest , youngest 和 random 。在 oldest 和 youngest 的情况下,文件的最后修改时间将用于比较文件。如果出现平局,将首先消耗具有最小字典顺序的文件。在 random 的情况下,将随机挑选任何文件。当使用 oldest 和 youngest 时,将扫描整个目录以选择最旧/最年轻的文件,如果存在大量文件,则可能会很慢,而使用 random 可能会导致旧文件在新文件不断进入时很晚被占用假脱机目录。 |
pollDelay | 500 | 轮询新文件时使用的延迟(以毫秒为单位)。 |
recursiveDirectorySearch | false | 是否监视子目录以查找要读取的新文件。 |
maxBackoff | 4000 | 如果通道已满,则在连续尝试写入通道之间等待的最长时间(以毫秒为单位)。源将以低退避开始,并在每次通道抛出 ChannelException 时以指数方式增加,直到此参数指定的值。 |
batchSize | 100 | 批量转移到通道的粒度 |
inputCharset | UTF-8 | 反序列化程序使用的字符集,用于将输入文件视为文本。 |
decodeErrorPolicy | FAIL | 当我们在输入文件中看到不可解码的字符时该怎么办。 FAIL :抛出异常并且无法解析文件。 REPLACE :用“替换字符”字符替换不可解析字符,通常为 Unicode U FFFD。 IGNORE :删除不可解析的字符序列。 |
deserializer | LINE | 指定用于将文件解析为事件的反序列化程序。默认将每行解析为事件。指定的类必须实现 EventDeserializer.Builder 。 |
deserializer。* | 每个事件反序列化器都有变化。 | |
bufferMaxLines | - | (Obselete)此选项现在被忽略。 |
bufferMaxLineLength | 5000 | (已弃用)提交缓冲区中行的最大长度。请改用 deserializer.maxLineLength。 |
selector.type | 复制 | 复制或多路复用 |
selector。* | 取决于 selector.type 值 | |
interceptors | - | 以空格分隔的拦截器列表 |
拦截器。* |
名为 agent-1 的代理示例:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
事件反序列化器
以下事件反序列化器随 Flume 一起提供。
LINE
此解串器为每行文本输入生成一个事件。
属性名称 | 默认 | 说明 |
---|---|---|
deserializer.maxLineLength | 2048 | 单个事件中包含的最大字符数。如果一行超过此长度,则会被截断,并且该行上的其余字符将出现在后续事件中。 |
deserializer.outputCharset | UTF-8 | 用于编码放入通道的事件的字符集。 |
AVRO
此解串器能够读取 Avro 容器文件,并在文件中为每个 Avro 记录生成一个事件。每个活动都是用 Headers 指示,指示使用的模式。事件的主体是二进制 Avro 记录数据,不包括模式或容器文件元素的其余部分。
请注意,如果假脱机目录源必须重试将其中一个事件放入通道(例如,因为通道已满),则它将重置并从最新的 Avro 容器文件同步点重试。为了减少此类故障情况下的潜在事件重复,请在 Avro 输入文件中更频繁地写入同步标记。
属性名称 | 默认 | 说明 |
---|---|---|
deserializer.schemaType | HASH | 如何表示架构。默认情况下,或者指定值 HASH 时,将对 Avro 架构进行哈希处理,并将哈希值存储在事件头“flume.avro.schema.hash”中的每个事件中。如果指定了 LITERAL ,则 JSON 编码的模式本身存储在事件头“flume.avro.schema.literal”中的每个事件中。与 HASH 模式相比,使用 LITERAL 模式效率相对较低。 |
BlobDeserializer
此反序列化器为每个事件读取二进制大对象(BLOB),通常每个文件一个 BLOB。例如 PDF 或 JPG 文件。请注意,此方法不适用于非常大的对象,因为整个 BLOB 都缓存在 RAM 中。
属性名称 | 默认 | 说明 |
---|---|---|
deserializer | - | 此类的 FQCN: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder |
deserializer.maxBlobLength | 100000000 | 给定请求的最大字节数和缓冲区 |
Taildir 来源
注意
This source is provided as a preview feature. It does not work on Windows.
观察指定的文件,并在检测到添加到每个文件的新行后几乎实时地拖尾它们。如果正在写入新行,则此源将重试读取它们以等待写入完成。
此源是可靠的,即使拖尾文件旋转也不会丢失数据。它定期以 JSON 格式写入给定位置文件上每个文件的最后读取位置。如果 Flume 由于某种原因停止或停止,它可以从写在现有位置文件上的位置重新开始拖尾。
在其他用例中,此源也可以使用给定的位置文件从每个文件的任意位置开始拖尾。当指定路径上没有位置文件时,默认情况下它将从每个文件的第一行开始拖尾。
文件将按修改时间顺序使用。将首先使用具有最早修改时间的文件。
此源不会重命名或删除或对正在挂载的文件执行任何修改。目前此源不支持拖尾二进制文件。它逐行读取文本文件。
属性名称 | 默认 | 说明 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要为 TAILDIR 。 |
filegroups | - | 以空格分隔的文件组列表。每个文件组都指示一组要挂起的文件。 |
filegroups.<filegroupName> | - | 文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名。 |
positionFile | ~ / .flume / taildir_position.json | JSON 格式的文件,用于记录每个尾部文件的 inode,绝对路径和最后位置。 |
headers。<filegroupName>。<headerKey> | - | Headers 值,使用 Headers 键设置。可以为一个文件组指定多个标头。 |
byteOffsetHeader | false | 是否将尾线的字节偏移量添加到名为“byteoffset”的标头中。 |
skipToEnd | false | 如果文件未写入位置文件,是否跳过位置到 EOF。 |
idleTimeout | 120000 | 关闭非活动文件的时间(毫秒)。如果关闭的文件附加了新行,则此源将自动重新打开它。 |
writePosInterval | 3000 | 写入位置文件上每个文件的最后位置的间隔时间(ms)。 |
batchSize | 100 | 一次读取和发送到通道的最大行数。使用默认值通常很好。 |
maxBatchCount | Long.MAX_VALUE | 控制从同一文件连续读取的批次数。如果源正在拖尾多个文件,并且其中一个文件以快速写入,则可以防止处理其他文件,因为繁忙文件将在无限循环中读取。在这种情况下,降低此值。 |
backoffSleepIncrement | 1000 | 在最后一次尝试未找到任何新数据时,重新尝试轮询新数据之前的时间延迟增量。 |
maxBackoffSleep | 5000 | 每次重新尝试轮询新数据时的最大时间延迟,当最后一次尝试未找到任何新数据时。 |
cachePatternMatching | true | 对于包含数千个文件的目录,列出目录并应用文件名正则表达式模式可能非常耗时。缓存匹配文件列表可以提高性能。消耗文件的顺序也将被缓存。要求文件系统以至少 1 秒的粒度跟踪修改时间。 |
fileHeader | false | 是否添加存储绝对路径文件名的标头。 |
fileHeaderKey | file | 将绝对路径文件名追加到事件 Headers 时使用的 Headers 键。 |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
Twitter 1%firehose 来源(实验性)
警告
这个来源是高度实验性的,可能会在次要版本的 Flume 之间发生变化。使用风险由您自己承担。
通过 Streaming API 连接到 1%示例 twitter firehose 的实验源,不断下载推文,将它们转换为 Avro 格式,并将 Avro 事件发送到下游 Flume 接收器。需要消费者并访问 Twitter 开发者帐户的令牌和秘密。必需属性在 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要 org.apache.flume.source.twitter.TwitterSource |
consumerKey | - | OAuth 使用者密钥 |
consumerSecret | - | OAuth 消费者秘密 |
accessToken | - | OAuth 访问令牌 |
accessTokenSecret | - | OAuth 令牌密码 |
maxBatchSize | 1000 | 单批投放的最大推文消息数 |
maxBatchDurationMillis | 1000 | 关闭批次前等待的最大毫秒数 |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200
Kafka 来源
Kafka Source 是一个 Apache Kafka 消费者,它从 Kafka 主题中读取消息。如果您运行了多个 Kafka 源,则可以使用相同的使用者组配置它们,以便每个源都读取一组唯一的主题分区。这目前支持 Kafka 服务器版本 0.10.1.0 或更高版本。测试完成了 2.0.1,这是发布时最高的可用版本。
属性名称 | 默认 | 说明 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要 org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | - | 来源使用的 Kafka 集群中的代理列表 |
kafka.consumer.group.id | flume | 唯一确定的消费者群体。在多个源或代理中设置相同的 ID 表示它们属于同一个使用者组 |
kafka.topics | - | kafka 消费者将从中读取消息的以逗号分隔的主题列表。 |
kafka.topics.regex | - | 正则表达式,用于定义订阅源的主题集。此属性的优先级高于 kafka.topics ,如果存在,则覆盖 kafka.topics 。 |
batchSize | 1000 | 一批中写入通道的最大消息数 |
batchDurationMillis | 1000 | 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。 |
backoffSleepIncrement | 1000 | 当 Kafka 主题显示为空时触发的初始和增量等待时间。等待时间将减少对空 Kafka 主题的激进 ping 操作。一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
maxBackoffSleep | 5000 | Kafka 主题显示为空时触发的最长等待时间。 5 秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
useFlumeEventFormat | false | 默认情况下,事件被视为从 Kafka 主题直接进入事件正文的字节。设置为 true 以读取事件作为 Flume Avro 二进制格式。与 KafkaSink 上的相同属性或 Kafka Channel 上的 parseAsFlumeEvent 属性一起使用时,这将保留在生成端发送的任何 Flume 标头。 |
setTopicHeader | true | 设置为 true 时,将检索到的消息的主题存储到由 topicHeader 属性定义的 Headers 中。 |
topicHeader | topic | 如果 setTopicHeader 属性设置为 true ,则定义用于存储接收消息主题名称的 Headers 的名称。如果与 Kafka 结合,应该小心接收 topicHeader 属性,以避免在循环中将消息发送回同一主题。 |
kafka.consumer.security.protocol | PLAINTEXT | 如果使用某种程度的安全性写入 Kafka,则设置为 SASL_PLAINTEXT,SASL_SSL 或 SSL。有关安全设置的其他信息,请参见下文。 |
更多消费者安全道具 | 如果使用 SASL_PLAINTEXT,SASL_SSL 或 SSL 请参阅 Kafka security 以获取需要在消费者上设置的其他属性。 | |
其他 Kafka Consumer Properties | - | 这些属性用于配置 Kafka Consumer。可以使用 Kafka 支持的任何消费者 property 。唯一的要求是使用前缀 kafka.consumer 添加属性名称。例如: kafka.consumer.auto.offset.reset |
注意
Kafka Source 会覆盖两个 Kafka 使用者参数:source.com 将 auto.commit.enable 设置为“false”,并提交每个批处理。 Kafka 源至少保证一次消息检索策略。源启动时可以存在重复项。 Kafka Source 还提供了 key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。不建议修改这些参数。
不推荐使用的属性
属性名称 | 默认 | 说明 |
---|---|---|
topic | - | 使用 kafka.topics |
groupId | flume | 使用 kafka.consumer.group.id |
zookeeperConnect | - | 自 0.9.x 起不再受 kafka 消费者客户端的支持。使用 kafka.bootstrap.servers 与 kafka 集群 Build 连接 |
migrateZookeeperOffsets | true | 当找不到 Kafka 存储的偏移量时,在 Zookeeper 中查找偏移量并将它们提交给 Kafka。这应该是支持从旧版本的 Flume 无缝 Kafka 客户端迁移。迁移后,可以将其设置为 false,但通常不需要这样做。如果未找到 Zookeeper 偏移量,则 Kafka 配置 kafka.consumer.auto.offset.reset 定义如何处理偏移量。检查 Kafka documentation 了解详情 |
通过逗号分隔的主题列表进行主题订阅的示例。
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
正则表达式主题订阅的示例
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
Security and Kafka Source:
Flume 和 Kafka 之间的通信渠道支持安全认证和数据加密。对于安全身份验证,可以使用 Kafka 0.9.0 版中的 SASL / GSSAPI(Kerberos V5)或 SSL(即使该参数名为 SSL,实际协议是 TLS 实现)。
截至目前,数据加密仅由 SSL / TLS 提供。
设置 kafka.consumer.security.protocol
以下任何值均表示:
SASL_PLAINTEXT - 没有数据加密的 Kerberos 或纯文本身份验证
SASL_SSL - 使用数据加密的 Kerberos 或纯文本身份验证
SSL - 具有可选身份验证的基于 TLS 的加密。
警告
启用 SSL 时性能会下降,其大小取决于 CPU 类型和 JVM 实现。参考: Kafka security overview 以及用于跟踪此问题的 jira: KAFKA-2561
TLS and Kafka Source:
请阅读 Configuring Kafka Clients SSL 中描述的步骤,以了解有关微调的其他配置设置,例如以下任何一项:安全提供程序,密码套件,已启用协议,信任库或密钥库类型。
配置服务器端身份验证和数据加密的示例。
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
在此处指定信任库是可选的,可以使用全局信任库。有关全局 SSL 设置的更多详细信息,请参阅 SSL/TLS support 部分。
注意:默认情况下属性 ssl.endpoint.identification.algorithm
未定义,因此不执行主机名验证。要启用主机名验证,请设置以下属性
a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
启用后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):
如果还需要客户端身份验证,则还需要将以下内容添加到 Flume 代理配置中,或者可以使用全局 SSL 设置(请参阅 SSL/TLS support 部分)。每个 Flume 代理都必须拥有其客户证书,Kafka 经纪人必须单独或通过其签名链来信任。常见示例是由单个根 CA 签署每个客户端证书,而后者又由 Kafka 经纪人信任。
# optional, the global keystore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
如果密钥库和密钥使用不同的密码保护,那么 ssl.key.password
property 将为两个消费者密钥库提供所需的额外秘密:
a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
Kerberos and Kafka Source:
要将 Kafka 源与使用 Kerberos 保护的 Kafka 集群 一起使用,请设置 consumer.security.protocol
上面提到的消费者 properties 。与 Kafka 代理一起使用的 Kerberos 密钥表和主体在 JAAS 文件的“KafkaClient”部分中指定。 “客户”部分如果需要,描述 Zookeeper 连接。有关 JAAS 文件内容的信息,请参阅 Kafka doc 。可以通过 flume-env.sh 中的 JAVA_OPTS 指定此 JAAS 文件的位置以及可选的系统范围的 kerberos 配置:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
使用 SASL_PLAINTEXT 的示例安全配置:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
使用 SASL_SSL 的安全配置示例:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
示例 JAAS 文件。有关其内容的参考,请参阅 SASL configuration 的 Kafka 文档中所需认证机制(GSSAPI / PLAIN)的客户端配置部分。由于 Kafka Source 也可能连接到 Zookeeper 以进行偏移迁移,因此“Client”部分也添加到此示例中。除非您需要偏移迁移,否则不需要这样做,或者您需要此部分用于其他安全组件。另外,请确保 Flume 进程的操作系统用户对 jaas 和 keytab 文件具有读权限。
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
NetCat TCP Source
类似于 netcat 的源,它侦听给定端口并将每行文本转换为事件。像 nc -k -l [host] [port]
这样的行为
。换句话说,它打开一个指定的端口并侦听数据。期望是提供的数据是换行符分隔的文本。每行文本都转换为 Flume 事件,并通过连接的通道发送。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channels | - | ||
type | - | 组件类型名称,需要 netcat | |
bind | - | 要绑定到 | 的主机名或 IP 地址 |
port | - | 要绑定到 | 的端口号 |
max-line-length | 512 | 每个事件正文的最大行长度(以字节为单位) | |
ack-every-event | true | 对收到的每个事件回复“OK” | |
selector.type | 复制 | 复制或多路复用 | |
selector。* | 取决于 selector.type 值 | ||
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
NetCat UDP 源
根据原始的 Netcat(TCP)源,该源侦听给定端口并将每行文本转换为事件并通过连接的通道发送。像 nc -u -k -l [host] [port]
这样的行为
。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channels | - | ||
type | - | 组件类型名称,需要 netcatudp | |
bind | - | 要绑定到 | 的主机名或 IP 地址 |
port | - | 要绑定到 | 的端口号 |
remoteAddressHeader | - | ||
selector.type | 复制 | 复制或多路复用 | |
selector。* | 取决于 selector.type 值 | ||
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
序列发生器源
一个简单的序列生成器,它使用从 0 开始的计数器连续生成事件,递增 1 并在 totalEvents 处停止。无法向 Channels 发送事件时重试。主要用于测试。在重试期间,它使重试消息的主体保持与以前相同,以便在目的地重复数据删除之后,唯一事件的数量预计等于指定的 totalEvents
。必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要 seq |
selector.type | 复制或多路复用 | |
selector。* | 复制 | 取决于 selector.type 值 |
interceptors | - | 以空格分隔的拦截器列表 |
拦截器。* | ||
batchSize | 1 | 每个请求循环尝试处理的事件数。 |
totalEvents | Long.MAX_VALUE | 源发送的唯一事件数。 |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
Syslog 来源
读取 syslog 数据并生成 Flume 事件。 UDP 源将整个消息视为单个事件。 TCP 源为每个由换行符('n')分隔的字符串创建一个新事件。
必需属性位于 bold 中。
Syslog TCP Source
原始的,经过验证的 syslog TCP 源代码。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channels | - | ||
type | - | 组件类型名称,需要 syslogtcp | |
host | - | 要绑定到 | 的主机名或 IP 地址 |
port | - | 要绑定到 | 的端口号 |
eventSize | 2500 | 单个事件行的最大大小,以字节为单位 | |
keepFields | none | 将此设置为“all”将保留事件正文中的 Priority,Timestamp 和 Hostname。还允许包含间隔开的字段列表。目前,可以包括以下字段:优先级,版本,时间戳,主机名。值'true'和'false'已被弃用,有利于'all'和'none'。 | |
clientIPHeader | - | 如果指定,客户端的 IP 地址将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器基于客户端的 IP 地址定制路由逻辑。不要在此处使用标准 Syslog 标头名称(如_host_),因为在这种情况下将覆盖事件标头。 | |
clientHostnameHeader | - | 如果指定,客户端的主机名将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器基于客户端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在此处使用标准 Syslog 标头名称(如_host_),因为在这种情况下将覆盖事件标头。 | |
selector.type | 复制或多路复用 | ||
selector。* | 复制 | 取决于 selector.type 值 | |
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* | |||
ssl | false | 将此项设置为 true 以启用 SSL 加密。如果启用了 SSL,则还必须通过组件级参数(请参阅下文)或全局 SSL 参数(请参阅 SSL/TLS support 部分)指定“密钥库”和“密钥库密码”。 | |
keystore | - | 这是 Java 密钥库文件的路径。如果未在此处指定,则将使用全局密钥库(如果已定义,则配置错误)。 | |
keystore-password | - | Java 密钥库的密码。如果未在此处指定,则将使用全局密钥库密码(如果已定义,则配置错误)。 | |
keystore-type | JKS | Java 密钥库的类型。这可以是“JKS”或“PKCS12”。如果未在此处指定,则将使用全局密钥库类型(如果已定义,则默认为 JKS)。 | |
exclude-protocols | SSLv3 | 要排除的以空格分隔的 SSL / TLS 协议列表。除指定的协议外,将始终排除 SSLv3。 | |
include-protocols | - | 要包含的以空格分隔的 SSL / TLS 协议列表。启用的协议将是包含的协议,没有排除的协议。如果包含协议为空,则它包括每个支持的协议。 | |
exclude-cipher-suites | - | 要排除的以空格分隔的密码套件列表。 | |
include-cipher-suites | - | 要包含的以空格分隔的密码套件列表。启用的密码套件将是包含的密码套件,不包括排除的密码套件。如果 included-cipher-suites 为空,则包含每个支持的密码套件。 |
例如,名为 a1 的代理程序的 syslog TCP 源:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
Multiport Syslog TCP 源
这是 Syslog TCP 源的更新,更快,多端口版本。请注意 ports
配置设置已替换 port
。多端口功能意味着它可以以有效的方式一次监听多个端口。此源使用 Apache Mina 库来执行此操作。提供对 RFC-3164 和许多常见 RFC-5424 格式消息的支持。还提供了配置基于每个端口的字符集的功能。
属性名称 | 默认 | 说明 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要 multiport_syslogtcp |
host | - | 要绑定的主机名或 IP 地址。 |
ports | - | 要绑定到的空格分隔列表(一个或多个)。 |
eventSize | 2500 | 单个事件行的最大大小(以字节为单位)。 |
keepFields | none | 将此设置为“all”将保留事件正文中的 Priority,Timestamp 和 Hostname。还允许包含间隔开的字段列表。目前,可以包括以下字段:优先级,版本,时间戳,主机名。 Value '真''false'被弃用,赞成'all'和'none'。 |
portHeader | - | 如果指定,端口号将使用此处指定的 Headers 名称存储在每个事件的 Headers 中。这允许拦截器和通道选择器基于传入端口定制路由逻辑。 |
clientIPHeader | - | 如果指定,客户端的 IP 地址将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器基于客户端的 IP 地址定制路由逻辑。不要在此处使用标准 Syslog 标头名称(如_host_),因为在这种情况下将覆盖事件标头。 |
clientHostnameHeader | - | 如果指定,客户端的主机名将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器基于客户端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在此处使用标准 Syslog 标头名称(如_host_),因为在这种情况下将覆盖事件标头。 |
charset.default | UTF-8 | 将 syslog 事件解析为字符串时使用的默认字符集。 |
charset.port。<port> | - | 字符集可基于每个端口进行配置。 |
batchSize | 100 | 每个请求循环尝试处理的最大事件数。使用默认值通常很好。 |
readBufferSize | 1024 | 内部 Mina 读缓冲区的大小。提供性能调整。使用默认值通常很好。 |
numProcessors | (自动检测) | 系统上可用于处理消息时使用的处理器数。默认是使用 Java Runtime API 自动检测 CPU 数量。 Mina 将为每个检测到的 CPU 生成 2 个请求处理线程,这通常是合理的。 |
selector.type | 复制 | 复制,多路复用或自定义 |
selector。* | - | 取决于 selector.type 值 |
interceptors | - | 以空格分隔的拦截器列表。 |
拦截器。* | ||
ssl | false | 将此项设置为 true 以启用 SSL 加密。如果启用了 SSL,则还必须通过组件级参数(请参阅下文)或全局 SSL 参数(请参阅 SSL/TLS support 部分)指定“密钥库”和“密钥库密码”。 |
keystore | - | 这是 Java 密钥库文件的路径。如果未在此处指定,则将使用全局密钥库(如果已定义,则配置错误)。 |
keystore-password | - | Java 密钥库的密码。如果未在此处指定,则将使用全局密钥库密码(如果已定义,则配置错误)。 |
keystore-type | JKS | Java 密钥库的类型。这可以是“JKS”或“PKCS12”。如果未在此处指定,则将使用全局密钥库类型(如果已定义,则默认为 JKS)。 |
exclude-protocols | SSLv3 | 要排除的以空格分隔的 SSL / TLS 协议列表。除指定的协议外,将始终排除 SSLv3。 |
include-protocols | - | 要包含的以空格分隔的 SSL / TLS 协议列表。启用的协议将是包含的协议,没有排除的协议。如果包含协议为空,则它包括每个支持的协议。 |
exclude-cipher-suites | - | 要排除的以空格分隔的密码套件列表。 |
include-cipher-suites | - | 要包含的以空格分隔的密码套件列表。启用的密码套件将是包含的密码套件,不包括排除的密码套件。如果 included-cipher-suites 为空,则包含每个支持的密码套件。 |
例如,名为 a1 的代理程序的多端口 syslog TCP 源:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
Syslog UDP 源
属性名称 | 默认 | 说明 | |
---|---|---|---|
channels | - | ||
type | - | 组件类型名称,需要 syslogudp | |
host | - | 要绑定到 | 的主机名或 IP 地址 |
port | - | 要绑定到 | 的端口号 |
keepFields | false | 将此设置为 true 将保留事件正文中的 Priority,Timestamp 和 Hostname。 | |
clientIPHeader | - | 如果指定,客户端的 IP 地址将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器基于客户端的 IP 地址定制路由逻辑。不要使用标准的 Syslog 此处的标头名称(如_host_),因为在这种情况下将覆盖事件标头。 | |
clientHostnameHeader | - | 如果指定,客户端的主机名将使用此处指定的标头名称存储在每个事件的标头中。这允许拦截器和通道选择器基于客户端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在此处使用标准 Syslog 标头名称(如_host_),因为在这种情况下将覆盖事件标头。 | |
selector.type | 复制或多路复用 | ||
selector。* | 复制 | 取决于 selector.type 值 | |
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* |
例如,名为 a1 的代理程序的 syslog UDP 源:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
HTTP 源
通过 HTTP POST 和 GET 接受 Flume 事件的源。 GET 应仅用于实验。 HTTP 请求由可插入的“处理程序”转换为水槽事件,该处理程序必须实现 HTTPSourceHandler 接口。该处理程序获取 HttpServletRequest 并返回水槽事件列表。从一个 Http 请求处理的所有事件都在一个事务中提交给通道,从而允许在诸如文件通道之类的通道上提高效率。如果处理程序抛出异常,则此源将返回 HTTP 状态 400.如果通道已满,或者源无法将事件附加到通道,则源将返回 HTTP 503 - 暂时不可用状态。
在一个发布请求中发送的所有事件都被视为一个批处理,并在一个事务中插入到通道中。
此源基于 Jetty 9.4,并提供了设置其他 Jetty 特定参数的功能,这些参数将直接传递给 Jetty 组件。
属性名称 | 默认 | 说明 | |
---|---|---|---|
type | 组件类型名称,需要 http | ||
port | - | 源应绑定到的端口。 | |
bind | 0.0.0.0 | 要监听的主机名或 IP 地址 | |
handler | org.apache.flume.source.http.JSONHandler | 处理程序类的 FQCN。 | |
handler。* | - | 配置处理程序的参数 | |
selector.type | 复制 | 复制或多路复用 | |
selector。* | 取决于 selector.type 值 | ||
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* | |||
ssl | false | 将属性设置为 true,以启用 SSL。 HTTP Source 不支持 SSLv3。 | |
exclude-protocols | SSLv3 | 要排除的以空格分隔的 SSL / TLS 协议列表。除指定的协议外,将始终排除 SSLv3。 | |
include-protocols | - | 要包含的以空格分隔的 SSL / TLS 协议列表。启用的协议将是包含的协议,没有排除的协议。如果包含协议为空,则它包括每个支持的协议。 | |
exclude-cipher-suites | - | 要排除的以空格分隔的密码套件列表。 | |
include-cipher-suites | - | 要包含的以空格分隔的密码套件列表。启用的密码套件将是包含的密码套件,不包括排除的密码套件。 | |
keystore | 密钥库的位置,包括密钥库文件名。如果启用了 SSL 但未在此处指定密钥库,则将使用全局密钥库(如果已定义,则配置错误)。 | ||
keystore-password | 密钥库密码。如果启用了 SSL 但未在此处指定密钥库密码,则将使用全局密钥库密码(如果已定义,则配置错误)。 | ||
keystore-type | JKS | 密钥库类型。这可以是“JKS”或“PKCS12”。 | |
QueuedThreadPool。* | 要在 org.eclipse.jetty.util.thread.QueuedThreadPool 上设置的 Jetty 特定设置。注:只有在设置了此类的至少一个属性时,才会使用 QueuedThreadPool。 | ||
HttpConfiguration。* | 要在 org.eclipse.jetty.server.HttpConfiguration | 上设置的 Jetty 特定设置 | |
SslContextFactory。* | 要在 org.eclipse.jetty.util.ssl.SslContextFactory 上设置的 Jetty 特定设置(仅在 ssl 设置为 true 时适用)。 | ||
ServerConnector。* | 要在 org.eclipse.jetty.server.ServerConnector | 上设置的 Jetty 特定设置 |
不推荐使用的属性
属性名称 | 默认 | 说明 |
---|---|---|
keystorePassword | - | 使用密钥库密码。不推荐的值将被新的值覆盖。 |
excludeProtocols | SSLv3 | 使用 exclude-protocols。不推荐的值将被新的值覆盖。 |
enableSSL | false | 使用 ssl。不推荐的值将被新的值覆盖。 |
注:使用上面列出的对象上的 setter-methods 设置 Jetty 特定设置。有关完整详细信息,请参阅这些类的 Javadoc( QueuedThreadPool , HttpConfiguration , SslContextFactory 和 ServerConnector )。
使用特定于 Jetty 的设置时,上面命名的属性将优先(例如,excludeProtocols 将优先于 SslContextFactory.ExcludeProtocols)。所有房产都是小写的。
名为 a1 的代理的示例 http 源:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
JSONHandler
开箱即用的处理程序可以处理以 JSON 格式表示的事件,并支持 UTF-8,UTF-16 和 UTF-32 字符集。处理程序接受一个事件数组(即使只有一个事件,事件必须在数组中发送),并根据请求中指定的编码将它们转换为 Flume 事件。如果未指定编码,则假定为 UTF-8。 JSON 处理程序支持 UTF-8,UTF-16 和 UTF-32。事件表示如下。
[{
"headers" : {
"timestamp" : "434324343",
"host" : "random_host.example.com"
},
"body" : "random_body"
},
{
"headers" : {
"namenode" : "namenode.example.com",
"datanode" : "random_datanode.example.com"
},
"body" : "really_random_body"
}]
要设置 charset,请求必须将内容类型指定为 application/json; charset=UTF-8
(根据需要用 UTF-16 或 UTF-32 替换 UTF-8)。
以此处理程序所期望的格式创建事件的一种方法是使用 Flume SDK 中提供的 JSONEvent 并使用 Google Gson 使用 Gson#fromJson(Object,Type)方法创建 JSON 字符串。要作为事件列表的此方法的第二个参数传递的类型标记可以通过以下方式创建:
Type type = new TypeToken<List<JSONEvent>>() {}.getType();
BlobHandler
默认情况下,HTTPSource 将 JSON 输入拆分为 Flume 事件。作为替代方案,BlobHandler 是 HTTPSource 的处理程序,它返回包含请求参数的事件以及使用此请求上载的二进制大对象(BLOB)。例如 PDF 或 JPG 文件。请注意,此方法不适用于非常大的对象,因为它会将整个 BLOB 缓存在 RAM 中。
属性名称 | 默认 | 说明 |
---|---|---|
handler | - | 本课程的 FQCN: org.apache.flume.sink.solr.morphline.BlobHandler |
handler.maxBlobLength | 100000000 | 给定请求的最大字节数和缓冲区 |
压力源
StressSource 是一个内部负载生成源实现,对压力测试非常有用。它允许用户使用空标头配置事件有效负载的大小。用户可以配置要发送的事件总数以及要传递的最大成功事件数。
必需属性在 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称,需要 org.apache.flume.source.StressSource |
size | 500 | 每个事件的有效载荷大小。单位: byte |
maxTotalEvents | -1 | 要发送的最大事件数 |
maxSuccessfulEvents | -1 | 成功发送的最大事件数 |
batchSize | 1 | 一批中要发送的事件数 |
maxEventsPerSecond | 0 | 设置为大于零的整数时,在源上强制执行速率限制。 |
代理名为 a1 的示例:
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
遗产来源
遗留源允许 Flume 1.x 代理从 Flume 0.9.4 代理接收事件。它接受 Flume 0.9.4 格式的事件,将它们转换为 Flume 1.0 格式,并将它们存储在连接的通道中。 0.9.4 事件属性(如 timestamp,pri,host,nanos 等)将转换为 1.x 事件头属性。旧版源支持 Avro 和 Thrift RPC 连接。要在两个 Flume 版本之间使用此桥接,您需要使用 avroLegacy 或 thriftLegacy 源启动 Flume 1.x 代理。 0.9.4 代理应该让代理 Sink 指向 1.x 代理的主机/端口。
注意
Flume 1.x 的可靠性语义与 Flume 0.9.x 的可靠性语义不同。旧版源不支持 Flume 0.9.x 代理的 E2E 或 DFO 模式。唯一支持的 0.9.x 模式是尽力而为,尽管 1.x 流的可靠性设置将适用于传统源保存到 Flume 1.x 通道后的事件。
必需属性在 bold 中。
Avro 遗产来源
属性名称 | 默认 | 说明 | |
---|---|---|---|
channels | - | ||
type | - | 组件类型名称,需要 org.apache.flume.source.avroLegacy.AvroLegacySource | |
host | - | 要绑定到 | 的主机名或 IP 地址 |
port | - | 端口#收听 | |
selector.type | 复制或多路复用 | ||
selector。* | 复制 | 取决于 selector.type 值 | |
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
Thrift Legacy Source
属性名称 | 默认 | 说明 | |
---|---|---|---|
channels | - | ||
type | - | 组件类型名称,需要 org.apache.flume.source.thriftLegacy.ThriftLegacySource | |
host | - | 要绑定到 | 的主机名或 IP 地址 |
port | - | 要收听 | 的端口号 |
selector.type | 复制或多路复用 | ||
selector。* | 复制 | 取决于 selector.type 值 | |
interceptors | - | 以空格分隔的拦截器列表 | |
拦截器。* |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
自定义来源
自定义源是您自己的 Source 接口实现。启动 Flume 代理时,自定义源的类及其依赖项必须包含在代理程序的类路径中。自定义源的类型是其 FQCN。
属性名称 | 默认 | 说明 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要是您的 FQCN |
selector.type | replicating 或 multiplexing | |
selector。* | 复制 | 取决于 selector.type 值 |
interceptors | - | 以空格分隔的拦截器列表 |
拦截器。* |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
Scribe 来源
Scribe 是另一种摄取系统。要采用现有的 Scribe 摄取系统,Flume 应该使用基于 Thrift 的 ScribeSource 和兼容的传输协议。要部署 Scribe,请遵循 Facebook 的指南。必需属性在 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
type | - | 组件类型名称,需要 org.apache.flume.source.scribe.ScribeSource | |
port | 1499 | 应该连接 Scribe 的端口 | |
maxReadBufferBytes | 16384000 | Thrift 默认 FrameBuffer 尺寸 | |
workerThreads | 5 | 在 Thrift | 中处理线程编号 |
selector.type | |||
选择器。* |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1
Flume Sinks
HDFS 接收器
此接收器将事件写入 Hadoop 分布式文件系统(HDFS)。它目前支持创建文本和序列文件。它支持两种文件类型的压缩。可以根据经过的时间或数据大小或事件数量定期滚动文件(关闭当前文件并创建新文件)。它还根据事件源自的时间戳或机器等属性对数据进行分区/分区。 HDFS 目录路径可能包含格式转义序列,将由 HDFS 接收器替换,以生成用于存储事件的目录/文件名。使用此接收器需要安装 hadoop,以便 Flume 可以使用 Hadoop jar 与 HDFS 集群进行通信。请注意,需要支持 sync()调用的 Hadoop 版本。
以下是支持的转义序列:
别名 | 描述 |
---|---|
% | 名为“host”的事件 Headers 的替换值。支持任意 Headers 名称。 |
%t | Unix 时间,以毫秒为单位 |
% | locale 的工作日短名称(周一,周二,...) |
%A | locale 的完整工作日名称(星期一,星期二,...) |
%b | locale 的短月名(Jan,Feb,...) |
%B | locale 的长月名称(1 月,2 月,...) |
%c | locale 的日期和时间(2005 年 3 月 3 日星期三 23:05:25) |
%d | 月中的某一天(01) |
%e | 没有填充的月份日期(1) |
%D | date;相同%m /%d /%y |
%H | 小时(00..23) |
%我 | 小时(01..12) |
%j | 一年中的一天(001..366) |
%k | 小时(0..23) |
%m | 月(01..12) |
%n | 月没有填充(1..12) |
%M | 分钟(00..59) |
%p | locale 相当于 am 或 pm |
%s | 秒自 1970-01-01 00:00:00 UTC |
%S | 秒(00..59) |
%y | 年份的最后两位数字(00..99) |
%Y | 年(2010 年) |
%z | hhmm 数字时区(例如,-0400) |
%[localhost] | 替换正在运行代理程序的主机的主机名 |
%[IP] | 替换正在运行代理的主机的 IP 地址 |
%[FQDN] | 替换正在运行代理程序的主机的规范主机名 |
注意:转义字符串%[localhost],%[IP]和%[FQDN]都依赖于 Java 获取主机名的能力,这在某些网络环境中可能会失败。
正在使用的文件将在名称末尾包含“.tmp”。文件关闭后,将删除此扩展程序。这允许排除目录中的部分完整文件。必需属性在 bold 中。
注意
对于所有与时间相关的转义序列,事件 Headers 中必须存在带有“timestamp”键的 Headers (除非 hdfs.useLocalTimeStamp
设置为 true
)。自动添加此方法的一种方法是使用 TimestampInterceptor。
名称 | 默认 | 说明 |
---|---|---|
channel | - | |
type | - | 组件类型名称,需要 hdfs |
hdfs.path | - | HDFS 目录路径(例如 hdfs:// namenode / flume / webdata /) |
hdfs.filePrefix | FlumeData | 名称前缀为 Flume 在 hdfs 目录中创建的文件 |
hdfs.fileSuffix | - | 附加到文件的后缀(例如 .avro - 注意:不会自动添加句号) |
hdfs.inUsePrefix | - | 用于临时文件的前缀,水槽主动写入 |
hdfs.inUseSuffix | .tmp | 用于临时文件的后缀,水槽主动写入 |
hdfs.emptyInUseSuffix | false | 如果 false 在写入输出时使用 hdfs.inUseSuffix 。关闭输出后,将从输出文件名中删除 hdfs.inUseSuffix 。如果忽略 true hdfs.inUseSuffix 参数,则使用空字符串。 |
hdfs.rollInterval | 30 | 滚动当前文件前等待的秒数(0 =永不基于时间间隔滚动) |
hdfs.rollSize | 1024 | 触发滚动的文件大小,以字节为单位(0:永不基于文件大小滚动) |
hdfs.rollCount | 10 | 在滚动之前写入文件的事件数(0 =永不根据事件数滚动) |
hdfs.idleTimeout | 0 | 超时后非活动文件关闭(0 =禁用自动关闭空闲文件) |
hdfs.batchSize | 100 | 在将文件刷新到 HDFS 之前写入文件的事件数 |
hdfs.codeC | - | 压缩编解码器。以下之一:gzip,bzip2,lzo,lzop,snappy |
hdfs.fileType | SequenceFile | 文件格式:当前 SequenceFile , DataStream 或 CompressedStream (1)DataStream 不会压缩输出文件,请不要设置 codeC(2)CompressedStream 需要设置 hdfs.codeC 和可用的 codeC |
hdfs.maxOpenFiles | 5000 | 仅允许此数量的打开文件。如果超过此数量,则关闭最旧的文件。 |
hdfs.minBlockReplicas | - | 指定每个 HDFS 块的最小副本数。如果未指定,则它来自类路径中的默认 Hadoop 配置。 |
hdfs.writeFormat | 可写 | 序列文件记录的格式。 Text 或 Writable 之一。在使用 Flume 创建数据文件之前设置为 Text ,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。 |
hdfs.threadsPoolSize | 10 | HDFS IO 操作的每个 HDFS 接收器的线程数(打开,写入等) |
hdfs.rollTimerPoolSize | 1 | 用于安排定时文件滚动的每个 HDFS 接收器的线程数 |
hdfs.kerberosPrincipal | - | 用于访问安全 HDFS 的 Kerberos 用户主体 |
hdfs.kerberosKeytab | - | 用于访问安全 HDFS 的 Kerberos 密钥表 |
hdfs.proxyUser | ||
hdfs.round | false | 应该向下舍入时间戳(如果为 true,则影响除%t 之外的所有基于时间的转义序列) |
hdfs.roundValue | 1 | 舍入到此最高倍数(在使用 hdfs.roundUnit 配置的单位中),小于当前时间。 |
hdfs.roundUnit | second | 舍入值的单位 - second , minute 或 hour 。 |
hdfs.timeZone | Local Time | 用于解析目录路径的时区名称,例如:美洲/洛杉矶。 |
hdfs.useLocalTimeStamp | false | 在替换转义序列时使用本地时间(而不是事件头中的时间戳)。 |
hdfs.closeTries | 0 | 接收器必须尝试的次数在启动近距离尝试后重命名文件。如果设置为 1,则此接收器将不会重新尝试失败的重命名(例如,由于 NameNode 或 DataNode 失败),并且可能使文件处于打开状态,扩展名为.tmp。如果设置为 0,接收器将尝试重命名该文件,直到最终重命名该文件(它将尝试的次数没有限制)。如果关闭调用失败但数据将保持不变,则文件可能仍保持打开状态,在这种情况下,只有在 Flume 重启后文件才会关闭。 |
hdfs.retryInterval | 180 | 连续尝试关闭文件之间的时间(以秒为单位)。每次关闭调用都会花费多次 RPC 往返 Namenode,因此将此设置得太低会导致名称节点上的大量负载。如果设置为 0 或更小,则如果第一次尝试失败,接收器将不会尝试关闭文件,并且可能使文件保持打开状态或扩展名为“.tmp”。 |
serializer | TEXT | 其他可能的选项包括 avro_event 或 EventSerializer.Builder 接口的实现的完全限定类名。 |
序列化程序。* |
不推荐使用的属性
名称默认描述====================== ============ ============= ================================================== ======= hdfs.callTimeout 30000 HDFS 操作允许的毫秒数,例如 open,write,flush,close。如果发生许多 HDFS 超时操作,则应增加此数量。 ====================== ============ ================ ================================================== ====
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
以上配置将时间戳向下舍入到最后 10 分钟。例如,时间戳为 2012 年 6 月 12 日上午 11:54:34 的事件将导致 hdfs 路径变为 /flume/events/2012-06-12/1150/00
。
Hive Sink
此接收器将包含分隔文本或 JSON 数据的事件直接流式传输到 Hive 表或分区。事件使用 Hive 事务编写。一旦将一组事件提交给 Hive,它们就会立即显示给 Hive 查询。水槽将流入的分区既可以预先创建,也可以选择 Flume 创建它们,如果它们缺失的话。传入事件数据中的字段将映射到 Hive 表中的相应列。
名称 | 默认 | 说明 |
---|---|---|
channel | - | |
type | - | 组件类型名称,需要 hive |
hive.metastore | - | Hive Metastore URI(例如 thrift://a.b.com:9083) |
hive.database | - | Hive 数据库名称 |
hive.table | - | 蜂巢表名称 |
hive.partition | - | 逗号分隔标识要写入的分区的分区值列表。可能包含转义序列。例如:如果表格被分区(大陆:字符串,国家:字符串,时间:字符串),那么'亚洲,印度,2014-02-26-01-21'将表示大陆=亚洲,国家=印度,时间= 2014 -02-26-01-21 |
hive.txnsPerBatchAsk | 100 | Hive 向 Flume 等流媒体客户端授予一批 Transaction 而非单笔 Transaction 。此设置配置每个事务批处理所需的事务数。来自单个批次中所有事务的数据最终都在一个文件中。 Flume 将在批处理中的每个事务中写入最多 batchSize 事件。此设置与 batchSize 一起提供对每个文件大小的控制。请注意,最终 Hive 会将这些文件透明地压缩为更大的文件。 |
heartBeatInterval | 240 | (以秒为单位)发送到 Hive 的连续心跳之间的间隔,以防止未使用的事务过期。将此值设置为 0 可禁用心跳。 |
autoCreatePartitions | true | Flume 将自动创建必要的 Hive 分区以流式传输到 |
batchSize | 15000 | 在单个 Hive 事务中写入 Hive 的最大事件数 |
maxOpenConnections | 500 | 仅允许此数量的打开连接。如果超过此数量,则关闭最近最少使用的连接。 |
callTimeout | 10000 | (以毫秒为单位)Hive 和 HDFS I / O 操作的超时,例如 openTxn,write,commit,abort。 |
serializer | Serializer 负责解析事件中的字段并将它们映射到 hive 表中的列。串行器的选择取决于事件中数据的格式。支持的序列化程序:DELIMITED 和 JSON | |
roundUnit | minute | 舍入值的单位 - second , minute 或 hour 。 |
roundValue | 1 | 舍入到此最高倍数(在使用 hive.roundUnit 配置的单位中),小于当前时间 |
timeZone | 当地时间 | 姓名应该用于解析分区中的转义序列的时区,例如,美洲/洛杉矶。 |
useLocalTimeStamp | false | 在替换转义序列时使用本地时间(而不是事件头中的时间戳)。 |
为 Hive sink 提供了以下序列化程序:
JSON :处理 UTF8 编码的 Json(严格语法)事件,不需要配置。 JSON 中的对象名称直接映射到 Hive 表中具有相同名称的列。内部使用 org.apache.hive.hcatalog.data.JsonSerDe,但独立于 Hive 表的 Serde。此序列化程序需要安装 HCatalog。
DELIMITED :处理简单的分隔文本事件。内部使用 LazySimpleSerde,但独立于 Hive 表的 Serde。
名称 | 默认 | 说明 |
---|---|---|
serializer.delimiter | , | (类型:字符串)传入数据中的字段分隔符。要使用特殊字符,请用双引号括起来,例如“\ t” |
serializer.fieldnames | - | 从输入字段到配置单元表中的列的映射。指定为 hive 表列名称的逗号分隔列表(无空格),按发生顺序标识输入字段。要跳过字段,请保留未指定的列名称。例如。 'time ,, ip,message'表示输入映射到 hive 表中的 time,ip 和 message 列的第 1,第 3 和第 4 个字段。 |
serializer.serdeSeparator | Ctrl-A | (类型:字符)自定义基础 serde 使用的分隔符。如果 serializer.fieldnames 中的字段与表列的顺序相同,则 serializer.delimiter 与 serializer.serdeSeparator 相同,并且 serializer.fieldnames 中的字段数小于或等于表的数量,可以提高效率列,因为传入事件正文中的字段不需要重新排序以匹配表列的顺序。对于'\ t'这样的特殊字符使用单引号。确保输入字段不包含此字符。注意:如果 serializer.delimiter 是单个字符,最好将其设置为相同的字符 |
以下是支持的转义序列:
别名 | 描述 |
---|---|
% | 名为“host”的事件 Headers 的替换值。支持任意 Headers 名称。 |
%t | Unix 时间,以毫秒为单位 |
% | locale 的工作日短名称(周一,周二,...) |
%A | locale 的完整工作日名称(星期一,星期二,...) |
%b | locale 的短月名(Jan,Feb,...) |
%B | locale 的长月名称(1 月,2 月,...) |
%c | locale 的日期和时间(2005 年 3 月 3 日星期三 23:05:25) |
%d | 月中的某一天(01) |
%D | date;相同%m /%d /%y |
%H | 小时(00..23) |
%我 | 小时(01..12) |
%j | 一年中的一天(001..366) |
%k | 小时(0..23) |
%m | 月(01..12) |
%M | 分钟(00..59) |
%p | locale 相当于 am 或 pm |
%s | 秒自 1970-01-01 00:00:00 UTC |
%S | 秒(00..59) |
%y | 年份的最后两位数字(00..99) |
%Y | 年(2010 年) |
%z | hhmm 数字时区(例如,-0400) |
注意
对于所有与时间相关的转义序列,事件 Headers 中必须存在带有“timestamp”键的 Headers (除非 useLocalTimeStamp
设置为 true
)。自动添加此方法的一种方法是使用 TimestampInterceptor。
示例 Hive 表:
create table weblogs ( id int , msg string )
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;
代理名为 a1 的示例:
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg
以上配置将时间戳向下舍入到最后 10 分钟。例如,将时间戳标头设置为 2012 年 6 月 12 日上午 11:54:34 且“country”标头设置为“india”的事件将评估为分区(continent ='asia',country ='india',time ='2012-06-12-11-50'。序列化程序配置为接受包含三个字段的制表符分隔输入并跳过第二个字段。
Logger 接收器
在 INFO 级别记录事件。通常用于测试/调试目的。必需属性在 bold 中。此接收器是唯一的例外,它不需要 Logging raw data 部分中解释的额外配置。
属性名称 | 默认 | 说明 |
---|---|---|
channel | - | |
type | - | 组件类型名称,需要 logger |
maxBytesToLog | 16 | 要记录的事件主体的最大字节数 |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Avro Sink
这个水槽形成了 Flume 的分层收集支持的一半。发送到此接收器的 Flume 事件将被转换进入 Avro 事件并发送到配置的主机名/端口对。事件将从配置的通道中批量获取配置的批处理大小。必需属性在 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | - | ||
type | - | 组件类型名称,需要为 avro 。 | |
hostname | - | 要绑定的主机名或 IP 地址。 | |
port | - | 要收听的端口号。 | |
批量大小 | 100 | 要一起批量发送的事件数。 | |
connect-timeout | 20000 | 允许第一个(握手)请求的时间(ms)。 | |
request-timeout | 20000 | 允许第一个请求后的请求的时间(ms)。 | |
reset-connection-interval | none | 重置连接到下一跳之前的时间量。这将迫使 Avro Sink 重新连接到下一跳。这将允许接收器在添加新闻主机时连接到硬件负载均衡 器后面的主机,而无需重新启动代理。 | |
compression-type | none | 这可以是“none”或“deflate”。压缩类型必须与匹配 AvroSource | 的压缩类型匹配 |
compression-level | 6 | 压缩事件的压缩级别。 0 =无压缩,1-9 是压缩。数字越大,压缩越多 | |
ssl | false | 设置为 true 以启用此 AvroSink 的 SSL。配置 SSL 时,您可以选择设置“truststore”,“truststore-password”,“truststore-type”,并指定是否“trust-all-certs”。 | |
trust-all-certs | false | 如果设置为 true,则不会检查远程服务器(Avro 源)的 SSL 服务器证书。这不应该在 生产环境 中使用,因为它使攻击者更容易执行中间人攻击并“监听”加密连接。 | |
truststore | - | 自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。如果未指定,则将使用全局密钥库。如果未指定全局密钥库,则将使用缺省 Java JSSE 证书颁发机构文件(通常为 Oracle JRE 中的“jssecacerts”或“cacerts”)。 | |
truststore-password | - | 信任库的密码。如果未指定,则将使用全局密钥库密码(如果已定义)。 | |
truststore-type | JKS | Java 信任库的类型。这可以是“JKS”或其他受支持的 Java 信任库类型。如果未指定,则将使用全局密钥库类型(如果已定义,则 defautl 为 JKS)。 | |
exclude-protocols | SSLv3 | 要排除的以空格分隔的 SSL / TLS 协议列表。除指定的协议外,将始终排除 SSLv3。 | |
maxIoWorkers | 2 *机器中可用处理器的数量 | I / O 工作线程的最大数量。这是在 NettyAvroRpcClient NioClientSocketChannelFactory 上配置的。 |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
Thrift Sink
这个水槽形成了 Flume 的分层收集支持的一半。发送到此接收器的 Flume 事件将转换为 Thrift 事件并发送到配置的主机名/端口对。事件将从配置的通道中批量获取配置的批处理大小。
可以通过启用 kerberos 身份验证将 Thrift sink 配置为以安全模式启动。要与以安全模式启动的 Thrift 源通信,Thrift 接收器也应该以安全模式运行。 client-principal 和 client-keytab 是 Thrift 接收器用于向 kerberos KDC 进行身份验证的属性。 server-principal 表示 Thrift 源的主体,此接收器配置为以安全模式连接。必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
channel | - | |
type | - | 组件类型名称,需要为 thrift 。 |
hostname | - | 要绑定的主机名或 IP 地址。 |
port | - | 要收听的端口号。 |
批量大小 | 100 | 要一起批量发送的事件数。 |
connect-timeout | 20000 | 允许第一个(握手)请求的时间(ms)。 |
request-timeout | 20000 | 允许第一个请求后的请求的时间(ms)。 |
connection-reset-interval | none | 连接前的时间量到下一跳重置。这将迫使 Thrift Sink 重新连接到下一跳。这将允许接收器在添加新闻主机时连接到硬件负载均衡 器后面的主机,而无需重新启动代理。 |
ssl | false | 设置为 true 以为此 ThriftSink 启用 SSL。配置 SSL 时,您可以选择设置“防锈 Store ”,“防毒 Store 密码”和“防火墙类型”--0017_ |
truststore | 自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Thrift Source 的 SSL 身份验证凭据。如果未指定,则将使用全局密钥库。如果未指定全局密钥库,则将使用缺省 Java JSSE 证书颁发机构文件(通常是 Oracle JRE 中的“ssecacerts”或“acerts”)。 | |
truststore-password | 信任库的密码。如果未指定,则将使用全局密钥库密码(如果已定义)。 | |
truststore-type | JKS | Java 信任库的类型。这可以是“KS”或其他受支持的 Java 信任库类型。如果未指定,则将使用全局密钥库类型(如果已定义,则 defautl 为 JKS)。 |
exclude-protocols | SSLv3 | 要排除的空格分隔的 SSL / TLS 协议列表 |
kerberos | false | 设置为 true 以启用 kerberos 身份验证。在 kerberos 模式下,需要 client-principal,client-keytab 和 server-principal 才能成功进行身份验证并与启用 kerberos 的 Thrift Source 进行通信。 |
client-principal | 芒聙聪 - | Thrift Sink 使用的 kerberos 校长对 kerberos KDC 进行身份验证。 |
client-keytab | 芒聙聪 - | Thrift Sink 与客户端主体结合使用的 keytab 位置,用于对 kerberos KDC 进行身份验证。 |
server-principal | Thrift Sink 配置为连接到的 Thrift Source 的 kerberos 主体。 |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
IRC 水槽
IRC 接收器从附加通道接收消息,并将这些消息中继到已配置的 IRC 目标。必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | 鈥 | ||
type | 组件类型名称,需要 irc | ||
hostname | 要连接到 | 的主机名或 IP 地址 | |
port | 6667 | 要连接的远程主机的端口号 | |
nick | 昵称 | ||
user | 用户名 | ||
密码 | 用户密码 | ||
chan | 鈥 | Channels | |
name | |||
splitlines | 鈥 | (布尔) | |
splitchars | n | 行分隔符(如果你要在配置文件中输入默认值,那么你需要转义反斜杠,如下所示:'Ànn¢ |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume
文件滚动接收器
在本地文件系统上存储事件。必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
channel | ||
type | 组件类型名称,需要为 file_roll 。 | |
sink.directory | 将存储文件的目录 | |
sink.pathManager | DEFAULT | 要使用的 PathManager 实现。 |
sink.pathManager.extension | 如果使用默认的 PathManager,则为文件扩展名。 | |
sink.pathManager.prefix | 如果使用默认 PathManager,则添加到文件名开头的字符串 | |
sink.rollInterval | 30 | 每隔 30 秒滚动一次文件。指定 0 将禁用滚动并导致所有事件都写入单个文件。 |
sink.serializer | TEXT | 其他可能的选项包括 avro_event 或 EventSerializer.Builder 接口的 FQCN 实现。 |
sink.batchSize | 100 |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
Null Sink
丢弃从 Channels 收到的所有事件。必需属性在 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
channel | 鈥 | |
type | 组件类型名称,需要为 null 。 | |
batchSize | 100 |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1
HBaseSinks
HBaseSink
此接收器将数据写入 HBase。 Hbase 配置从类路径中遇到的第一个 hbase-site.xml 中获取。实现由配置指定的 HbaseEventSerializer 的类用于将事件转换为 HBase put 和/或增量。然后将这些放置和增量写入 HBase。该接收器提供与 HBase 相同的一致性保证,HBase 是当前行的原子性。如果 Hbase 无法写入某些事件,则接收器将重播该事务中的所有事件。
HBaseSink 支持将数据写入安全 HBase。要写入安全 HBase,代理程序正在运行的用户必须具有对接收器配置为写入的表的写入权限。可以在配置中指定用于对 KDC 进行身份验证的主体和密钥表。 Flume 代理程序的类路径中的 hbase-site.xml 必须将身份验证设置为 kerberos
(有关如何执行此操作的详细信息,请参阅 HBase 文档)。
为方便起见,Flume 提供了两个序列化器。 SimpleHbaseEventSerializer(org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)按原样将事件主体写入 HBase,并可选择增加 Hbase 中的列。这主要是一个示例实现。 RegexHbaseEventSerializer(org.apache.flume.sink.hbase.RegexHbaseEventSerializer)根据给定的正则表达式打破事件体,并将每个部分写入不同的列。
类型是 FQCN:org.apache.flume.sink.hbase.HBaseSink。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | - | ||
type | - | 组件类型名称,需要 hbase | |
table | - | 要写入的 Hbase 中的表的名称。 | |
columnFamily | - | Hbase 中的列族写入。 | |
zookeeperQuorum | - | 法定人数规格。这是 hbase-site.xml | 中属性 hbase.zookeeper.quorum 的值 |
znodeParent | / hbase | -ROOT-区域的 znode 的基本路径。 zookeeper.znode.parent 的值在 hbase-site.xml 中 | |
batchSize | 100 | 每个 txn 要写入的事件数。 | |
coalesceIncrements | false | 接收器是否应将每个批次的多个增量合并为一个单元格。如果有限数量的单元格有多个增量,这可能会提供更好的性能。 | |
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | 默认增量列=“iCol”,有效负载列=“pCol”。 | |
serializer。* | - | 要传递给序列化程序的属性。 | |
kerberosPrincipal | - | 用于访问安全 HBase 的 Kerberos 用户主体 | |
kerberosKeytab | - | 用于访问安全 HBase 的 Kerberos 密钥表 |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
HBase2Sink
HBase2Sink 相当于 HBase 版本 2 的 HBaseSink。提供的功能和配置参数与 HBaseSink 的情况相同(除了接收器类型中的 hbase2 标签和包/类名称)。
类型是 FQCN:org.apache.flume.sink.hbase2.HBase2Sink。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | - | ||
type | - | 组件类型名称,需要 hbase2 | |
table | - | 要写入的 HBase 中的表的名称。 | |
columnFamily | - | HBase 中要写入的列族。 | |
zookeeperQuorum | - | 法定人数规格。这是 hbase-site.xml | 中属性 hbase.zookeeper.quorum 的值 |
znodeParent | / hbase | -ROOT-区域的 znode 的基本路径。 zookeeper.znode.parent 的值在 hbase-site.xml 中 | |
batchSize | 100 | 每个 txn 要写入的事件数。 | |
coalesceIncrements | false | 接收器是否应将每个批次的多个增量合并为一个单元格。如果有限数量的单元格有多个增量,这可能会提供更好的性能。 | |
serializer | org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer | 默认增量列=“iCol”,有效负载列=“pCol”。 | |
serializer。* | - | 要传递给序列化程序的属性。 | |
kerberosPrincipal | - | 用于访问安全 HBase 的 Kerberos 用户主体 | |
kerberosKeytab | - | 用于访问安全 HBase 的 Kerberos 密钥表 |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1
AsyncHBaseSink
此接收器使用异步模型将数据写入 HBase。实现由配置指定的 AsyncHbaseEventSerializer 的类用于将事件转换为 HBase put 和/或增量。这些然后将 put 和 increment 添加到 HBase。此接收器使用 Asynchbase API 写入 HBase。该接收器提供与 HBase 相同的一致性保证,HBase 是当前行的原子性。如果 Hbase 无法写入某些事件,则接收器将重播该事务中的所有事件。 AsyncHBaseSink 只能与 HBase 1.x 一起使用。 AsyncHBaseSink 使用的异步客户端库不适用于 HBase 2.类型为 FQCN:org.apache.flume.sink.hbase.AsyncHBaseSink。必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | - | ||
type | - | 组件类型名称,需要 asynchbase | |
table | - | 要写入的 Hbase 中的表的名称。 | |
zookeeperQuorum | - | 法定人数规格。这是 hbase-site.xml | 中属性 hbase.zookeeper.quorum 的值 |
znodeParent | / hbase | -ROOT-区域的 znode 的基本路径。 zookeeper.znode.parent 的值在 hbase-site.xml 中 | |
columnFamily | - | Hbase 中的列族写入。 | |
batchSize | 100 | 每个 txn 要写入的事件数。 | |
coalesceIncrements | false | 接收器是否应将每个批次的多个增量合并为一个单元格。如果有限数量的单元格有多个增量,这可能会提供更好的性能。 | |
timeout | 60000 | 接收器等待事务中所有事件的来自 hbase 的 acks 的时间长度(以毫秒为单位)。 | |
serializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | ||
serializer。* | - | 要传递给序列化程序的属性。 | |
async。* | - | 要传递给 asyncHbase 库的属性。这些属性优先于旧的 zookeeperQuorum 和 znodeParent 值。您可以在 the documentation page of AsyncHBase 找到可用属性的列表。 |
请注意,此接收器在配置中获取 Zookeeper Quorum 和父 znode 信息。可以在 flume 配置文件中指定 Zookeeper Quorum 和父节点配置。或者,这些配置值取自类路径中的第一个 hbase-site.xml 文件。
如果配置中未提供这些,则接收器将从类路径中的第一个 hbase-site.xml 文件中读取此信息。
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1
MorphlineSolrSink
此接收器从 Flume 事件中提取数据,对其进行转换,并将其近乎实时地加载到 Apache Solr 服务器中,后者又向最终用户或搜索应用程序提供查询。
此接收器非常适合将原始数据流式传输到 HDFS(通过 HdfsSink)并同时提取,转换并将相同数据加载到 Solr(通过 MorphlineSolrSink)的用例。特别是,此接收器可以处理来自不同数据源的任意异构原始数据,并将其转换为对搜索应用程序有用的数据模型。
ETL 功能可使用 morphline configuration file 进行自定义,该函数定义了一系列转换命令,用于将事件记录从一个命令传递到另一个命令。
Morphlines 可以看作是 Unix 管道的演变,其中数据模型被推广为使用通用记录流,包括任意二进制有效载荷。 morphline 命令有点像 Flume 拦截器。 Morphlines 可以嵌入到 Flume 等 Hadoop 组件中。
用于解析和转换一组标准数据格式(如日志文件,Avro,CSV,文本,HTML,XML,PDF,Word,Excel 等)的命令是开箱即用的,还有其他自定义命令和解析器用于其他数据格式可以添加为 morphline 插件。可以索引任何类型的数据格式,并且可以生成任何类型的 Solr 模式的任何 Solr 文档,并且可以注册和执行任何自定义 ETL 逻辑。
Morphlines 操纵连续的记录流。数据模型可以描述如下:记录是一组命名字段,其中每个字段具有一个或多个值的有序列表。值可以是任何 Java 对象。也就是说,记录本质上是一个哈希表,其中每个哈希表条目包含一个 String 键和一个 Java 对象列表作为值。 (该实现使用了 Guava 的 ArrayListMultimap
,这是一个 ListMultimap
)。请注意,字段可以具有多个值,并且任何两个记录都不需要使用公共字段名称。
这个水槽将 Flume 事件的主体填满 _attachment_body
morphline 记录的字段,以及将 Flume 事件的 Headers 复制到同名的记录字段中。然后命令可以对此数据执行操作。
支持路由到 SolrCloud 集群以提高可伸缩性。索引负载可以分布在大量 MorphlineSolrSinks 上,以提高可伸缩性。可以跨多个 MorphlineSolrSinks 复制索引加载高可用性,例如使用 Flume 功能,例如负载均衡 接收器处理器。 MorphlineInterceptor 还可以帮助实现到多个 Solr 集合的动态路由(例如,用于多租户)。
必须将环境所需的 morphline 和 solr jar 放在 Apache Flume 安装的 lib 目录中。
类型是 FQCN:org.apache.flume.sink.solr.morphline.MorphlineSolrSink
必需属性在 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | - | ||
type | - | 组件类型名称,需要 org.apache.flume.sink.solr.morphline.MorphlineSolrSink | |
morphlineFile | - | 本地文件系统与 morphline 配置文件的相对或绝对路径。示例: /etc/flume-ng/conf/morphline.conf | |
morphlineId | null | 如果 morphline 配置文件中有多个形态线,则用于标识形态线的可选名称 | |
batchSize | 1000 | 每个水槽事务要采取的最大事件数。 | |
batchDurationMillis | 1000 | 每个水槽事务的最大持续时间(ms)。事务在此持续时间之后或超过 batchSize 时提交,以先到者为准。 | |
handlerClass | org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl | 实现 org.apache.flume.sink.solr.morphline.MorphlineHandler | 的类的 FQCN |
isProductionMode | false | 应为关键任务的大型在线 生产环境 系统启用此标志,这些系统需要在发生不可恢复的异常时无需停机即可取得进展。与未知 Solr 架构字段相关的损坏或格式错误的解析器输入数据,解析器错误和错误会产生不可恢复的异常。 | |
recoverableExceptionClasses | org.apache.solr.client.solrj.SolrServerException | 以逗号分隔的可恢复异常列表,这些异常往往是瞬态的,在这种情况下,可以重试相应的任务。示例包括网络连接错误,超时等。当 生产环境 模式标志设置为 true 时,使用此参数配置的可恢复异常将不会被忽略,因此将导致重试。 | |
isIgnoringRecoverableExceptions | false | 如果不可恢复的异常被意外错误分类为可恢复,则应启用此标志。这使得接收器能够取得进展并避免永远重试事件。 |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000
ElasticSearchSink
此接收器将数据写入弹性搜索集群。默认情况下,将写入事件以便 Kibana 图形界面可以显示它们 - 就像 logstash 编写它们一样。
必须将环境所需的 elasticsearch 和 lucene-core jar 放在 Apache Flume 安装的 lib 目录中。 Elasticsearch 要求客户端 JAR 的主要版本与服务器的主要版本匹配,并且两者都运行相同的 JVM 次要版本。如果这不正确,将出现 SerializationExceptions。要选择所需的版本,请首先确定 elasticsearch 的版本以及目标 集群 正在运行的 JVM 版本。然后选择与主要版本匹配的 elasticsearch 客户端库。 0.19.x 客户端可以与 0.19.x 集群 通信; 0.20.x 可以与 0.20.x 对话,0.90.x 可以与 0.90.x 对话。确定 elasticsearch 版本后,读取 pom.xml 文件以确定要使用的正确 lucene-core JAR 版本。运行 ElasticSearchSink 的 Flume 代理程序也应该与目标集群运行的次要版本的 JVM 相匹配。
事件将每天写入新索引。名称将是<indexName> -yyyy-MM-dd,其中<indexName>是 indexName 参数。接收器将在午夜 UTC 开始写入新索引。
默认情况下,ElasticSearchLogStashEventSerializer 会为弹性搜索序列化事件。可以使用 serializer 参数覆盖此行为。此参数接受 org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer 或 org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory 的实现。不推荐使用 ElasticSearchEventSerializer 来支持更强大的 ElasticSearchIndexRequestBuilderFactory。
类型是 FQCN:org.apache.flume.sink.elasticsearch.ElasticSearchSink
必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | - | ||
type | - | 组件类型名称,需要 org.apache.flume.sink.elasticsearch.ElasticSearchSink | |
hostNames | - | 逗号分隔的主机名列表:端口,如果端口不存在,将使用默认端口'9300' | |
indexName | flume | 日期所在的索引名称附加到。示例'flume' - >'flume-yyyy-MM-dd'支持任意 Headers 替换,例如。 %替换为命名事件标头 | 的值 |
indexType | logs | 将文档编入索引的类型,默认为“log”支持任意标头替换,例如。 %替换为命名事件标头 | 的值 |
clusterName | elasticsearch | 要连接到 | 的 ElasticSearch 集群的名称 |
batchSize | 100 | 每个 txn 要写入的事件数。 | |
ttl | - | TTL 以天为单位,设置时会导致过期文档自动删除,如果没有设置,文档将永远不会被自动删除。 TTL 仅以较早的整数形式被接受,例如 a1.sinks.k1.ttl = 5 并且还具有限定符 ms(毫秒),s(秒),m(分钟),h(小时),d(天)和 w(星期)。示例 a1.sinks.k1.ttl = 5d 将 TTL 设置为 5 天。请关注 http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ 以获取更多信息。 | |
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | 要使用的 ElasticSearchIndexRequestBuilderFactory 或 ElasticSearchEventSerializer。接受任一类的实现,但首选 ElasticSearchIndexRequestBuilderFactory。 | |
serializer。* | - | 要传递给序列化程序的属性。 |
注意
使用标头替换可以方便地使用事件标头的值来动态决定在存储事件时要使用的 indexName 和 indexType。使用此功能时应谨慎,因为事件提交者现在可以控制 indexName 和 indexType。此外,如果使用 elasticsearch REST 客户端,则事件提交者可以控制所使用的 URL 路径。
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
Kite Dataset Sink
将事件写入 Kite Dataset 的实验接收器。此接收器将反序列化每个传入事件的主体,并将结果记录存储在风筝数据集中。它通过按 URI 加载数据集来确定目标数据集。
唯一受支持的序列化是 avro,并且必须使用 flume.avro.schema.literal
在事件头中传递记录架构
使用 JSON 架构表示或 flume.avro.schema.url
使用可以找到架构的 URL( hdfs:/...
支持 URI)。这与使用 deserializer.schemaType = LITERAL
的 Log4jAppender 水槽客户端和假脱机目录源的 Avro 解串器兼容
。
注 1: flume.avro.schema.hash
Headers 是 not supported 。注意 2:在某些情况下,在超过滚动间隔后可能会略微发生文件滚动。但是,此延迟不会超过 5 秒。在大多数情况下,延迟是可以忽略的。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | - | ||
type | - | 必须是 org.apache.flume.sink.kite.DatasetSink | |
kite.dataset.uri | - | 要打开的数据集的 URI | |
kite.repo.uri | - | 要打开的存储库的 URI(不建议使用;请改用 kite.dataset.uri) | |
kite.dataset.namespace | - | 数据集的命名空间,其中将写入记录(不建议使用;请改用 kite.dataset.uri) | |
kite.dataset.name | - | 将写入记录的数据集的名称(不建议使用;请改用 kite.dataset.uri) | |
kite.batchSize | 100 | 每批中要处理的记录数 | |
kite.rollInterval | 30 | 数据文件发布前的最长等待时间(秒) | |
kite.flushable.commitOnBatch | true | 如果 true ,将提交 Flume 事务并在每批 kite.batchSize 条记录上刷新写入程序。此设置仅适用于可刷新数据集。当 true 时,具有提交数据的临时文件可能会保留在数据集目录中。需要手动恢复这些文件,以使数据对 DatasetReaders 可见。 | |
kite.syncable.syncOnBatch | true | 控制接收器在提交事务时是否同步数据。此设置仅适用于可同步数据集。同步 gaurentees 数据将写入远程系统上的稳定存储,同时只刷新数据已离开 Flume 的客户端缓冲区的 gaurentees。当 kite.flushable.commitOnBatch 属性设置为 false 时,此属性也必须设置为 false 。 | |
kite.entityParser | avro | 将 Flume Events 变为 Kite 实体的 Parser。有效值为 avro 和 EntityParser.Builder 接口的实现的完全限定类名。 | |
kite.failurePolicy | retry | 处理不可恢复错误的策略,例如 Event Headers 中缺少 Schema 。默认值 retry 将使当前批次失败,并再次尝试与旧行为匹配。其他有效值是 save ,它将写入原始值 Event 到 kite.error.dataset.uri 数据集,以及 FailurePolicy.Builder 接口的实现的完全限定类名。 | |
kite.error.dataset.uri | - | kite.failurePolicy 设置为 save 时保存失败事件的数据集的 URI。 Required kite.failurePolicy 设置为 save 时。 | |
auth.kerberosPrincipal | - | 用于对 HDFS 进行安全身份验证的 Kerberos 用户主体 | |
auth.kerberosKeytab | - | 主体 | 的 Kerberos keytab 位置(本地 FS) |
auth.proxyUser | - | HDFS 操作的有效用户,如果与 kerberos 主体不同 |
Kafka Sink
这是一个 Flume Sink 实现,可以将数据发布到 Kafka 主题。其中一个目标是将 Flume 与 Kafka 集成,以便基于拉的处理系统可以处理来自各种 Flume 源的数据。
这目前支持 Kafka 服务器版本 0.10.1.0 或更高版本。测试完成了 2.0.1,这是发布时最高的可用版本。
必需属性以粗体字体标记。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 必须设为 org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | - | Kafka-Sink 将连接到的经纪人列表,以获取主题分区列表这可以是经纪人的部分列表,但我们建议至少两个用于 HA。格式为逗号分隔的主机名列表:port |
kafka.topic | default-flume-topic | Kafka 中将发布消息的主题。如果配置了此参数,则会将消息发布到此主题。如果事件标头包含“主题”字段,则事件将发布到该主题,从而覆盖此处配置的主题。支持任意头部替换,例如。 %将替换为名为“header”的事件标头的值。 (如果使用替换,建议将 Kafka broker 的“auto.create.topics.enable”属性设置为 true。) |
flumeBatchSize | 100 | 一批中要处理的消息数。较大批量可提高吞吐量,同时增加延迟。 |
kafka.producer.acks | 1 | 在考虑成功写入之前,有多少副本必须确认消息。接受的值为 0(从不等待确认),1(仅等待前导),-1(等待所有副本)将此值设置为-1,以避免在某些领导失败的情况下丢失数据。 |
useFlumeEventFormat | false | 默认情况下,事件直接从事件正文中作为字节放到 Kafka 主题上。设置为 true 以将事件存储为 Flume Avro 二进制格式。与 KafkaSource 上的相同属性或 Kafka Channel 上的 parseAsFlumeEvent 属性一起使用时,这将保留生成端的任何 Flume 头。 |
defaultPartitionId | - | 指定要发送到此通道中的所有事件的 Kafka 分区 ID(整数),除非被 partitionIdHeader 覆盖。默认情况下,如果未设置此属性,则事件将由 Kafka Producer 的分区程序分发 - 包括 key (如果已指定)(或由 kafka.partitioner.class 指定的分区程序)。 |
partitionIdHeader | - | 设置后,接收器将从事件标头中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。如果该值表示无效分区,则将抛出 EventDeliveryException。如果标头值存在,则此设置将覆盖 defaultPartitionId 。 |
allowTopicOverride | true | 设置后,接收器将允许将消息生成到 topicHeader 属性指定的主题中(如果提供)。 |
topicHeader | topic | 与 allowTopicOverride 一起设置时,将使用此属性的值生成一个指向名称的值的消息。与 Kafka Source topicHeader 属性一起使用时应小心,以避免创建环回。 |
kafka.producer.security.protocol | PLAINTEXT | 如果使用某种程度的安全性写入 Kafka,则设置为 SASL_PLAINTEXT,SASL_SSL 或 SSL。有关安全设置的其他信息,请参见下文。 |
更多 生产环境 者安全道具 | 如果使用 SASL_PLAINTEXT,SASL_SSL 或 SSL 请参阅 Kafka security 以获取需要在 生产环境 者上设置的其他属性。 | |
其他 Kafka Producer 属性 | - | 这些属性用于配置 Kafka Producer。可以使用 Kafka 支持的任何 生产环境 者属性。唯一的要求是使用前缀 kafka.producer 添加属性名称。例如:kafka.producer.linger.ms |
注意
Kafka Sink 使用 topic
和 key
FlumeEvent 标头中的属性将事件发送到 Kafka。如果 topic
如果存在于标头中,则会将事件发送到该特定主题,从而覆盖为接收器配置的主题。如果 key
存在于头文件中,Kafka 将使用该密钥对主题分区之间的数据进行分区。具有相同密钥的事件将发送到同一分区。如果密钥为空,则将事件发送到随机分区。
Kafka 接收器还为 key.serializer(org.apache.kafka.common.serialization.StringSerializer)和 value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer)提供默认值。不建议修改这些参数。
不推荐使用的属性
属性名称 | 默认 | 说明 |
---|---|---|
brokerList | - | 使用 kafka.bootstrap.servers |
topic | default-flume-topic | 使用 kafka.topic |
batchSize | 100 | 使用 kafka.flumeBatchSize |
requiredAcks | 1 | 使用 kafka.producer.acks |
下面给出 Kafka 接收器的示例配置。以前缀 kafka.producer
开头的属性
Kafka 制片人。创建 Kafka 生成器时传递的属性不限于此示例中给出的属性。此外,可以在此处包含您的自定义属性,并通过作为方法参数传入的 Flume Context 对象在预处理器中访问它们。
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
Security and Kafka Sink:
Flume 和 Kafka 之间的通信渠道支持安全认证和数据加密。对于安全身份验证,可以使用 Kafka 0.9.0 版中的 SASL / GSSAPI(Kerberos V5)或 SSL(即使该参数名为 SSL,实际协议是 TLS 实现)。
截至目前,数据加密仅由 SSL / TLS 提供。
设置 kafka.producer.security.protocol
以下任何值均表示:
SASL_PLAINTEXT - 没有数据加密的 Kerberos 或纯文本身份验证
SASL_SSL - 使用数据加密的 Kerberos 或纯文本身份验证
SSL - 具有可选身份验证的基于 TLS 的加密。
警告
启用 SSL 时性能会下降,其大小取决于 CPU 类型和 JVM 实现。参考: Kafka security overview 和用于跟踪此问题的 jira: KAFKA-2561
TLS and Kafka Sink:
请阅读 Configuring Kafka Clients SSL 中描述的步骤,以了解有关微调的其他配置设置,例如以下任何一项:安全提供程序,密码套件,已启用协议,信任库或密钥库类型。
配置服务器端身份验证和数据加密的示例。
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
在此处指定信任库是可选的,可以使用全局信任库。有关全局 SSL 设置的更多详细信息,请参阅 SSL/TLS support 部分。
注意:默认情况下属性 ssl.endpoint.identification.algorithm
未定义,因此不执行主机名验证。要启用主机名验证,请设置以下属性
a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
启用后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):
如果还需要客户端身份验证,则还需要将以下内容添加到 Flume 代理配置中,或者可以使用全局 SSL 设置(请参阅 SSL/TLS support 部分)。每个 Flume 代理都必须拥有其客户证书,Kafka 经纪人必须单独或通过其签名链来信任。常见示例是由单个根 CA 签署每个客户端证书,而后者又由 Kafka 经纪人信任。
# optional, the global keystore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>
如果密钥库和密钥使用不同的密码保护,那么 ssl.key.password
property 将为 生产环境 者密钥库提供所需的额外密钥:
a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>
Kerberos and Kafka Sink:
要将 Kafka 接收器与使用 Kerberos 保护的 Kafka 集群 一起使用,请设置 producer.security.protocol
上面提到的 生产环境 者 property 。与 Kafka 代理一起使用的 Kerberos 密钥表和主体在 JAAS 文件的“KafkaClient”部分中指定。 “客户端”部分描述了 Zookeeper 连接(如果需要)。有关 JAAS 文件内容的信息,请参见 Kafka doc 。可以通过 flume-env.sh 中的 JAVA_OPTS 指定此 JAAS 文件的位置以及可选的系统范围的 kerberos 配置:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
使用 SASL_PLAINTEXT 的示例安全配置:
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
使用 SASL_SSL 的安全配置示例:
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
示例 JAAS 文件。有关其内容的参考,请参阅 SASL configuration 的 Kafka 文档中所需认证机制(GSSAPI / PLAIN)的客户端配置部分。与 Kafka Source 或 Kafka Channel 不同,不需要“Client”部分,除非其他连接组件需要它。另外,请确保 Flume 进程的操作系统用户对 jaas 和 keytab 文件具有读权限。
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
HTTP Sink
此接收器的行为是它将从通道获取事件,并将这些事件发送到远程使用 HTTP POST 请求的服务。事件内容作为 POST 正文发送。
此接收器的错误处理行为取决于目标服务器返回的 HTTP 响应。接收器退避/就绪状态是可配置的,事务提交/回滚结果以及事件是否有助于成功的事件排放计数也是可配置的。
状态代码不可读的服务器返回的任何格式错误的 HTTP 响应都将导致退避信号,并且不会从该通道中消耗该事件。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
channel | - | ||
type | - | 组件类型名称,需要 http 。 | |
endpoint | - | 要发送到 | 的完全限定的 URL endpoints |
connectTimeout | 5000 | 套接字连接超时(以毫秒为单位) | |
requestTimeout | 5000 | 最大请求处理时间(以毫秒为单位) | |
contentTypeHeader | text / plain | HTTP Content-Type 标头 | |
acceptHeader | text / plain | HTTP Accept 标头值 | |
defaultBackoff | true | 是否在接收所有 HTTP 状态码时默认退避 | |
defaultRollback | true | 是否在接收所有 HTTP 状态码时默认回滚 | |
defaultIncrementMetrics | false | 是否在接收所有 HTTP 状态代码时默认增加指标 | |
backoff.CODE | - | 为个人(即 200)代码或组(即 2XX)代码 | 配置特定退避 |
rollback.CODE | - | 为单个(即 200)代码或组(即 2XX)代码配置特定回滚 | |
incrementMetrics.CODE | - | 为单个(即 200)代码或组(即 2XX)代码配置特定的度量标准增量 |
请注意,最具体的 HTTP 状态代码匹配用于 backoff,rollback 和 incrementMetrics 配置选项。如果存在 2XX 和 200 状态代码的配置值,则 200 个 HTTP 代码将使用 200 值,而 201-299 范围内的所有其他 HTTP 代码将使用 2XX 值。
消耗任何空或空事件而不向 HTTP endpoints 发出任何请求。
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true
自定义接收器
自定义接收器是您自己的 Sink 接口实现。启动 Flume 代理程序时,必须在代理程序的类路径中包含自定义接收器的类及其依赖项。自定义接收器的类型是其 FQCN。必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
channel | - | |
type | - | 组件类型名称,需要是您的 FQCN |
代理名为 a1 的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1
Flume Channels
通道是事件在代理上暂存的存储库。 Source 添加事件,Sink 删除它。
记忆 Channels
事件存储在具有可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称,需要 memory |
capacity | 100 | 通道中存储的最大事件数 |
transactionCapacity | 100 | 每个事务通道从源获取或提供给接收器的最大事件数 |
keep-alive | 3 | 添加或删除事件的超时秒数 |
byteCapacityBufferPercentage | 20 | 定义 byteCapacity 与通道中所有事件的估计总大小之间的缓冲区百分比,以计算 Headers 中的数据。见下文。 |
byteCapacity | 参见说明 | 允许的最大内存总数 bytes ,作为此通道中所有事件的总和。该实现仅计算事件 body ,这也是提供 byteCapacityBufferPercentage 配置参数的原因。默认为计算值,等于 JVM 可用的最大内存的 80%(即命令行传递的-Xmx 值的 80%)。请注意,如果在单个 JVM 上有多个内存通道,并且它们碰巧保持相同的物理事件(即,如果您使用来自单个源的复制通道选择器),那么这些事件大小可能会被重复计算以用于通道 byteCapacity 目的。将此值设置为 0 将导致此值回退到内部硬限制大约 200 GB。 |
代理名为 a1 的示例:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
JDBC 通道
事件存储在由数据库支持的持久存储中。 JDBC 通道当前支持嵌入式 Derby。这是一个持久的渠道,非常适合可恢复性很重要的流程。必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
type | - | 组件类型名称,需要 jdbc | |
db.type | DERBY | 数据库供应商,需要是 DERBY。 | |
driver.class | org.apache.derby.jdbc.EmbeddedDriver | 供应商的 JDBC 驱动程序的类 | |
driver.url | (从其他属性构造) | JDBC 连接 URL | |
db.username | “sa” | 数据库连接的用户 ID | |
db.password | - | 数据库连接密码 | |
connection.properties.file | - | JDBC 连接属性文件路径 | |
create.schema | true | 如果为 true,则创建 db 模式(如果不存在) | |
create.index | true | 创建索引以加快查找速度 | |
create.foreignkey | true | ||
transaction.isolation | “READ_COMMITTED” | 数据库会话的隔离级别 READ_UNCOMMITTED,READ_COMMITTED,SERIALIZABLE,REPEATABLE_READ | |
maximum.connections | 10 | 允许 db | 的最大连接数 |
maximum.capacity | 0(无限制) | Channels 中的最大事件数 | |
sysprop。* | DB 供应商特定属性 | ||
sysprop.user.home | 存储嵌入式 Derby 数据库的主路径 |
代理名为 a1 的示例:
a1.channels = c1
a1.channels.c1.type = jdbc
Kafka Channels
事件存储在 Kafka 集群 中(必须单独安装)。 Kafka 提供高可用性和复制,因此如果代理或 kafka 代理崩溃,事件可立即用于其他接收器
Kafka Channels 可用于多种场景:
使用 Flume 源和接收器 - 它为事件提供了可靠且高度可用的通道
使用 Flume 源和拦截器但没有接收器 - 它允许将 Flume 事件写入 Kafka 主题,供其他应用程序使用
使用 Flume 接收器,但没有源 - 它是一种低延迟,容错的方式将事件从 Kafka 发送到 Flume 接收器,如 HDFS,HBase 或 Solr
这目前支持 Kafka 服务器版本 0.10.1.0 或更高版本。测试完成了 2.0.1,这是发布时最高的可用版本。
配置参数组织如下:
与通道相关的配置值通常应用于通道配置级别,例如:a1.channel.k1.type =
与 Kafka 相关的配置值或 Channel 运行方式的前缀为“kafka。”,(这对 CommonClient Configs 是有效的)例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers 。这与 hdfs 接收器的运行方式没有什么不同
特定于 生产环境 者/消费者的属性以 kafka.producer 或 kafka.consumer 为前缀
在可能的情况下,使用 Kafka 参数名称,例如:bootstrap.servers 和 acks
此版本的 flume 与以前的版本向后兼容,但是下表中显示了已弃用的属性,并且在配置文件中存在时会在启动时记录警告消息。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
type | - | 组件类型名称,需要 org.apache.flume.channel.kafka.KafkaChannel | |
kafka.bootstrap.servers | - | 渠道使用的 Kafka 集群中的经纪商列表这可以是经纪人的部分列表,但我们建议至少两个用于 HA。格式为逗号分隔的主机名列表:port | |
kafka.topic | flume-channel | Kafka 话题, Channels 将使用 | |
kafka.consumer.group.id | flume | 渠道用于向 Kafka 注册的消费者群组 ID。多个通道必须使用相同的主题和组,以确保当一个代理程序发生故障时,另一个代理程序可以获取数据请注意,具有相同 ID 的非通道使用者可能会导致数据丢失。 | |
parseAsFlumeEvent | true | 期望在 Channels 中使用 FlumeEvent 模式的 Avro 基准。如果 Flume 源写入通道,则应该为 true;如果其他生成器正在写入通道正在使用的主题,则应为 false。通过使用 flume-ng-sdk 工件 | 提供的 org.apache.flume.source.avro.AvroFlumeEvent,可以在 Flume 之外解析到 Kafka 的 Flume 源消息 |
pollTimeout | 500 | 在消费者的“poll()”调用中等待的时间量(以毫秒为单位)。 [https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long](https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) ) | |
defaultPartitionId | - | 指定要发送到此通道中的所有事件的 Kafka 分区 ID(整数),除非被 partitionIdHeader 覆盖。默认情况下,如果未设置此属性,则事件将由 Kafka Producer 的分区程序分发 - 包括 key (如果已指定)(或由 kafka.partitioner.class 指定的分区程序)。 | |
partitionIdHeader | - | 设置后, 生产环境 者将从事件 Headers 中获取使用此属性值命名的字段的值,并将消息发送到主题的指定分区。如果该值表示无效分区,则该事件将不被接受。如果标头值存在,则此设置将覆盖 defaultPartitionId 。 | |
kafka.consumer.auto.offset.reset | latest | 当 Kafka 中没有初始偏移量或服务器上当前偏移量不再存在时(例如因为该数据已被删除)该怎么办:最早:自动将偏移量重置为最新的最新偏移量:自动将偏移量重置为最新的偏移量:如果没有为消费者的群体找到任何其他偏移量,则向消费者抛出异常:向消费者抛出异常。 | |
kafka.producer.security.protocol | PLAINTEXT | 如果使用某种程度的安全性写入 Kafka,则设置为 SASL_PLAINTEXT,SASL_SSL 或 SSL。有关安全设置的其他信息,请参见下文。 | |
kafka.consumer.security.protocol | PLAINTEXT | 与 kafka.producer.security.protocol 相同,但是从 Kafka 读取/消费。 | |
更多 生产环境 者/消费者安全道具 | 如果使用 SASL_PLAINTEXT,则 SASL_SSL 或 SSL 引用 Kafka security 以获取需要在 生产环境 者/消费者上设置的其他属性。 |
不推荐使用的属性
属性名称 | 默认 | 说明 |
---|---|---|
brokerList | - | 渠道使用的 Kafka 集群中的经纪商列表这可以是经纪人的部分列表,但我们建议至少两个用于 HA。格式为逗号分隔的主机名列表:port |
topic | flume-channel | 使用 kafka.topic |
groupId | flume | 使用 kafka.consumer.group.id |
readSmallestOffset | false | 使用 kafka.consumer.auto.offset.reset |
migrateZookeeperOffsets | true | 当找不到 Kafka 存储的偏移量时,在 Zookeeper 中查找偏移量并将它们提交给 Kafka。这应该是支持从旧版本的 Flume 无缝 Kafka 客户端迁移。迁移后,可以将其设置为 false,但通常不需要这样做。如果未找到 Zookeeper 偏移量,则 kafka.consumer.auto.offset.reset 配置定义如何处理偏移量。 |
注意
由于通道负载均衡 的方式,代理首次启动时可能会出现重复事件
代理名为 a1 的示例:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
Security and Kafka Channel:
Flume 和 Kafka 之间的通信渠道支持安全认证和数据加密。对于安全身份验证,可以使用 Kafka 0.9.0 版中的 SASL / GSSAPI(Kerberos V5)或 SSL(即使该参数名为 SSL,实际协议是 TLS 实现)。
截至目前,数据加密仅由 SSL / TLS 提供。
设置 kafka.producer|consumer.security.protocol
以下任何值均表示:
SASL_PLAINTEXT - 没有数据加密的 Kerberos 或纯文本身份验证
SASL_SSL - 使用数据加密的 Kerberos 或纯文本身份验证
SSL - 具有可选身份验证的基于 TLS 的加密。
警告
启用 SSL 时性能会下降,其大小取决于 CPU 类型和 JVM 实现。参考: Kafka security overview 以及用于跟踪此问题的 jira: KAFKA-2561
TLS and Kafka Channel:
请阅读 Configuring Kafka Clients SSL 中描述的步骤,以了解用于微调的其他配置设置,例如以下任何一项:安全提供程序,密码套件,启用的协议,信任库或密钥库类型。
配置服务器端身份验证和数据加密的示例。
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
在此处指定信任库是可选的,可以使用全局信任库。有关全局 SSL 设置的更多详细信息,请参阅 SSL/TLS support 部分。
注意:默认情况下属性 ssl.endpoint.identification.algorithm
未定义,因此不执行主机名验证。要启用主机名验证,请设置以下属性
a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
启用后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):
如果还需要客户端身份验证,则还需要将以下内容添加到 Flume 代理配置中,或者可以使用全局 SSL 设置(请参阅 SSL/TLS support 部分)。每个 Flume 代理都必须拥有必须的客户证书被 Kafka 经纪人单独或通过他们的签名链信任。常见示例是由单个根 CA 签署每个客户端证书,而后者又由 Kafka 经纪人信任。
# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
如果密钥库和密钥使用不同的密码保护,那么 ssl.key.password
property 将为消费者和 生产环境 者密钥库提供所需的额外秘密:
a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>
Kerberos and Kafka Channel:
要将 Kafka 通道与使用 Kerberos 保护的 Kafka 集群 一起使用,请设置 producer/consumer.security.protocol
上面提到的 生产环境 者和/或消费者的 property 。与 Kafka 代理一起使用的 Kerberos 密钥表和主体在 JAAS 文件的“KafkaClient”部分中指定。 “客户端”部分描述了 Zookeeper 连接(如果需要)。有关 JAAS 文件内容的信息,请参阅 Kafka doc 。可以通过 flume-env.sh 中的 JAVA_OPTS 指定此 JAAS 文件的位置以及可选的系统范围的 kerberos 配置:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
使用 SASL_PLAINTEXT 的示例安全配置:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
使用 SASL_SSL 的安全配置示例:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
示例 JAAS 文件。有关其内容的参考,请参阅 SASL configuration 的 Kafka 文档中所需认证机制(GSSAPI / PLAIN)的客户端配置部分。由于 Kafka Source 也可能连接到 Zookeeper 以进行偏移迁移,因此“Client”部分也添加到此示例中。除非您需要偏移迁移,否则不需要这样做,或者您需要此部分用于其他安全组件。另外,请确保 Flume 进程的操作系统用户对 jaas 和 keytab 文件具有读权限。
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
文件 Channels
必需属性位于 bold 中。
属性名称默认 | 描述 | ||
---|---|---|---|
type | - | 组件类型名称,需要为 file 。 | |
checkpointDir | ~ / .flume / file-channel / checkpoint | 将存储检查点文件的目录 | |
useDualCheckpoints | false | 备份检查点。如果设置为 true ,则设置 backupCheckpointDir must | |
backupCheckpointDir | - | 备份检查点的目录。此目录 must not 与数据目录或检查点目录 | 相同 |
dataDirs | ~ / .flume / file-channel / data | 用于存储日志文件的逗号分隔目录列表。在不同的磁盘上使用多个目录可以改善文件通道的性能 | |
transactionCapacity | 10000 | 通道支持的最大事务大小 | |
checkpointInterval | 30000 | 检查点之间的时间量(以毫秒为单位) | |
maxFileSize | 2146435071 | 单个日志文件的最大大小(以字节为单位) | |
minimumRequiredSpace | 524288000 | 最小所需可用空间(以字节为单位)。为避免数据损坏,当可用空间低于此值时,文件通道停止接受接收/放置请求 | |
capacity | 1000000 | 通道的最大容量 | |
keep-alive | 3 | 等待执行操作的时间(以秒为单位) | |
use-log-replay-v1 | false | 专家:使用旧的重播逻辑 | |
use-fast-replay | false | 专家:不使用队列重播 | |
checkpointOnClose | true | 控制是否在关闭通道时创建检查点。通过避免重放,在关闭时创建检查点可以提高文件通道的后续启动速度。 | |
encryption.activeKey | - | 用于加密新数据的密钥名称 | |
encryption.cipherProvider | - | 密码提供程序类型,支持的类型:AESCTRNOPADDING | |
encryption.keyProvider | - | 密钥提供程序类型,支持的类型:JCEKSFILE | |
encryption.keyProvider.keyStoreFile | - | 密钥库文件的路径 | |
encrpytion.keyProvider.keyStorePasswordFile | - | 密钥库密码文件的路径 | |
encryption.keyProvider.keys | - | 所有键的列表(例如 activeKey 设置的历史记录) | |
encyption.keyProvider.keys。*。passwordFile | - | 可选密钥密码文件的路径 |
注意
默认情况下,文件通道使用上面指定的用户主目录内的检查点和数据目录的路径。因此,如果代理中有多个活动的文件通道实例,则只有一个实例可以锁定目录并导致其他通道初始化失败。因此,有必要为所有已配置的通道提供显式路径,最好是在不同的磁盘上。此外,由于文件通道将在每次提交后同步到磁盘,因此将其与将事件一起批处理的接收器/源耦合可能是必要的,以便在多个磁盘不可用于检查点和数据时提供良好的性能目录。
代理名为 a1 的示例:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
Encryption
以下是一些示例配置:
使用密钥存储密码分隔生成密码密钥:
keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
-keysize 128 -validity 9000 -keystore test.keystore \
-storetype jceks -storepass keyStorePassword
使用与密钥库密码相同的密码生成密钥:
keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
-keystore src/test/resources/test.keystore -storetype jceks \
-storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = key-provider-0
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0
假设您已使用密钥 0 输出密钥,并且应使用密钥 1 加密新文件:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
与上面相同的场景,但 key-0 有自己的密码:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password
Spillable Memory Channel
事件存储在内存中队列和磁盘上。内存中队列充当主存储,磁盘充当溢出。磁盘存储使用嵌入的文件通道进行管理。当内存中队列已满时,其他传入事件将存储在文件通道中。该通道非常适用于在正常操作期间需要高吞吐量存储器通道的流量,但同时需要更大容量的文件通道,以便更好地容忍间歇性接收器侧中断或降低排出速率。在这种异常情况下,吞吐量将大致降低到文件通道速度。如果代理程序崩溃或重新启动,则只有代理程序联机时才会恢复存储在磁盘上的事件。 This channel is currently experimental and not recommended for use in production.
必需属性在 bold 中。有关其他必需属性,请参阅文件通道。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称,需要 SPILLABLEMEMORY |
memoryCapacity | 10000 | 内存队列中存储的最大事件数。要禁用内存中队列的使用,请将此值设置为零。 |
overflowCapacity | 100000000 | 溢出磁盘中存储的最大事件数(即文件通道)。要禁用溢出,请将此值设置为零。 |
overflowTimeout | 3 | 内存填满时启用磁盘溢出之前等待的秒数。 |
byteCapacityBufferPercentage | 20 | 定义 byteCapacity 与通道中所有事件的估计总大小之间的缓冲区百分比,以计算 Headers 中的数据。见下文。 |
byteCapacity | 参见说明 | 允许的最大内存数为 bytes ,作为内存队列中所有事件的总和。该实现仅计算事件 body ,这也是提供 byteCapacityBufferPercentage 配置参数的原因。默认为计算值,等于 JVM 可用的最大内存的 80%(即命令行传递的-Xmx 值的 80%)。请注意,如果在单个 JVM 上有多个内存通道,并且它们碰巧保持相同的物理事件(即,如果您使用来自单个源的复制通道选择器),那么这些事件大小可能会被重复计算以用于通道 byteCapacity 目的。将此值设置为 0 将导致此值回退到大约 200 GB 的内部硬限制。 |
avgEventSize | 500 | 事件的估计平均大小(以字节为单位)进入通道 |
<文件通道属性> | 参见文件通道 | 可以使用除“保持活动”和“容量”之外的任何文件通道属性。文件通道的保持活动由 Spillable Memory Channel 管理。使用'overflowCapacity'设置文件通道的容量。 |
如果达到 memoryCapacity 或 byteCapacity 限制,则内存中队列被视为已满。
代理名为 a1 的示例:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
禁用内存中队列的使用和函数,如文件通道:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
要禁用溢出磁盘并仅将其用作内存通道:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0
伪 Transaction 渠道
警告
伪事务通道仅用于单元测试目的,不适用于 生产环境 用途。
必需属性在 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称,需要 org.apache.flume.channel.PseudoTxnMemoryChannel |
capacity | 50 | 通道中存储的最大事件数 |
keep-alive | 3 | 添加或删除事件的超时秒数 |
自定义渠道
自定义通道是您自己的 Channel 接口实现。启动 Flume 代理程序时,必须在代理程序的类路径中包含自定义通道的类及其依赖项。自定义渠道的类型是其 FQCN。必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称,需要是 FQCN |
代理名为 a1 的示例:
a1.channels = c1
a1.channels.c1.type = org.example.MyChannel
Flume 通道选择器
如果未指定类型,则默认为“复制”。
复制通道选择器(默认)
需要属性在 bold 。
属性名称 | 默认 | 说明 | |
---|---|---|---|
selector.type | 复制 | 组件类型名称,需要 replicating | |
selector.optional | - | 要标记为 optional | 的通道组 |
名为 a1 的代理程序示例及其名为 r1 的源代码:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
在上面的配置中,c3 是可选的通道。无法写入 c3 只是被忽略了。由于 c1 和 c2 未标记为可选,因此无法写入这些通道将导致事务失败。
多路复用通道选择器
必需属性在 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
selector.type | 复制 | 组件类型名称,需要 multiplexing |
selector.header | flume.selector.header | |
selector.default | - | |
selector.mapping。* | - |
名为 a1 的代理程序示例及其名为 r1 的源代码:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
自定义 Channels 选择器
自定义通道选择器是您自己的 ChannelSelector 接口实现。启动 Flume 代理程序时,自定义通道选择器的类及其依赖项必须包含在代理程序的类路径中。自定义通道选择器的类型是其 FQCN。
属性名称 | 默认 | 说明 |
---|---|---|
selector.type | - | 组件类型名称,需要是您的 FQCN |
名为 a1 的代理示例及其源名为 r1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector
Flume Sink 处理器
接收组允许用户将多个接收器分组到一个实体中。接收器处理器可用于在组内的所有接收器上提供负载均衡 功能,或在时间故障的情况下实现从一个接收器到另一个接收器的故障转移。
必需属性在 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
sinks | - | 参与该组的以空格分隔的接收器列表 |
processor.type | default | 组件类型名称,需要为 default , failover 或 load_balance |
代理名为 a1 的示例:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
默认接收器处理器
默认接收器只接受一个接收器。用户不必为单个接收器创建处理器(接收器组)。相反,用户可以遵循本用户指南中上面解释的源 - 通道 - 接收器模式。
故障转移接收器
故障转移接收器维护一个优先级的接收器列表,保证只要有一个可用的事件将被处理(传递)。
故障转移机制的工作原理是将故障接收器降级到池中,在池中为它们分配一个冷却期,在重试之前随顺序故障而增加。接收器成功发送事件后,它将恢复到实时池。接收器具有与之相关的优先级,数量越大,优先级越高。如果在发送事件时接收器发生故障,则接下来将尝试下一个具有最高优先级的接收器以发送事件。例如,在优先级为 80 的接收器之前激活优先级为 100 的接收器。如果未指定优先级,则根据配置中指定接收器的顺序确定 thr 优先级。
要配置,请将接收器组处理器设置为 failover
并为所有单个接收器设置优先级。所有指定的优先级必须是唯一的此外,可以使用 maxpenalty
设置故障转移时间的上限(以毫秒为单位)
属性。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
sinks | - | 参与该组的以空格分隔的接收器列表 |
processor.type | default | 组件类型名称,需要 failover |
processor.priority.<sinkName> | - | 优先值。 <sinkName>必须是与当前接收器组关联的接收器实例之一。较高优先级值 Sink 较早被激活。绝对值越大表示优先级越高 |
processor.maxpenalty | 30000 | 失败的接收器的最大退避周期(毫秒) |
代理名为 a1 的示例:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
负载均衡接收器处理器
负载均衡 接收器处理器提供了在多个接收器上进行负载均衡流量的功能。它维护一个索引的活动接收器列表,必须在其上分配负载。实现支持使用 round_robin
分配负载
或 random
选择机制。选择机制的选择默认为 round_robin
类型,但可以通过配置覆盖。自定义选择机制通过继承自 AbstractSinkSelector
的自定义类支持
。
调用时,此选择器使用其配置的选择机制选择下一个接收器并调用它。对于 round_robin
和 random
如果所选接收器未能传送事件,则处理器通过其配置的选择机制选择下一个可用接收器。此实现不会将失败的接收器列入黑名单,而是继续乐观地尝试每个可用的接收器。如果所有接收器调用都导致失败,则选择器将故障传播到接收器运行器。
如果 backoff
启用后,接收器处理器会将失败的接收器列入黑名单,将其删除以供给定超时的选择。当超时结束时,如果接收器仍然没有响应,则超时会以指数方式增加,以避免在无响应的接收器上长时间等待时卡住。在禁用此功能的情况下,在循环中,所有失败的接收器负载将被传递到下一个接收器,因此不均衡
必需属性在 bold 中。
属性名称 | 默认 | 说明 | |
---|---|---|---|
processor.sinks | - | 以空格分隔的参与组的接收器列表 | |
processor.type | default | 组件类型名称,需要 load_balance | |
processor.backoff | false | 应该以指数方式退回失败的接收器。 | |
processor.selector | round_robin | 选择机制。必须是 round_robin , random 或继承自 AbstractSinkSelector | 的自定义类的 FQCN |
processor.selector.maxTimeOut | 30000 | 由退避选择器用于限制指数退避(以毫秒为单位) |
代理名为 a1 的示例:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
自定义接收器处理器
目前不支持自定义接收器处理器。
事件序列化程序
file_roll
下沉和 hdfs
下沉既支持 EventSerializer
接口。下面提供了随 Flume 一起提供的 EventSerializer 的详细信息。
Body Text Serializer
别名: text
。此拦截器将事件的主体写入输出流,而不进行任何转换或修改。事件 Headers 将被忽略。配置选项如下:
属性名称 | 默认 | 说明 |
---|---|---|
appendNewline | true | 是否在写入时将换行符附加到每个事件。由于遗留原因,默认值为 true 假定事件不包含换行符。 |
代理名为 a1 的示例:
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
“Flume Event”Avro Event Serializer
别名: avro_event
。
此拦截器将 Flume 事件序列化为 Avro 容器文件。使用的模式与 Avro RPC 机制中用于 Flume 事件的模式相同。
此序列化程序继承自 AbstractAvroEventSerializer
类。
配置选项如下:
属性名称 | 默认 | 说明 |
---|---|---|
syncIntervalBytes | 2048000 | Avro 同步间隔,近似字节。 |
compressionCodec | null | Avro 压缩编解码器。有关受支持的编解码器,请参阅 Avro 的 CodecFactory 文档。 |
代理名为 a1 的示例:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy
Avro Event Serializer
别名:此序列化程序没有别名,必须使用完全限定的类名类名指定。
这将 Flume 事件序列化为 Avro 容器文件,如“Flume Event”Avro Event Serializer,但记录模式是可配置的。记录模式可以指定为 Flume 配置属性,也可以在事件头中传递。
要将记录模式作为 Flume 配置的一部分传递,请使用属性 schemaURL
如下所列。
要在事件标头中传递记录模式,请指定事件标头 flume.avro.schema.literal
包含架构的 JSON 格式表示或 flume.avro.schema.url
使用可以找到架构的 URL( hdfs:/...
支持 URI)。
此序列化程序继承自 AbstractAvroEventSerializer
类。
配置选项如下:
属性名称 | 默认 | 说明 |
---|---|---|
syncIntervalBytes | 2048000 | Avro 同步间隔,近似字节。 |
compressionCodec | null | Avro 压缩编解码器。有关受支持的编解码器,请参阅 Avro 的 CodecFactory 文档。 |
schemaURL | null | Avro 架构 URL。 Headers 中指定的模式 ovverride 此选项。 |
代理名为 a1 的示例:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
水槽拦截器
Flume 具有在飞行中修改/删除事件的能力。这是在拦截器的帮助下完成的。拦截器是实现 org.apache.flume.interceptor.Interceptor
的类
接口。拦截器可以根据拦截器开发者选择的任何标准修改甚至删除事件。Flume 支持拦截器的链接。通过在配置中指定拦截器构建器类名列表,可以实现此目的。拦截器在源配置中指定为以空格分隔的列表。指定拦截器的顺序是它们被调用的顺序。一个拦截器返回的事件列表将传递给链中的下一个拦截器。拦截器可以修改或删除事件。如果拦截器需要删除事件,它就不会在它返回的列表中返回该事件。如果要删除所有事件,则只返回一个空列表。拦截器是命名组件,下面是它们如何通过配置创建的示例:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
请注意,拦截器构建器将传递给类型 config 参数。拦截器本身是可配置的,可以传递配置值,就像传递给任何其他可配置组件一样。在上面的示例中,事件首先传递给 HostInterceptor,然后 HostInterceptor 返回的事件传递给 TimestampInterceptor。您可以指定完全限定的类名(FQCN)或别名 timestamp
。如果您有多个收集器写入相同的 HDFS 路径,那么您也可以使用 HostInterceptor。
时间戳拦截器
此拦截器将事件标头插入到事件标头中,以毫秒为单位处理事件。此拦截器使用键 timestamp
插入标头
(或 header
指定的
property)其值是相关的时间戳。如果已在配置中存在,则此拦截器可以保留现有时间戳。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称必须为 timestamp 或 FQCN |
headerName | timestamp | 用于放置生成的时间戳的标头的名称。 |
preserveExisting | false | 如果时间戳已存在,是否应该保留 - 是或否 |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
主机拦截器
此拦截器插入运行此代理程序的主机的主机名或 IP 地址。它插入一个带有键的 Headers host
或者配置的密钥,其值是主机的主机名或 IP 地址,具体取决于配置。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称必须为 host |
preserveExisting | false | 如果主机标头已存在,是否应保留 - true 或 false |
useIP | true | 如果为 true,则使用 IP 地址,否则使用 hostname。 |
hostHeader | host | 要使用的标头密钥。 |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
静态拦截器
静态拦截器允许用户将具有静态值的静态头附加到所有事件。
当前实现不允许一次指定多个标头。相反,用户可能会链接多个静态拦截器,每个静态拦截器定义一个
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称必须为 static |
preserveExisting | true | 如果已配置的标头已存在,是否应保留 - true 或 false |
key | key | 应创建的 Headers 名称 |
value | value | 应创建的静态值 |
代理名为 a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
删除 Headers 拦截器
此拦截器通过删除一个或多个标头来操纵 Flume 事件标头。它可以根据列表中的正则表达式或 Headers 删除静态定义的 Headers , Headers 。如果未定义这些,或者没有标头符合条件,则不会修改 Flume 事件。
请注意,如果只需要删除一个标头,则按名称指定标头可提供相对于其他两种方法的性能优势。
属性名称 | 默认 | 说明 | |
---|---|---|---|
type | - | 组件类型名称必须为 remove_header | |
withName | - | 要删除的 Headers 的名称 | |
fromList | - | 要删除的 Headers 列表,用 fromListSeparator | 指定的分隔符分隔 |
fromListSeparator | \ s *,\ s * | 正则表达式,用于分隔 fromList 指定的列表中的多个 Headers 名称。默认是一个逗号,由任意数量的空白字符包围 | |
匹配 | - | 删除名称与此正则表达式匹配的所有 Headers |
UUID 拦截器
此拦截器在所有截获的事件上设置通用唯一标识符。一个示例 UUID 是 b5755073-77a9-43c1-8fad-b7a586fc1b97
,表示 128 位值。
如果事件的应用程序级别唯一键不可用,请考虑使用 UUIDInterceptor 自动为事件分配 UUID。一旦 UUID 进入 Flume 网络就将其分配给事件非常重要;也就是说,在流量的第一个 Flume Source 中。这使得在 Flume 网络中面对复制和重新发送时事件的后续重复数据删除可以实现高可用性和高性能。如果应用程序级别密钥可用,则优于自动生成的 UUID,因为它使用所述公知的应用程序级别密钥在数据存储中启用后续更新和事件删除。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称必须为 org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
headerName | id | 要修改的 Flume Headers 的名称 |
preserveExisting | true | 如果 UUID 标头已存在,是否应保留 - true 或 false |
前缀 | “” | 前缀为每个生成的 UUID 的前缀字符串常量 |
Morphline Interceptor
此拦截器通过 morphline configuration file 过滤事件,该定义了一系列转换命令,用于将记录从一个命令传送到另一个命令。例如,morphline 可以忽略某些事件或通过基于正则表达式的模式匹配来更改或插入某些事件头,或者它可以通过 Apache Tika 在截获的事件上自动检测和设置 MIME 类型。例如,这种数据包嗅探可用于 Flume 拓扑中基于内容的动态路由。 MorphlineInterceptor 还可以帮助实现到多个 Apache Solr 集合的动态路由(例如,用于多租户)。
目前,存在一个限制,即拦截器的形态线不能为每个输入事件生成多个输出记录。此拦截器不适用于重型 ETL 处理 - 如果您需要,请考虑将 ETL 处理从 Flume Source 移动到 Flume Sink,例如,到 MorphlineSolrSink。
必需属性位于 bold 中。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称必须为 org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder |
morphlineFile | - | 本地文件系统与 morphline 配置文件的相对或绝对路径。示例: /etc/flume-ng/conf/morphline.conf |
morphlineId | null | 如果 morphline 配置文件中有多个形态线,则用于标识形态线的可选名称 |
示例 flume.conf 文件:
a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
搜索和替换拦截器
此拦截器基于 Java 正则表达式提供简单的基于字符串的搜索和替换功能。还可以进行回溯/群组捕捉。此拦截器使用与 Java Matcher.replaceAll()方法中相同的规则。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称必须为 search_replace |
searchPattern | - | 要搜索和替换的模式。 |
replaceString | - | 替换字符串。 |
charset | UTF-8 | 事件正文的字符集。默认假设为 UTF-8。 |
配置示例:
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =
另一个例子:
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
正则表达式过滤拦截器
此拦截器通过将事件主体解释为文本并将文本与配置的正则表达式进行匹配来有选择地过滤事件。提供的正则表达式可用于包括事件或排除事件。
属性名称 | 默认 | 说明 |
---|---|---|
type | - | 组件类型名称必须为 regex_filter |
regex | “。*” | 用于匹配事件的正则表达式 |
excludeEvents | false | 如果为 true,则 regex 确定要排除的事件,否则 regex 确定要包括的事件。 |
正则表达式提取器拦截器
此拦截器使用指定的正则表达式提取正则表达式匹配组,并将匹配组作为 Headers 附加到事件上。它还支持可插入序列化程序,用于在将匹配组添加为事件标头之前对其进行格式化。
属性名称 | 默认 | 说明 | |
---|---|---|---|
type | - | 组件类型名称必须为 regex_extractor | |
regex | - | 用于匹配事件的正则表达式 | |
serializers | - | 以空格分隔的序列化程序列表,用于映射与 Headers 名称匹配并序列化其值。 (参见下面的示例)Flume 为以下序列化程序提供内置支持: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer | |
serializers。<s1> .type | default | 必须是 default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer 或实现 org.apache.flume.interceptor.RegexExtractorInterceptorSerializer | 的自定义类的 FQCN |
序列化器。<s1>。 name | - | ||
序列化程序。* | - | 特定于序列化程序的属性 |
序列化器用于将匹配映射到 Headers 名称和格式化 Headers 值;默认情况下,您只需指定 Headers 名称和默认 org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
将会被使用。此序列化程序只是将匹配映射到指定的 Headers 名称,并在正则表达式提取时传递值。您可以使用完全限定的类名(FQCN)将自定义序列化程序实现插入到提取器中,以便以您喜欢的方式格式化匹配。
示例 1:
如果 Flume 事件正文包含 1:2:3.4foobar5
并使用以下配置
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
提取的事件将包含相同的正文,但会添加以下 Headers one=>1, two=>2, three=>3
示例 2:
如果 Flume 事件正文包含 2012-10-18 18:47:57,614 some log line
并使用以下配置
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
提取的事件将包含相同的正文,但会添加以下 Headers timestamp=>1350611220000
水槽属性
属性名称 | 默认 | 说明 |
---|---|---|
flume.called.from.service | - | 如果指定了此属性,则 Flume 代理将继续轮询配置文件,即使在预期位置找不到配置文件也是如此。否则,如果配置在预期位置不存在,Flume 代理将终止。设置此属性时不需要属性值(例如,只需指定-Dflume.called.from.service 即可) |
property :flume.called.from.service
Flume 每隔 30 秒定期轮询一次指定配置文件的更改。如果首次轮询现有文件,或者自上次轮询以来现有文件的修改日期发生更改,Flume 代理将从配置文件加载新配置。重命名或移动文件不会更改其修改时间。当 Flume 代理轮询一个不存在的文件时,会发生以下两种情况之一:1。当代理首次轮询不存在的配置文件时,代理将根据 flume.called.from.service 属性执行操作。如果设置了属性,则代理将继续轮询(始终在同一时间 - 每 30 秒)。如果未设置该属性,则代理会立即终止。 ...或... 2.当代理轮询一个不存在的配置文件并且这不是第一次轮询文件时,代理不会对此轮询周期进行配置更改。代理继续轮询而不是终止。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论