返回介绍

YARN 设置

发布于 2025-05-02 18:19:18 字数 11698 浏览 0 评论 0 收藏

快速开始

在 YARN 上启动长时间运行的 Flink 集群

与 4 个 TaskManager(每个具有 4 GB 的 Heapspace)启动 YARN 会话:

# get the hadoop2 package from the Flink download page at

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

# http://flink.apache.org/downloads.html

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.7-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.7-SNAPSHOT/
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

指定 -s 每个 TaskManager 的处理槽数的标志。我们建议将插槽数设置为每台计算机的处理器数。

会话启动后,您可以使用该 ./bin/flink 工具将作业提交到群集。

在 YARN 上运行 Flink 作业

# get the hadoop2 package from the Flink download page at

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

# http://flink.apache.org/downloads.html

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.7-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.7-SNAPSHOT/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

Flink YARN Session

Apache Hadoop YARN 是一个集群资源管理框架。它允许在群集上运行各种分布式应用程序。Flink 在其他应用程序旁边的 YARN 上运行。如果已经有 YARN 设置,用户不必设置或安装任何东西。

要求

  • 至少是 Apache Hadoop 2.2
  • HDFS(Hadoop 分布式文件系统)(或 Hadoop 支持的其他分布式文件系统)

如果您在使用 Flink YARN 客户端时遇到麻烦,请查看 FAQ 部分

启动 Flink 会话

请按照以下说明了解如何在 YARN 群集中启动 Flink 会话。

会话将启动所有必需的 Flink 服务(JobManager 和 TaskManagers),以便您可以将程序提交到群集。请注意,您可以在每个会话中运行多个程序。

下载 Flink

下载页面下载 Hadoop> = 2 的 Flink 软件包。它包含所需的文件。

使用以下方法解压缩包:

tar xvzf flink-1.7-SNAPSHOT-bin-hadoop2.tgz
cd flink-1.7-SNAPSHOT/

启动一个会话

使用以下命令启动会话

./bin/yarn-session.sh

此命令将显示以下概述:

Usage:
   Required
   -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
   -D <arg>            Dynamic properties
   -d,--detached           Start detached
   -jm,--jobManagerMemory <arg>  Memory for JobManager Container with optional unit (default: MB)
   -nm,--name            Set a custom name for the application on YARN
   -q,--query            Display available YARN resources (memory, cores)
   -qu,--queue <arg>         Specify YARN queue.
   -s,--slots <arg>        Number of slots per TaskManager
   -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
   -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode

请注意,客户要求 YARN_CONF_DIRHADOOP_CONF_DIR 环境变量设置为 read YARN 和 HDFS 配置。

示例: 发出以下命令以分配 10 个 TaskManager,每个管理器具有 8 GB 内存和 32 个处理插槽:

./bin/yarn-session.sh -n 10 -tm 8192 -s 32

系统将使用配置 conf/flink-conf.yaml 。如果您想更改某些内容,请按照我们的 配置指南 算子操作

YARN 上的 Flink 将覆盖以下配置参数 jobmanager.rpc.address (因为 JobManager 总是在不同的机器上分配), taskmanager.tmp.dirs (我们使用 YARN 给出的 tmp 目录)以及 parallelism.default 是否已指定插槽数。

如果您不想更改配置文件以设置配置参数,则可以选择通过 -D 标志传递动态属性。所以你可以这样传递参数: -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624

示例调用启动了 11 个容器(即使只请求了 10 个容器),因为 ApplicationMaster 和 Job Manager 还有一个额外的容器。

在 YARN 群集中部署 Flink 后,它将显示 JobManager 的连接详细信息。

通过停止 unix 进程(使用 CTRL + C)或在客户端输入“stop”来停止 YARN 会话。

如果群集上有足够的资源,则 YARN 上的 Flink 将仅启动所有请求的容器。大多数 YARN 调度程序考虑了所请求的容器内存,一些帐户也考虑了 vcores 的数量。默认情况下,vcores 的数量等于处理 slots( -s )参数。在 yarn.containers.vcores 允许覆盖 vcores 的数量与自定义值。为了使此参数起作用,您应该在群集中启用 CPU 调度。

分离的 YARN 会话

如果您不想让 Flink YARN 客户端一直运行,也可以启动 分离的 YARN 会话。该参数称为 -d--detached

在这种情况下,Flink YARN 客户端将仅向群集提交 Flink,然后自行关闭。请注意,在这种情况下,无法使用 Flink 停止 YARN 会话。

使用 YARN 实用程序( yarn application -kill &lt;appId&gt; )来停止 YARN 会话。

