- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
YARN 设置
快速开始
在 YARN 上启动长时间运行的 Flink 集群
与 4 个 TaskManager(每个具有 4 GB 的 Heapspace)启动 YARN 会话:
# get the hadoop2 package from the Flink download page at
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
# http://flink.apache.org/downloads.html
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.7-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.7-SNAPSHOT/
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
指定 -s
每个 TaskManager 的处理槽数的标志。我们建议将插槽数设置为每台计算机的处理器数。
会话启动后,您可以使用该 ./bin/flink
工具将作业提交到群集。
在 YARN 上运行 Flink 作业
# get the hadoop2 package from the Flink download page at
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
# http://flink.apache.org/downloads.html
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.7-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.7-SNAPSHOT/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
Flink YARN Session
Apache Hadoop YARN 是一个集群资源管理框架。它允许在群集上运行各种分布式应用程序。Flink 在其他应用程序旁边的 YARN 上运行。如果已经有 YARN 设置,用户不必设置或安装任何东西。
要求
- 至少是 Apache Hadoop 2.2
- HDFS(Hadoop 分布式文件系统)(或 Hadoop 支持的其他分布式文件系统)
如果您在使用 Flink YARN 客户端时遇到麻烦,请查看 FAQ 部分 。
启动 Flink 会话
请按照以下说明了解如何在 YARN 群集中启动 Flink 会话。
会话将启动所有必需的 Flink 服务(JobManager 和 TaskManagers),以便您可以将程序提交到群集。请注意,您可以在每个会话中运行多个程序。
下载 Flink
从 下载页面下载 Hadoop> = 2 的 Flink 软件包。它包含所需的文件。
使用以下方法解压缩包:
tar xvzf flink-1.7-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.7-SNAPSHOT/
启动一个会话
使用以下命令启动会话
./bin/yarn-session.sh
此命令将显示以下概述:
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode
请注意,客户要求 YARN_CONF_DIR
或 HADOOP_CONF_DIR
环境变量设置为 read YARN 和 HDFS 配置。
示例: 发出以下命令以分配 10 个 TaskManager,每个管理器具有 8 GB 内存和 32 个处理插槽:
./bin/yarn-session.sh -n 10 -tm 8192 -s 32
系统将使用配置 conf/flink-conf.yaml
。如果您想更改某些内容,请按照我们的 配置指南 算子操作 。
YARN 上的 Flink 将覆盖以下配置参数 jobmanager.rpc.address
(因为 JobManager 总是在不同的机器上分配), taskmanager.tmp.dirs
(我们使用 YARN 给出的 tmp 目录)以及 parallelism.default
是否已指定插槽数。
如果您不想更改配置文件以设置配置参数,则可以选择通过 -D
标志传递动态属性。所以你可以这样传递参数: -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624
。
示例调用启动了 11 个容器(即使只请求了 10 个容器),因为 ApplicationMaster 和 Job Manager 还有一个额外的容器。
在 YARN 群集中部署 Flink 后,它将显示 JobManager 的连接详细信息。
通过停止 unix 进程(使用 CTRL + C)或在客户端输入“stop”来停止 YARN 会话。
如果群集上有足够的资源,则 YARN 上的 Flink 将仅启动所有请求的容器。大多数 YARN 调度程序考虑了所请求的容器内存,一些帐户也考虑了 vcores 的数量。默认情况下,vcores 的数量等于处理 slots( -s
)参数。在 yarn.containers.vcores
允许覆盖 vcores 的数量与自定义值。为了使此参数起作用,您应该在群集中启用 CPU 调度。
分离的 YARN 会话
如果您不想让 Flink YARN 客户端一直运行,也可以启动 分离的 YARN 会话。该参数称为 -d
或 --detached
。
在这种情况下,Flink YARN 客户端将仅向群集提交 Flink,然后自行关闭。请注意,在这种情况下,无法使用 Flink 停止 YARN 会话。
使用 YARN 实用程序( yarn application -kill <appId>
)来停止 YARN 会话。
附加到现有会话
使用以下命令启动会话
./bin/yarn-session.sh
此命令将显示以下概述:
Usage:
Required
-id,--applicationId <yarnAppId> YARN application Id
如前所述, YARN_CONF_DIR
或者 HADOOP_CONF_DIR
必须设置环境变量以读取 YARN 和 HDFS 配置。
示例: 发出以下命令以附加到正在运行的 Flink YARN 会话 application_1463870264508_0029
:
./bin/yarn-session.sh -id application_1463870264508_0029
附加到正在运行的会话使用 YARN ResourceManager 来确定 JobManagerRPC 端口。
通过停止 unix 进程(使用 CTRL + C)或在客户端输入“stop”来停止 YARN 会话。
向 Flink 提交工作
使用以下命令将 Flink 程序提交到 YARN 群集:
./bin/flink
请参阅 命令行客户端 的文档。
该命令将显示如下的帮助菜单:
[...]
Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action arguments:
-c,--class <classname> Class with the program entry point ("main"
method or "getPlan()" method. Only needed
if the JAR file does not specify the class
in its manifest.
-m,--jobmanager <host:port> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration
使用 运行 算子操作将作业提交给 YARN。客户端能够确定 JobManager 的地址。在极少数问题的情况下,您还可以使用 -m
参数传递 JobManager 地址。JobManager 地址在 YARN 控制台中可见。
例
wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
./bin/flink run ./examples/batch/WordCount.jar \
hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
如果出现以下错误,请确保所有 TaskManagers 都已启动:
Exception in thread "main" org.apache.flink.compiler.CompilerException:
Available instances could not be determined from job manager: Connection timed out.
您可以在 JobManager Web 界面中检查 TaskManagers 的数量。该接口的地址打印在 YARN 会话控制台中。
如果一分钟后 TaskManagers 没有出现,您应该使用日志文件调查问题。
在 YARN 上运行单个 Flink 作业
上面的文档描述了如何在 Hadoop YARN 环境中启动 Flink 集群。也可以仅在执行单个作业时在 YARN 中启动 Flink。
请注意,客户端然后期望设置 -yn
值(TaskManagers 的数量)。
例:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
YARN 会话的命令行选项也可用于该 ./bin/flink
工具。它们以 y
or 或 yarn
(对于长参数选项)作为前缀。
注意:您可以通过设置环境变量为每个作业使用不同的配置目录 FLINK_CONF_DIR
。要使用此副本,请 conf
从 Flink 分发中复制目录,并根据每个作业修改日志记录设置。
注意:可以 -m yarn-cluster
与分离的 YARN 提交( -yd
)组合以“触发并忘记”到 YARN 群集的 Flink 作业。在这种情况下,您的应用程序将不会从 ExecutionEnvironment.execute()调用获得任何累加器结果或异常!
用户 jar 和 Classpath
默认情况下,Flink 将在运行单个作业时将用户 jar 包含到系统类路径中。可以使用 yarn.per-job-cluster.include-user-jar
参数控制此行为。
将此设置为 DISABLED
Flink 时,将在用户类路径中包含 jar。
可以通过将参数设置为以下之一来控制类路径中的 user-jar 位置:
ORDER
:(默认)根据字典顺序将 jar 添加到系统类路径。FIRST
:将 jar 添加到系统类路径的开头。LAST
:将 jar 添加到系统类路径的末尾。
Flink 在 YARN 上的恢复行为
Flink 的 YARN 客户端具有以下配置参数来控制容器故障时的行为方式。可以 conf/flink-conf.yaml
使用 -D
参数从启动 YARN 会话时或在启动 YARN 会话时设置这些参数。
yarn.reallocate-failed
:此参数控制 Flink 是否应重新分配失败的 TaskManager 容器。默认值:trueyarn.maximum-failed-containers
:ApplicationMaster 在 YARN 会话失败之前接受的最大失败容器数。默认值:最初请求的 TaskManagers(-n
)的数量。yarn.application-attempts
:ApplicationMaster(+其 TaskManager 容器)尝试的数量。如果此值设置为 1(默认值),则当 Application master 失败时,整个 YARN 会话将失败。较高的值指定 YARN 重新启动 ApplicationMaster 的次数。
调试失败的 YARN 会话
Flink YARN 会话部署失败的原因有很多。配置错误的 Hadoop 设置(HDFS 权限,YARN 配置),版本不兼容(在 Cloudera Hadoop 上运行 Flink 与 vanilla Hadoop 依赖关系)或其他错误。
日志文件
如果 Flink YARN 会话在部署期间失败,则用户必须依赖 Hadoop YARN 的日志记录函数。最有用的函数是 YARN 日志聚合 。要启用它,用户必须将 yarn.log-aggregation-enable
属性设置 true
为 yarn-site.xml
文件。启用后,用户可以使用以下命令检索(失败的)YARN 会话的所有日志文件。
yarn logs -applicationId <application ID>
请注意,会话结束后需要几秒钟才会显示日志。
YARN 客户端控制台和 Web 界面
如果在运行期间发生错误,Flink YARN 客户端还会在终端中打印错误消息(例如,如果 TaskManager 在一段时间后停止工作)。
除此之外,还有 YARN Resource Manager Web 界面(默认情况下在端口 8088 上)。资源管理器 Web 界面的端口由 yarn.resourcemanager.webapp.address
配置值确定。
它允许访问日志文件以运行 YARN 应用程序,并显示失败应用程序的诊断信息。
为特定的 Hadoop 版本构建 YARN 客户端
使用 Hortonworks,Cloudera 或 MapR 等公司的 Hadoop 发行版的用户可能必须针对其特定版本的 Hadoop(HDFS)和 YARN 构建 Flink。有关更多详细信息,请阅读 构建说明 。
在防火墙后面的 YARN 上运行 Flink
某些 YARN 群集使用防火墙来控制群集与网络其余部分之间的网络流量。在这些设置中,Flink 作业只能从群集网络内(防火墙后面)提交到 YARN 会话。如果这对生产使用不可行,Flink 允许为所有相关服务配置端口范围。通过配置这些范围,用户还可以通过防火墙向 Flink 提交作业。
目前,提交工作需要两项服务:
- JobManager(YARN 中的 ApplicationMaster)
- 在 JobManager 中运行的 BlobServer。
向 Flink 提交作业时,BlobServer 会将带有用户代码的 jar 分发给所有工作节点(TaskManagers)。JobManager 自己接收作业并触发执行。
用于指定端口的两个配置参数如下:
yarn.application-master.port
blob.server.port
这两个配置选项接受单个端口(例如:“50010”),范围(“50000-50025”)或两者的组合(“50010,50011,50020-50025,50050-50075”)。
(Hadoop 使用类似的机制,在那里调用配置参数 yarn.app.mapreduce.am.job.client.port-range
。)
背景/内部
本节简要介绍 Flink 和 YARN 如何交互。
YARN 客户端需要访问 Hadoop 配置以连接到 YARN 资源管理器和 HDFS。它使用以下策略确定 Hadoop 配置:
- 测试是否
YARN_CONF_DIR
,HADOOP_CONF_DIR
或HADOOP_CONF_PATH
设置(按顺序)。如果设置了其中一个变量,则使用它们来读取配置。 - 如果上述策略失败(在正确的 YARN 设置中不应该这样),则客户端正在使用
HADOOP_HOME
环境变量。如果已设置,则客户端尝试访问$HADOOP_HOME/etc/hadoop
(Hadoop 2)和$HADOOP_HOME/conf
(Hadoop 1)。
启动新的 Flink YARN 会话时,客户端首先检查所请求的资源(容器和内存)是否可用。之后,它将包含 Flink 和配置的 jar 上传到 HDFS(步骤 1)。
客户端的下一步是请求(步骤 2)YARN 容器以启动 ApplicationMaster (步骤 3)。由于客户端将配置和 jar 文件注册为容器的资源,因此在该特定机器上运行的 YARN 的 NodeManager 将负责准备容器(例如,下载文件)。完成后,将启动 ApplicationMaster (AM)。
该 JobManager 和 AM 在同一容器中运行。一旦它们成功启动,AM 就知道 JobManager(它自己的主机)的地址。它正在为 TaskManagers 生成一个新的 Flink 配置文件(以便它们可以连接到 JobManager)。该文件也上传到 HDFS。此外, AM 容器还提供 Flink 的 Web 界面。YARN 代码分配的所有端口都是 临时端口 。这允许用户并行执行多个 Flink YARN 会话。
之后,AM 开始为 Flink 的 TaskManagers 分配容器,这将从 HDFS 下载 jar 文件和修改后的配置。完成这些步骤后,即可建立 Flink 并准备接受作业。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论