返回介绍

亚马逊网络服务(AWS)

发布于 2025-05-02 18:19:18 字数 19504 浏览 0 评论 0 收藏

Amazon Web Services 提供可以运行 Flink 的云计算服务。

EMR:弹性 MapReduce

Amazon Elastic MapReduce (Amazon EMR)是一种 Web 服务,可以轻松快速地设置 Hadoop 集群。这是在 AWS 上运行 Flink 的 推荐方法 ,因为它负责设置所有内容。

标准 EMR 安装

Flink 是 Amazon EMR 上受支持的应用程序。 亚马逊的文档 描述了配置 Flink,创建和监控集群以及处理作业。

自定义 EMR 安装

Amazon EMR 服务会定期更新到新版本,但可以在库存 EMR 群集中手动安装不可用的 Flink 版本。

创建 EMR 集群

EMR 文档包含 显示如何启动 EMR 群集的示例 。您可以按照该指南安装任何 EMR 版本。您不需要安装 EMR 版本的 All Applications 部分,但可以坚持使用 Core Hadoop 。

注意 访问 S3 存储区需要 在创建 EMR 集群时 配置 IAM 角色

在 EMR 群集上安装 Flink

创建群集后,您可以 连接到主节点 并安装 Flink:

  1. 转到 下载页面下载 与您的 EMR 集群 的 Hadoop 版本匹配的 Flink 二进制版本 ,例如 Hadoop 2.7 for EMR 版本 4.3.0,4.4.0 或 4.5.0。
  2. 解压缩 Flink 发行版,您可以在 设置 Hadoop 配置目录 后 通过 YARN 部署 Flink 作业 :
HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar

S3:简单存储服务

Amazon Simple Storage Service (Amazon S3)为各种用例提供云对象存储。您可以将 S3 与 Flink 一起用于 读取写入数据 以及 状态后台 甚至作为 YARN 对象存储。

您可以通过以下格式指定路径来使用常规文件等 S3 对象:

s3://<your-bucket>/<endpoint>

端点可以是单个文件或目录,例如:

// Read from S3 bucket
env.readTextFile("s3://<bucket>/<endpoint>");

// Write to S3 bucket
stream.writeAsText("s3://<bucket>/<endpoint>");

// Use S3 as FsStatebackend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));

请注意,这些示例 并非 详尽无遗,您也可以在其他地方使用 S3,包括 高可用性设置RocksDBStateBackend ; Flink 期望文件系统 URI 到处都是。

对于大多数用例,您可以使用我们的 shaded flink-s3-fs-hadoopflink-s3-fs-presto S3 文件系统打包器之一,这些打包器非常容易设置。但是,对于某些情况,例如,使用 S3 作为 YARN 的资源存储目录,可能需要设置特定的 Hadoop S3 FileSystem 实现。两种方式如下所述。

Shaded Hadoop / Presto S3 文件系统(推荐)

注意: 如果 在 EMR 上运行 Flink, 则无需手动配置。

要使用 flink-s3-fs-hadoopflink-s3-fs-presto ,请在启动 Flink 之前将相应的 JAR 文件从 opt 目录复制 到 lib Flink 分发的目录,例如

cp ./opt/flink-s3-fs-presto-1.7-SNAPSHOT.jar ./lib/

双方 flink-s3-fs-hadoopflink-s3-fs-presto 注册与 URI 的默认文件系统的打包 s3:// 方案, flink-s3-fs-hadoop 也注册了 s3a://

配置访问凭据

设置 S3 FileSystem 打包器后,您需要确保允许 Flink 访问您的 S3 存储桶。

身份和访问管理(IAM)(推荐)

在 AWS 上设置凭据的推荐方法是通过 身份和访问管理(IAM) 。您可以使用 IAM 函数为 Flink 实例安全地提供他们访问 S3 存储桶所需的凭据。有关如何执行此 算子操作的详细信息超出了本文档的范围。请参阅 AWS 用户指南。您正在寻找的是 IAM 角色

如果您正确设置此选项,则可以在 AWS 中管理对 S3 的访问,并且不需要将任何访问 Keys 分发给 Flink。

访问 Keys(气馁)

可以通过您的 访问和 Keys 对 授予对 S3 的 访问权限 。请注意,自从 引入 IAM 角色 以来,不鼓励这样做。

您需要同时配置 s3.access-keys3.secret-key 在 Flink 的 flink-conf.yaml

s3.access-key: your-access-key
s3.secret-key: your-secret-key

Hadoop 提供的 S3 文件系统 - 手动设置

注意: 如果 在 EMR 上运行 Flink, 则无需手动配置。

这个设置有点复杂,我们建议使用我们的阴影 Hadoop / Presto 文件系统(见上文),除非另有要求,例如通过 fs.defaultFS Hadoop 中的配置属性将 S3 用作 YARN 的资源存储目录 core-site.xml

设置 S3 FileSystem

