返回介绍

升级应用程序和 Flink 版本

发布于 2025-05-02 18:19:20 字数 8475 浏览 0 评论 0 收藏

Flink DataStream 程序通常设计为长时间运行,例如数周,数月甚至数年。与所有长期运行的服务一样,需要维护 Flink 流应用程序,包括修复错误,实施改进或将应用程序迁移到更高版本的 Flink 集群。

本文档介绍如何更新 Flink 流应用程序以及如何将正在运行的流应用程序迁移到其他 Flink 群集。

重新启动流应用程序

升级流应用程序或将应用程序迁移到其他群集的 算子操作系列基于 Flink 的 Savepoint 函数。保存点是特定时间点应用程序状态的一致 SNAPSHOT。

有两种方法可以从正在运行的流应用程序中获取保存点。

  • 采取保存点并继续处理。
> ./bin/flink savepoint <jobID> [pathToSavepoint]

建议定期获取保存点,以便能够从之前的某个时间点重新启动应用程序。

  • 获取保存点并将应用程序作为单个 算子操作停止。
> ./bin/flink cancel -s [pathToSavepoint] <jobID>

这意味着应用程序在保存点完成后立即取消,即保存点后没有其他检查点。

给定从应用程序获取的保存点,可以从该保存点启动相同或兼容的应用程序(请参阅下面的“ 应用程序状态兼容性” 部分)。从保存点启动应用程序意味着初始化其 算子的状态,并在保存点中保存 算子状态。这是通过使用保存点启动应用程序来完成的。

> ./bin/flink run -d -s [pathToSavepoint] ~/application.jar

启动应用程序的算子在获取保存点时使用原始应用程序的算子状态(即从中获取保存点的应用程序)进行初始化。启动的应用程序从这一点开始继续处理。

注意 :即使 Flink 始终恢复应用程序的状态,它也无法恢复对外部系统的写入。如果从未停止应用程序的保存点恢复,则可能会出现问题。在这种情况下,应用程序可能在获取保存点后发出数据。重新启动的应用程序可能(取决于您是否更改了应用程序逻辑)再次发出相同的数据。根据 SinkFunction 存储系统的 Distinct,此行为的确切影响可能会有很大差异。如果对像 Cassandra 这样的键值存储进行幂等写入 算子操作,则发出两次的数据可能是正常的,但如果附加到像 Kafka 这样的持久日志中则会出现问题。无论如何,您应该仔细检查并测试重新启动的应用程序的行为。

应用状态兼容性

在升级应用程序以修复错误或改进应用程序时,通常的目标是在保存其状态的同时替换正在运行的应用程序的应用程序逻辑。我们通过从原始应用程序中获取的保存点启动升级的应用程序来完成此 算子操作。但是,这仅在两个应用程序都是 状态兼容的 情况下才有效,这意味着升级后的应用程序的 算子能够使用原始应用程序的 算子的状态初始化其状态。

在本节中,我们将讨论如何修改应用程序以保持状态兼容。

匹配算子状态

从保存点重新启动应用程序时,Flink 会将保存点中存储的 算子状态与已启动应用程序的有状态 算子进行匹配。匹配基于算子 ID 完成,算子 ID 也存储在保存点中。每个 算子都有一个默认 ID,该 ID 是从 算子在应用程序 算子拓扑中的位置派生而来的。因此,可以始终从其自己的保存点之一重新启动未修改的应用程序。但是,如果修改了应用程序,则 算子的默认 ID 可能会更改。因此,如果已明确指定了 算子 ID,则只能从保存点启动已修改的应用程序。为 算子分配 ID 非常简单,使用以下 uid(String) 方法完成:

val mappedEvents: DataStream[(Int, Long)] = events
  .map(new MyStatefulMapFunc()).uid("mapper-1")

注意: 由于存储在保存点中的 算子 ID 和要启动的应用程序中的 算子的 ID 必须相等,因此强烈建议为将来可能升级的应用程序的所有 算子分配唯一的 ID。此建议适用于所有 算子,即具有和不具有显式声明的 算子状态的 算子,因为某些 算子具有用户不可见的内部状态。升级没有分配算子 ID 的应用程序要困难得多,并且可能只能通过使用该 setUidHash() 方法的低级解决方法来实现。

重要提示: 从 1.3.x 开始,这也适用于属于链的算子。

