- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
亚马逊网络服务(AWS)
Amazon Web Services 提供可以运行 Flink 的云计算服务。
EMR:弹性 MapReduce
Amazon Elastic MapReduce (Amazon EMR)是一种 Web 服务,可以轻松快速地设置 Hadoop 集群。这是在 AWS 上运行 Flink 的 推荐方法 ,因为它负责设置所有内容。
标准 EMR 安装
Flink 是 Amazon EMR 上受支持的应用程序。 亚马逊的文档 描述了配置 Flink,创建和监控集群以及处理作业。
自定义 EMR 安装
Amazon EMR 服务会定期更新到新版本,但可以在库存 EMR 群集中手动安装不可用的 Flink 版本。
创建 EMR 集群
EMR 文档包含 显示如何启动 EMR 群集的示例 。您可以按照该指南安装任何 EMR 版本。您不需要安装 EMR 版本的 All Applications 部分,但可以坚持使用 Core Hadoop 。
注意 访问 S3 存储区需要 在创建 EMR 集群时 配置 IAM 角色 。
在 EMR 群集上安装 Flink
创建群集后,您可以 连接到主节点 并安装 Flink:
- 转到 下载页面 并 下载 与您的 EMR 集群 的 Hadoop 版本匹配的 Flink 二进制版本 ,例如 Hadoop 2.7 for EMR 版本 4.3.0,4.4.0 或 4.5.0。
- 解压缩 Flink 发行版,您可以在 设置 Hadoop 配置目录 后 通过 YARN 部署 Flink 作业 :
HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar
S3:简单存储服务
Amazon Simple Storage Service (Amazon S3)为各种用例提供云对象存储。您可以将 S3 与 Flink 一起用于 读取 和 写入数据 以及 流 状态后台 甚至作为 YARN 对象存储。
您可以通过以下格式指定路径来使用常规文件等 S3 对象:
s3://<your-bucket>/<endpoint>
端点可以是单个文件或目录,例如:
// Read from S3 bucket
env.readTextFile("s3://<bucket>/<endpoint>");
// Write to S3 bucket
stream.writeAsText("s3://<bucket>/<endpoint>");
// Use S3 as FsStatebackend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));
请注意,这些示例 并非 详尽无遗,您也可以在其他地方使用 S3,包括 高可用性设置 或 RocksDBStateBackend ; Flink 期望文件系统 URI 到处都是。
对于大多数用例,您可以使用我们的 shaded flink-s3-fs-hadoop
和 flink-s3-fs-presto
S3 文件系统打包器之一,这些打包器非常容易设置。但是,对于某些情况,例如,使用 S3 作为 YARN 的资源存储目录,可能需要设置特定的 Hadoop S3 FileSystem 实现。两种方式如下所述。
Shaded Hadoop / Presto S3 文件系统(推荐)
注意: 如果 在 EMR 上运行 Flink, 则无需手动配置。
要使用 flink-s3-fs-hadoop
或 flink-s3-fs-presto
,请在启动 Flink 之前将相应的 JAR 文件从 opt
目录复制 到 lib
Flink 分发的目录,例如
cp ./opt/flink-s3-fs-presto-1.7-SNAPSHOT.jar ./lib/
双方 flink-s3-fs-hadoop
并 flink-s3-fs-presto
注册与 URI 的默认文件系统的打包 s3://
方案, flink-s3-fs-hadoop
也注册了 s3a://
。
配置访问凭据
设置 S3 FileSystem 打包器后,您需要确保允许 Flink 访问您的 S3 存储桶。
身份和访问管理(IAM)(推荐)
在 AWS 上设置凭据的推荐方法是通过 身份和访问管理(IAM) 。您可以使用 IAM 函数为 Flink 实例安全地提供他们访问 S3 存储桶所需的凭据。有关如何执行此 算子操作的详细信息超出了本文档的范围。请参阅 AWS 用户指南。您正在寻找的是 IAM 角色 。
如果您正确设置此选项,则可以在 AWS 中管理对 S3 的访问,并且不需要将任何访问 Keys 分发给 Flink。
访问 Keys(气馁)
可以通过您的 访问和 Keys 对 授予对 S3 的 访问权限 。请注意,自从 引入 IAM 角色 以来,不鼓励这样做。
您需要同时配置 s3.access-key
和 s3.secret-key
在 Flink 的 flink-conf.yaml
:
s3.access-key: your-access-key
s3.secret-key: your-secret-key
Hadoop 提供的 S3 文件系统 - 手动设置
注意: 如果 在 EMR 上运行 Flink, 则无需手动配置。
这个设置有点复杂,我们建议使用我们的阴影 Hadoop / Presto 文件系统(见上文),除非另有要求,例如通过 fs.defaultFS
Hadoop 中的配置属性将 S3 用作 YARN 的资源存储目录 core-site.xml
。
设置 S3 FileSystem
与 S3 的交互通过 Hadoop 的 S3 FileSystem 客户端之一进行 :
S3AFileSystem
( 推荐 用于 Hadoop 2.7 及更高版本):用于在内部使用 Amazon SDK 读取和写入常规文件的文件系统。没有最大文件大小并且与 IAM 角色一起使用。NativeS3FileSystem
(对于 Hadoop 2.6 及更早版本):用于读写常规文件的文件系统。最大对象大小为 5GB,不适用于 IAM 角色。
S3AFileSystem
(推荐的)
这是推荐使用的 S3 FileSystem 实现。它在内部使用 Amazon 的 SDK 并与 IAM 角色一起使用(请参阅 配置访问凭据 )。
您需要将 Flink 指向有效的 Hadoop 配置,该配置包含以下属性 core-site.xml
:
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3\. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
这将注册 S3AFileSystem
为具有该 s3a://
方案的 URI 的默认文件系统。
NativeS3FileSystem
此文件系统仅限于最大 5GB 的文件,并且不适用于 IAM 角色(请参阅 配置访问凭据 ),这意味着您必须在 Hadoop 配置文件中手动配置 AWS 凭据。
您需要将 Flink 指向有效的 Hadoop 配置,该配置包含以下属性 core-site.xml
:
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
这将注册 NativeS3FileSystem
为具有该 s3://
方案的 URI 的默认文件系统。
Hadoop 配置
例如,您可以通过各种方式指定 Hadoop 配置 ,将 Flink 指向 Hadoop 配置目录的路径
- 通过设置环境变量
HADOOP_CONF_DIR
,或 - 通过
fs.hdfs.hadoopconf
在flink-conf.yaml
以下位置设置配置选项:
fs.hdfs.hadoopconf: /path/to/etc/hadoop
这将 /path/to/etc/hadoop
在 Flink 中注册为 Hadoop 的配置目录。Flink 将查找指定目录中的 core-site.xml
和 hdfs-site.xml
文件。
配置访问凭据
注意: 如果 在 EMR 上运行 Flink, 则无需手动配置。
设置 S3 FileSystem 后,您需要确保允许 Flink 访问您的 S3 存储桶。
身份和访问管理(IAM)(推荐)
使用时 S3AFileSystem
,在 AWS 上设置凭据的推荐方法是通过 身份和访问管理(IAM) 。您可以使用 IAM 函数为 Flink 实例安全地提供他们访问 S3 存储桶所需的凭据。有关如何执行此 算子操作的详细信息超出了本文档的范围。请参阅 AWS 用户指南。您正在寻找的是 IAM 角色 。
如果您正确设置此选项,则可以在 AWS 中管理对 S3 的访问,并且不需要将任何访问 Keys 分发给 Flink。
请注意,这只适用于 S3AFileSystem
而不是 NativeS3FileSystem
。
访问 Keys S3AFileSystem
(不鼓励)
可以通过您的 访问和 Keys 对 授予对 S3 的 访问权限 。请注意,自从 引入 IAM 角色 以来,不鼓励这样做。
对于 S3AFileSystem
您需要配置 fs.s3a.access.key
并 fs.s3a.secret.key
在 Hadoop 的 core-site.xml
:
<property>
<name>fs.s3a.access.key</name>
<value></value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value></value>
</property>
访问 Keys NativeS3FileSystem
(不鼓励)
可以通过您的 访问和 Keys 对 授予对 S3 的 访问权限 。但是这是不鼓励的,你应该使用 S3AFileSystem
所需的 IAM 角色 。
对于 NativeS3FileSystem
您需要配置 fs.s3.awsAccessKeyId
并 fs.s3.awsSecretAccessKey
在 Hadoop 的 core-site.xml
:
<property>
<name>fs.s3.awsAccessKeyId</name>
<value></value>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<value></value>
</property>
提供 S3 FileSystem 依赖关系
注意: 如果 在 EMR 上运行 Flink, 则无需手动配置。
Hadoop 的 S3 FileSystem 客户端打包在 hadoop-aws
工件中(Hadoop 2.6 及更高版本)。需要将此 JAR 及其所有依赖项添加到 Flink 的类路径中,即 Job 和 TaskManagers 的类路径。根据您使用的 FileSystem 实现以及您使用的 Flink 和 Hadoop 版本,您需要提供不同的依赖项(请参阅下文)。
有多种方法可以将 JAR 添加到 Flink 的类路径中,最简单的方法就是将 JAR 放在 Flink 的 lib
文件夹中。您需要复制 hadoop-aws
具有所有依赖项的 JAR。您还可以 HADOOP_CLASSPATH
在所有计算机上将包含这些 JAR 的目录导出为环境变量的一部分。
Flink for Hadoop 2.7
根据您使用的文件系统,请添加以下依赖项。您可以在以下位置找到这些作为 Hadoop 二进制文件的一部分 hadoop-2.7/share/hadoop/tools/lib
:
S3AFileSystem
:hadoop-aws-2.7.3.jar
aws-java-sdk-s3-1.11.183.jar
及其依赖:aws-java-sdk-core-1.11.183.jar
aws-java-sdk-kms-1.11.183.jar
jackson-annotations-2.6.7.jar
jackson-core-2.6.7.jar
jackson-databind-2.6.7.jar
joda-time-2.8.1.jar
httpcore-4.4.4.jar
httpclient-4.5.3.jar
NativeS3FileSystem
:hadoop-aws-2.7.3.jar
guava-11.0.2.jar
请注意, hadoop-common
它可作为 Flink 的一部分提供,但番石榴被 Flink 遮蔽。
Flink for Hadoop 2.6
根据您使用的文件系统,请添加以下依赖项。您可以在以下位置找到这些作为 Hadoop 二进制文件的一部分 hadoop-2.6/share/hadoop/tools/lib
:
S3AFileSystem
:hadoop-aws-2.6.4.jar
aws-java-sdk-1.7.4.jar
及其依赖:jackson-annotations-2.1.1.jar
jackson-core-2.1.1.jar
jackson-databind-2.1.1.jar
joda-time-2.2.jar
httpcore-4.2.5.jar
httpclient-4.2.5.jar
NativeS3FileSystem
:hadoop-aws-2.6.4.jar
guava-11.0.2.jar
请注意, hadoop-common
它可作为 Flink 的一部分提供,但番石榴被 Flink 遮蔽。
Flink for Hadoop 2.4 及更早版本
这些 Hadoop 版本只支持 NativeS3FileSystem
。这是预装了 Flink for Hadoop 2 的一部分 hadoop-common
。您不需要向类路径添加任何内容。
常见问题
以下部分列出了在 AWS 上使用 Flink 时的常见问题。
缺少 S3 文件系统配置
如果您的作业提交失败,并显示异常消息,指出 No file system found with scheme s3
这意味着没有为 S3 配置 FileSystem。有关如何正确配置的详细信息,请查看我们的 着色 Hadoop / Presto 或 通用 Hadoop 文件系统的配置部分。
org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...]
Caused by: java.io.IOException: No file system found with scheme s3,
referenced in file URI 's3://<bucket>/<endpoint>'.
at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
未指定 AWS 访问 KeysID 和密钥访问 Keys
如果您发现您的作业失败并显示异常 AWS Access Key ID and Secret Access Key must be specified as the username or password
,并且未正确设置您的访问凭据。有关如何配置此信息的详细信息,请参阅我们的 着色 Hadoop / Presto 或 通用 Hadoop 文件系统的访问凭据部分。
org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
HDFS NameNode at <bucket>, but the File System could not be initialized with that address:
AWS Access Key ID and Secret Access Key must be specified as the username or password
(respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
or fs.s3n.awsSecretAccessKey properties (respectively) [...]
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must
be specified as the username or password (respectively) of a s3 URL, or by setting
the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...]
at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
ClassNotFoundException:找不到 NativeS3FileSystem / S3AFileSystem
如果您看到此异常,则 S3 FileSystem 不是 Flink 的类路径的一部分。有关如何正确配置的详细信息,请参阅 S3 FileSystem 相关性部分 。
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
... 25 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
... 32 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
... 33 more
IOException 异常: 400: Bad Request
如果您已正确配置所有内容但获得 Bad Request
异常 且 S3 存储桶位于区域中 eu-central-1
,则可能正在运行 S3 客户端,该客户端不支持 Amazon 的签名版本 4 。
[...]
Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
Caused by: org.jets3t.service.impl.rest.HttpException [...]
要么
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...]
这不应该适用于我们的阴影 Hadoop / Presto S3 文件系统,但可以适用于 Hadoop 提供的 S3 文件系统。特别是,所有运行高达 2.7.2 的 Hadoop 版本 NativeS3FileSystem
(取决于 JetS3t 0.9.0
版本 > = 0.9.4 )都会受到影响,但用户也会报告这种情况 S3AFileSystem
。
除了更改存储区域之外,您还可以通过 请求用于请求身份验证的签名版本 4 来解决此问题 ,例如将其添加到 Flink 的 JVM 选项中 flink-conf.yaml
(请参阅 配置 ):
env.java.opts: -Dcom.amazonaws.services.s3.enableV4
org.apache.hadoop.fs.LocalDirAllocator 中的 NullPointerException
此异常通常是由跳过本地缓冲区目录配置引起 fs.s3a.buffer.dir
的 S3AFileSystem
。请参阅 S3AFileSystem 配置 部分以了解如何 S3AFileSystem
正确配置。
[...]
Caused by: java.lang.NullPointerException at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at
o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
o.a.h.fs.FileSystem.create(FileSystem.java:907) at
o.a.h.fs.FileSystem.create(FileSystem.java:888) at
o.a.h.fs.FileSystem.create(FileSystem.java:785) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
... 25 more
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论