与 S3 的交互通过 Hadoop 的 S3 FileSystem 客户端之一进行

  1. S3AFileSystem推荐 用于 Hadoop 2.7 及更高版本):用于在内部使用 Amazon SDK 读取和写入常规文件的文件系统。没有最大文件大小并且与 IAM 角色一起使用。
  2. NativeS3FileSystem (对于 Hadoop 2.6 及更早版本):用于读写常规文件的文件系统。最大对象大小为 5GB,不适用于 IAM 角色。
S3AFileSystem (推荐的)

这是推荐使用的 S3 FileSystem 实现。它在内部使用 Amazon 的 SDK 并与 IAM 角色一起使用(请参阅 配置访问凭据 )。

您需要将 Flink 指向有效的 Hadoop 配置,该配置包含以下属性 core-site.xml

<configuration>

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- Comma separated list of local directories used to buffer
   large results prior to transmitting them to S3\. -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>

</configuration>

这将注册 S3AFileSystem 为具有该 s3a:// 方案的 URI 的默认文件系统。

NativeS3FileSystem

此文件系统仅限于最大 5GB 的文件,并且不适用于 IAM 角色(请参阅 配置访问凭据 ),这意味着您必须在 Hadoop 配置文件中手动配置 AWS 凭据。

您需要将 Flink 指向有效的 Hadoop 配置,该配置包含以下属性 core-site.xml

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

这将注册 NativeS3FileSystem 为具有该 s3:// 方案的 URI 的默认文件系统。

Hadoop 配置

例如,您可以通过各种方式指定 Hadoop 配置 ,将 Flink 指向 Hadoop 配置目录的路径

  • 通过设置环境变量 HADOOP_CONF_DIR ,或
  • 通过 fs.hdfs.hadoopconfflink-conf.yaml 以下位置设置配置选项:
fs.hdfs.hadoopconf: /path/to/etc/hadoop

这将 /path/to/etc/hadoop 在 Flink 中注册为 Hadoop 的配置目录。Flink 将查找指定目录中的 core-site.xmlhdfs-site.xml 文件。

配置访问凭据

注意: 如果 在 EMR 上运行 Flink, 则无需手动配置。

设置 S3 FileSystem 后,您需要确保允许 Flink 访问您的 S3 存储桶。

身份和访问管理(IAM)(推荐)

使用时 S3AFileSystem ,在 AWS 上设置凭据的推荐方法是通过 身份和访问管理(IAM) 。您可以使用 IAM 函数为 Flink 实例安全地提供他们访问 S3 存储桶所需的凭据。有关如何执行此 算子操作的详细信息超出了本文档的范围。请参阅 AWS 用户指南。您正在寻找的是 IAM 角色

如果您正确设置此选项,则可以在 AWS 中管理对 S3 的访问,并且不需要将任何访问 Keys 分发给 Flink。

请注意,这只适用于 S3AFileSystem 而不是 NativeS3FileSystem

访问 Keys S3AFileSystem (不鼓励)

可以通过您的 访问和 Keys 对 授予对 S3 的 访问权限 。请注意,自从 引入 IAM 角色 以来,不鼓励这样做。

对于 S3AFileSystem 您需要配置 fs.s3a.access.keyfs.s3a.secret.key 在 Hadoop 的 core-site.xml

<property>
  <name>fs.s3a.access.key</name>
  <value></value>
</property>

<property>
  <name>fs.s3a.secret.key</name>
  <value></value>
</property>
访问 Keys NativeS3FileSystem (不鼓励)

可以通过您的 访问和 Keys 对 授予对 S3 的 访问权限 。但是这是不鼓励的,你应该使用 S3AFileSystem 所需的 IAM 角色

对于 NativeS3FileSystem 您需要配置 fs.s3.awsAccessKeyIdfs.s3.awsSecretAccessKey 在 Hadoop 的 core-site.xml

<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value></value>
</property>

<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value></value>
</property>

提供 S3 FileSystem 依赖关系

注意: 如果 在 EMR 上运行 Flink, 则无需手动配置。

Hadoop 的 S3 FileSystem 客户端打包在 hadoop-aws 工件中(Hadoop 2.6 及更高版本)。需要将此 JAR 及其所有依赖项添加到 Flink 的类路径中,即 Job 和 TaskManagers 的类路径。根据您使用的 FileSystem 实现以及您使用的 Flink 和 Hadoop 版本,您需要提供不同的依赖项(请参阅下文)。

有多种方法可以将 JAR 添加到 Flink 的类路径中,最简单的方法就是将 JAR 放在 Flink 的 lib 文件夹中。您需要复制 hadoop-aws 具有所有依赖项的 JAR。您还可以 HADOOP_CLASSPATH 在所有计算机上将包含这些 JAR 的目录导出为环境变量的一部分。

Flink for Hadoop 2.7