默认情况下,存储在保存点中的所有状态必须与启动应用程序的 算子匹配。但是,用户可以明确同意跳过(从而丢弃)从保存点启动应用程序时无法与算子匹配的状态。在保存点中找不到状态的有状态 算子将使用其默认状态进行初始化。

有状态 算子和用户函数

升级应用程序时,可以通过一个限制自由修改用户函数和算子。无法更改 算子状态的数据类型。这很重要,因为从保存点开始的状态在加载到 算子之前(当前)不能转换为不同的数据类型。因此,在升级应用程序时更改算子状态的数据类型会中断应用程序状态一致性,并阻止升级的应用程序从保存点重新启动。

算子状态可以是用户定义的,也可以是内部的。

  • 用户定义的 算子状态: 在具有用户定义的 算子状态的函数中,状态的类型由用户显式定义。虽然无法更改 算子状态的数据类型,但是克服此限制的解决方法可以是定义具有不同数据类型的第二个状态,并实现将状态从原始状态迁移到新状态的逻辑。这种方法需要良好的迁移策略和对 Keys 分区状态 行为的充分理解。
  • 内部 算子状态: 窗口或连接 算子等 算子保持不向用户公开的内部 算子状态。对于这些 算子,内部状态的数据类型取决于 算子的输入或输出类型。因此,更改相应的输入或输出类型会中断应用程序状态一致性并阻止升级。下表列出了具有内部状态的 算子,并显示了状态数据类型与其输入和输出类型的关系。对于应用于被 Key 化的数据流的 算子,键类型(KEY)也始终是状态数据类型的一部分。
算子内部算子状态的数据类型
ReduceFunction [IOT]物联网(输入和输出类型)[,KEY]
FoldFunction [IT,OT]OT(输出类型)[,KEY]
WindowFunction [IT,OT,KEY,WINDOW]IT(输入类型),KEY
AllWindowFunction [IT,OT,WINDOW]IT(输入类型)
JoinFunction [IT1,IT2,OT]IT1,IT2(类型 1 和 2.输入),KEY
CoGroupFunction [IT1,IT2,OT]IT1,IT2(类型 1 和 2.输入),KEY
内置聚合(sum,min,max,minBy,maxBy)输入类型[,KEY]

应用拓扑

除了改变一个或多个现有 算子的逻辑之外,还可以通过更改应用程序的拓扑结构来升级应用程序,即通过添加或删除 算子,更改 算子的并行性或修改 算子链接行为。

通过更改其拓扑来升级应用程序时,需要考虑一些事项以保持应用程序状态的一致性。

  • 添加或删除无状态 算子: 除非以下情况之一适用,否则这没有问题。
  • 添加有状态 算子: 算子 的状态将使用默认状态初始化,除非它接管另一个 算子的状态。
  • 删除有状态 算子: 除非另一个 算子将其删除,否则删除的 算子的状态将丢失。启动升级后的应用程序时,您必须明确同意丢弃该状态。
  • 更改 算子的输入和输出类型: 在具有内部状态的 算子之前或之后添加新 算子时,必须确保不修改有状态 算子的输入或输出类型以保存内部 算子状态的数据类型(详见上文)。
  • 更改算子链接: 算子可以链接在一起以提高性能。从 1.3.x 以后的保存点恢复时,可以在保持状态一致性的同时修改链。有可能打破链条,使有状态的 算子移出链。还可以将新的或现有的有状态 算子附加或注入链中,或修改链中的 算子顺序。但是,将保存点升级到 1.3.x 时,拓扑在链接方面没有变化是至关重要的。应为链中一部分的所有 算子分配一个 ID,如上面的 匹配 算子状态 部分所述。

升级 Flink Framework 版本

本节介绍了跨版本升级 Flink 以及在版本之间迁移作业的一般方法。

简而言之,此过程包括两个基本步骤:

  1. 在以前的旧 Flink 版本中获取要迁移的作业的保存点。
  2. 从先前获取的保存点恢复新 Flink 版本下的作业。

除了这两个基本步骤之外,还可能需要一些额外的步骤,这些步骤取决于您希望更改 Flink 版本的方式。在本指南中,我们区分了两种跨 Flink 版本升级的方法: 就地 升级和卷 影副本 升级。