附加到现有会话

使用以下命令启动会话

./bin/yarn-session.sh

此命令将显示以下概述:

Usage:
   Required
   -id,--applicationId <yarnAppId> YARN application Id

如前所述, YARN_CONF_DIR 或者 HADOOP_CONF_DIR 必须设置环境变量以读取 YARN 和 HDFS 配置。

示例: 发出以下命令以附加到正在运行的 Flink YARN 会话 application_1463870264508_0029

./bin/yarn-session.sh -id application_1463870264508_0029

附加到正在运行的会话使用 YARN ResourceManager 来确定 JobManagerRPC 端口。

通过停止 unix 进程(使用 CTRL + C)或在客户端输入“stop”来停止 YARN 会话。

向 Flink 提交工作

使用以下命令将 Flink 程序提交到 YARN 群集:

./bin/flink

请参阅 命令行客户端 的文档。

该命令将显示如下的帮助菜单:

[...]
Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action arguments:
   -c,--class <classname>       Class with the program entry point ("main"
                    method or "getPlan()" method. Only needed
                    if the JAR file does not specify the class
                    in its manifest.
   -m,--jobmanager <host:port>    Address of the JobManager (master) to
                    which to connect. Use this flag to connect
                    to a different JobManager than the one
                    specified in the configuration.
   -p,--parallelism <parallelism>   The parallelism with which to run the
                    program. Optional flag to override the
                    default value specified in the
                    configuration

使用 运行 算子操作将作业提交给 YARN。客户端能够确定 JobManager 的地址。在极少数问题的情况下,您还可以使用 -m 参数传递 JobManager 地址。JobManager 地址在 YARN 控制台中可见。

wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
./bin/flink run ./examples/batch/WordCount.jar \
    hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt

如果出现以下错误,请确保所有 TaskManagers 都已启动:

Exception in thread "main" org.apache.flink.compiler.CompilerException:
  Available instances could not be determined from job manager: Connection timed out.

您可以在 JobManager Web 界面中检查 TaskManagers 的数量。该接口的地址打印在 YARN 会话控制台中。

如果一分钟后 TaskManagers 没有出现,您应该使用日志文件调查问题。

在 YARN 上运行单个 Flink 作业

上面的文档描述了如何在 Hadoop YARN 环境中启动 Flink 集群。也可以仅在执行单个作业时在 YARN 中启动 Flink。

请注意,客户端然后期望设置 -yn 值(TaskManagers 的数量)。

例:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

YARN 会话的命令行选项也可用于该 ./bin/flink 工具。它们以 y or 或 yarn (对于长参数选项)作为前缀。

注意:您可以通过设置环境变量为每个作业使用不同的配置目录 FLINK_CONF_DIR 。要使用此副本,请 conf 从 Flink 分发中复制目录,并根据每个作业修改日志记录设置。

注意:可以 -m yarn-cluster 与分离的 YARN 提交( -yd )组合以“触发并忘记”到 YARN 群集的 Flink 作业。在这种情况下,您的应用程序将不会从 ExecutionEnvironment.execute()调用获得任何累加器结果或异常!

用户 jar 和 Classpath

默认情况下,Flink 将在运行单个作业时将用户 jar 包含到系统类路径中。可以使用 yarn.per-job-cluster.include-user-jar 参数控制此行为。

将此设置为 DISABLED Flink 时,将在用户类路径中包含 jar。

可以通过将参数设置为以下之一来控制类路径中的 user-jar 位置:

  • ORDER :(默认)根据字典顺序将 jar 添加到系统类路径。
  • FIRST :将 jar 添加到系统类路径的开头。
  • LAST :将 jar 添加到系统类路径的末尾。

Flink 在 YARN 上的恢复行为

Flink 的 YARN 客户端具有以下配置参数来控制容器故障时的行为方式。可以 conf/flink-conf.yaml 使用 -D 参数从启动 YARN 会话时或在启动 YARN 会话时设置这些参数。

  • yarn.reallocate-failed :此参数控制 Flink 是否应重新分配失败的 TaskManager 容器。默认值:true
  • yarn.maximum-failed-containers :ApplicationMaster 在 YARN 会话失败之前接受的最大失败容器数。默认值:最初请求的 TaskManagers( -n )的数量。
  • yarn.application-attempts :ApplicationMaster(+其 TaskManager 容器)尝试的数量。如果此值设置为 1(默认值),则当 Application master 失败时,整个 YARN 会话将失败。较高的值指定 YARN 重新启动 ApplicationMaster 的次数。

调试失败的 YARN 会话

Flink YARN 会话部署失败的原因有很多。配置错误的 Hadoop 设置(HDFS 权限,YARN 配置),版本不兼容(在 Cloudera Hadoop 上运行 Flink 与 vanilla Hadoop 依赖关系)或其他错误。

日志文件

如果 Flink YARN 会话在部署期间失败,则用户必须依赖 Hadoop YARN 的日志记录函数。最有用的函数是 YARN 日志聚合 。要启用它,用户必须将 yarn.log-aggregation-enable 属性设置 trueyarn-site.xml 文件。启用后,用户可以使用以下命令检索(失败的)YARN 会话的所有日志文件。

yarn logs -applicationId <application ID>

请注意,会话结束后需要几秒钟才会显示日志。

YARN 客户端控制台和 Web 界面

如果在运行期间发生错误,Flink YARN 客户端还会在终端中打印错误消息(例如,如果 TaskManager 在一段时间后停止工作)。

除此之外,还有 YARN Resource Manager Web 界面(默认情况下在端口 8088 上)。资源管理器 Web 界面的端口由 yarn.resourcemanager.webapp.address 配置值确定。

它允许访问日志文件以运行 YARN 应用程序,并显示失败应用程序的诊断信息。

为特定的 Hadoop 版本构建 YARN 客户端

使用 Hortonworks,Cloudera 或 MapR 等公司的 Hadoop 发行版的用户可能必须针对其特定版本的 Hadoop(HDFS)和 YARN 构建 Flink。有关更多详细信息,请阅读 构建说明

在防火墙后面的 YARN 上运行 Flink

某些 YARN 群集使用防火墙来控制群集与网络其余部分之间的网络流量。在这些设置中,Flink 作业只能从群集网络内(防火墙后面)提交到 YARN 会话。如果这对生产使用不可行,Flink 允许为所有相关服务配置端口范围。通过配置这些范围,用户还可以通过防火墙向 Flink 提交作业。

目前,提交工作需要两项服务:

  • JobManager(YARN 中的 ApplicationMaster)
  • 在 JobManager 中运行的 BlobServer。

向 Flink 提交作业时,BlobServer 会将带有用户代码的 jar 分发给所有工作节点(TaskManagers)。JobManager 自己接收作业并触发执行。

用于指定端口的两个配置参数如下:

  • yarn.application-master.port
  • blob.server.port

这两个配置选项接受单个端口(例如:“50010”),范围(“50000-50025”)或两者的组合(“50010,50011,50020-50025,50050-50075”)。

(Hadoop 使用类似的机制,在那里调用配置参数 yarn.app.mapreduce.am.job.client.port-range 。)

背景/内部

本节简要介绍 Flink 和 YARN 如何交互。

YARN 客户端需要访问 Hadoop 配置以连接到 YARN 资源管理器和 HDFS。它使用以下策略确定 Hadoop 配置:

  • 测试是否 YARN_CONF_DIRHADOOP_CONF_DIRHADOOP_CONF_PATH 设置(按顺序)。如果设置了其中一个变量,则使用它们来读取配置。
  • 如果上述策略失败(在正确的 YARN 设置中不应该这样),则客户端正在使用 HADOOP_HOME 环境变量。如果已设置,则客户端尝试访问 $HADOOP_HOME/etc/hadoop (Hadoop 2)和 $HADOOP_HOME/conf (Hadoop 1)。

启动新的 Flink YARN 会话时,客户端首先检查所请求的资源(容器和内存)是否可用。之后,它将包含 Flink 和配置的 jar 上传到 HDFS(步骤 1)。

客户端的下一步是请求(步骤 2)YARN 容器以启动 ApplicationMaster (步骤 3)。由于客户端将配置和 jar 文件注册为容器的资源,因此在该特定机器上运行的 YARN 的 NodeManager 将负责准备容器(例如,下载文件)。完成后,将启动 ApplicationMaster (AM)。

该 JobManager 和 AM 在同一容器中运行。一旦它们成功启动,AM 就知道 JobManager(它自己的主机)的地址。它正在为 TaskManagers 生成一个新的 Flink 配置文件(以便它们可以连接到 JobManager)。该文件也上传到 HDFS。此外, AM 容器还提供 Flink 的 Web 界面。YARN 代码分配的所有端口都是 临时端口 。这允许用户并行执行多个 Flink YARN 会话。

之后,AM 开始为 Flink 的 TaskManagers 分配容器,这将从 HDFS 下载 jar 文件和修改后的配置。完成这些步骤后,即可建立 Flink 并准备接受作业。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。