根据您使用的文件系统,请添加以下依赖项。您可以在以下位置找到这些作为 Hadoop 二进制文件的一部分 hadoop-2.7/share/hadoop/tools/lib

  • S3AFileSystem
    • hadoop-aws-2.7.3.jar
    • aws-java-sdk-s3-1.11.183.jar 及其依赖:
      • aws-java-sdk-core-1.11.183.jar
      • aws-java-sdk-kms-1.11.183.jar
      • jackson-annotations-2.6.7.jar
      • jackson-core-2.6.7.jar
      • jackson-databind-2.6.7.jar
      • joda-time-2.8.1.jar
      • httpcore-4.4.4.jar
      • httpclient-4.5.3.jar
  • NativeS3FileSystem
    • hadoop-aws-2.7.3.jar
    • guava-11.0.2.jar

请注意, hadoop-common 它可作为 Flink 的一部分提供,但番石榴被 Flink 遮蔽。

Flink for Hadoop 2.6

根据您使用的文件系统,请添加以下依赖项。您可以在以下位置找到这些作为 Hadoop 二进制文件的一部分 hadoop-2.6/share/hadoop/tools/lib

  • S3AFileSystem
    • hadoop-aws-2.6.4.jar
    • aws-java-sdk-1.7.4.jar 及其依赖:
      • jackson-annotations-2.1.1.jar
      • jackson-core-2.1.1.jar
      • jackson-databind-2.1.1.jar
      • joda-time-2.2.jar
      • httpcore-4.2.5.jar
      • httpclient-4.2.5.jar
  • NativeS3FileSystem
    • hadoop-aws-2.6.4.jar
    • guava-11.0.2.jar

请注意, hadoop-common 它可作为 Flink 的一部分提供,但番石榴被 Flink 遮蔽。

Flink for Hadoop 2.4 及更早版本

这些 Hadoop 版本只支持 NativeS3FileSystem 。这是预装了 Flink for Hadoop 2 的一部分 hadoop-common 。您不需要向类路径添加任何内容。

常见问题

以下部分列出了在 AWS 上使用 Flink 时的常见问题。

缺少 S3 文件系统配置

如果您的作业提交失败,并显示异常消息,指出 No file system found with scheme s3 这意味着没有为 S3 配置 FileSystem。有关如何正确配置的详细信息,请查看我们的 着色 Hadoop / Presto通用 Hadoop 文件系统的配置部分。

org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
  No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...]
Caused by: java.io.IOException: No file system found with scheme s3,
  referenced in file URI 's3://<bucket>/<endpoint>'.
  at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
  at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
  at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)

未指定 AWS 访问 KeysID 和密钥访问 Keys

如果您发现您的作业失败并显示异常 AWS Access Key ID and Secret Access Key must be specified as the username or password ,并且未正确设置您的访问凭据。有关如何配置此信息的详细信息,请参阅我们的 着色 Hadoop / Presto通用 Hadoop 文件系统的访问凭据部分。

org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
  HDFS NameNode at <bucket>, but the File System could not be initialized with that address:
  AWS Access Key ID and Secret Access Key must be specified as the username or password
  (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
  or fs.s3n.awsSecretAccessKey properties (respectively) [...]
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must
  be specified as the username or password (respectively) of a s3 URL, or by setting
  the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...]
  at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
  at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
  at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
  at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
  at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)

ClassNotFoundException:找不到 NativeS3FileSystem / S3AFileSystem

如果您看到此异常,则 S3 FileSystem 不是 Flink 的类路径的一部分。有关如何正确配置的详细信息,请参阅 S3 FileSystem 相关性部分

Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
  at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
  at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
  ... 25 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
  ... 32 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
  ... 33 more

IOException 异常: 400: Bad Request

如果您已正确配置所有内容但获得 Bad Request 异常 S3 存储桶位于区域中 eu-central-1 ,则可能正在运行 S3 客户端,该客户端不支持 Amazon 的签名版本 4

[...]
Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
Caused by: org.jets3t.service.impl.rest.HttpException [...]

要么

com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...]

这不应该适用于我们的阴影 Hadoop / Presto S3 文件系统,但可以适用于 Hadoop 提供的 S3 文件系统。特别是,所有运行高达 2.7.2 的 Hadoop 版本 NativeS3FileSystem (取决于 JetS3t 0.9.0 版本 > = 0.9.4 )都会受到影响,但用户也会报告这种情况 S3AFileSystem

除了更改存储区域之外,您还可以通过 请求用于请求身份验证的签名版本 4 来解决此问题 ,例如将其添加到 Flink 的 JVM 选项中 flink-conf.yaml (请参阅 配置 ):

env.java.opts: -Dcom.amazonaws.services.s3.enableV4

org.apache.hadoop.fs.LocalDirAllocator 中的 NullPointerException

此异常通常是由跳过本地缓冲区目录配置引起 fs.s3a.buffer.dirS3AFileSystem 。请参阅 S3AFileSystem 配置 部分以了解如何 S3AFileSystem 正确配置。

[...]
Caused by: java.lang.NullPointerException at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at
o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
o.a.h.fs.FileSystem.create(FileSystem.java:907) at
o.a.h.fs.FileSystem.create(FileSystem.java:888) at
o.a.h.fs.FileSystem.create(FileSystem.java:785) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
... 25 more

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。