对于 就地 更新,在获取保存点后,您需要:

  1. 停止/取消所有正在运行的作业
  2. 关闭运行旧 Flink 版本的群集。
  3. 将 Flink 升级到群集上的较新版本。
  4. 在新版本下重新启动群集。

对于卷 影副本 ,您需要:

  1. 在从保存点恢复之前,除了旧的 Flink 安装之外,还要设置新 Flink 版本的新安装。
  2. 使用新的 Flink 安装从保存点恢复。
  3. 如果一切正常,请停止并关闭旧的 Flink 集群。

在下文中,我们将首先介绍成功迁移工作的前提条件,然后详细介绍我们之前概述的步骤。

前提条件

在开始迁移之前,请检查您尝试迁移的作业是否遵循 保存点 的最佳做法。另外,请查看 API 迁移指南 ,了解是否存在与将保存点迁移到较新版本相关的任何 API 更改。

特别是,我们建议您检查是否 uid 为您的工作中的算子设置了明确的 s。

这是一个 软 前置条件,如果您忘记分配 s ,恢复 应该 仍然有效 uid 。如果遇到无效的情况,可以使用该调用 手动 将以前 Flink 版本生成的旧版顶点 ID 添加到作业中 setUidHash(String hash) 。对于每个 算子(在 算子链中:只有头 算子),您必须分配 32 个字符的十六进制字符串,表示您可以在 web ui 中看到的哈希值或 算子的日志。

除了算子 uid 之外,目前有两个 难以 进行的作业迁移前提条件会导致迁移失败:

  1. 我们不支持使用 semi-asynchronous 模式检查点的 RocksDB 中的状态迁移 。如果您的旧作业使用此模式,您仍然可以 fully-asynchronous 在使用用作迁移基础的保存点之前将作业更改为使用 模式。
  2. 另一个 重要的 前提条件是,对于 Flink 1.3.x 之前的保存点,所有保存点数据必须可从新安装访问并驻留在相同的绝对路径下。在 Flink 1.3.x 之前,保存点数据通常不会仅在创建的保存点文件中自包含。可以从保存点文件内部引用其他文件(例如,状态后台 SNAPSHOT 的输出)。自 Flink 1.3.x 以来,这不再是一个限制; 可以使用典型的文件系统 算子操作重定位保存点。

第 1 步:使用旧 Flink 版本中的保存点。

作业迁移的第一个主要步骤是在较旧的 Flink 版本中运行您的作业的保存点。您可以使用以下命令执行此 算子操作:

$ bin/flink savepoint :jobId [:targetDirectory]

有关更多详细信息,请阅读 保存点文档

第 2 步:将群集更新为新的 Flink 版本。

在此步骤中,我们将更新群集的框架版本。这基本上意味着用新版本替换 Flink 安装的内容。此步骤可能取决于您在群集中运行 Flink 的方式(例如,独立,在 Mesos 上......)。

如果您不熟悉在群集中安装 Flink,请阅读 部署和群集设置文档

步骤 3:从保存点恢复新 Flink 版本下的作业。

作为作业迁移的最后一步,您将从上面在更新的群集上获取的保存点恢复。您可以使用以下命令执行此 算子操作:

$ bin/flink run -s :savepointPath [:runArgs]

再次,有关更多详细信息,请查看 保存点文档

兼容性表

保存点与 Flink 版本兼容,如下表所示:

Created with \ Resumed with1.1.x1.2.x1.3.x1.4.x1.5.x1.6.x限制
1.1.xØØØ   从 Flink 1.1.x 迁移到 1.2.x +的作业的最大并行度目前已确定为作业的并行性。这意味着迁移后不能增加并行性。在将来的错误修复版本中可能会删除此限制。
1.2.x ØØØØØ从 Flink 1.2.x 迁移到 Flink 1.3.x +时,不支持同时更改并行性。用户必须在迁移到 Flink 1.3.x +后首先获取保存点,然后更改并行度。为 CEP 应用程序创建的保存点无法在 1.4.x +中恢复。
1.3.x  ØØØØ如果保存点包含 Scala 案例类,则从 Flink 1.3.0 迁移到 Flink 1.4。[0,1]将失败。用户必须直接迁移到 1.4.2+。
1.4.X   ØØØ 
1.5.x    ØØ 
1.6.x 版     Ø

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。