返回介绍

保存点

发布于 2025-05-02 18:19:19 字数 5971 浏览 0 评论 0 收藏

概览

保存点是外部存储的自包含检查点,可用于停止和恢复或更新 Flink 程序。他们使用 Flink 的 检查点机制 来创建流程序状态的(非增量)SNAPSHOT,并将检查点数据和元数据写入外部文件系统。

此页面介绍了触发,恢复和处理保存点所涉及的所有步骤。有关 Flink 如何处理状态和故障的更多详细信息,请查看 Streaming Programs 页面中的 State

注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关 为算子分配 ID 的 部分。

分配算子 ID

这是 强烈建议 您调整自己的方案,作为为了能够在将来升级你的程序在本节中描述。主要的必要更改是通过该 uid(String) 方法手动指定算子 ID 。这些 ID 用于确定每个 算子的状态。

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

如果您未手动指定 ID,则会自动生成这些 ID。只要这些 ID 不变,您就可以从保存点自动恢复。生成的 ID 取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些 ID。

保存点状态

您可以将保存点视为 Operator ID -&gt; State 包含每个有状态 算子的映射:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

在上面的例子中,打印接收器是无状态的,因此不是保存点状态的一部分。默认情况下,我们尝试将保存点的每个条目映射回新程序。

算子操作

您可以使用 命令行客户端 ,以 触发保存点 , 取消作业用的保存点 , 从保存点恢复 和 处理保存点 。

使用 Flink> = 1.2.0,也可以使用 webui 从保存点恢复 。

触发保存点

触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。可以通过 配置默认目标目录 或使用触发器命令指定自定义目标目录来控制此目录的位置(请参阅 :targetDirectory 参数 )。

注意: 目标目录必须是 JobManager(s)和 TaskManager(例如分布式文件系统上的位置)可访问的位置。

例如,使用 FsStateBackendRocksDBStateBackend

# Savepoint target directory

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

/savepoints/

# Savepoint directory

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

/savepoints/savepoint-:shortjobid-:savepointid/

# Savepoint file contains the checkpoint meta data

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

/savepoints/savepoint-:shortjobid-:savepointid/_metadata

# Savepoint state

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

/savepoints/savepoint-:shortjobid-:savepointid/...

注意: 虽然看起来好像可以移动保存点,但由于 _metadata 文件中的绝对路径,目前无法进行保存。请按照 FLINK-5778 了解取消此限制的进度。

请注意,如果使用 MemoryStateBackend ,则元数据 和 保存点状态将存储在 _metadata 文件中。由于它是自包含的,您可以移动文件并从任何位置恢复。

触发保存点

$ bin/flink savepoint :jobId [:targetDirectory]

这将触发具有 ID 的作业的保存点 :jobId ,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

使用 YARN 触发保存点

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

这将触发具有 ID :jobId 和 YARN 应用程序 ID 的作业的保存点 :yarnAppId ,并返回创建的保存点的路径。

使用 Savepoint 取消作业

$ bin/flink cancel -s [:targetDirectory] :jobId

这将以原子方式触发具有 ID 的作业的保存点 :jobid 并取消作业。此外,您可以指定目标文件系统目录以存储保存点。该目录需要可由 JobManager 和 TaskManager 访问。

从 Savepoints 恢复

$ bin/flink run -s :savepointPath [:runArgs]

这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或 _metadata 文件的路径。

允许未恢复状态

默认情况下,resume 算子操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了 算子,则可以通过 --allowNonRestoredState (short -n :) 选项跳过无法映射到新程序的状态:

$ bin/flink run -s :savepointPath -n [:runArgs]

处理保存点

$ bin/flink savepoint -d :savepointPath

这将处理存储的保存点 :savepointPath

请注意,也可以通过常规文件系统 算子操作手动删除保存点,而不会影响其他保存点或检查点(请记住,每个保存点都是自包含的)。直到 Flink 1.2,这是一个更乏味的任务,使用上面的 savepoint 命令执行。

配置

您可以通过 state.savepoints.dir Keys 配置默认保存点目标目录。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅 :targetDirectory 参数 )。

# Default savepoint target directory

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目标目录,则触发保存点将失败。

注意: 目标目录必须是 JobManager(s)和 TaskManager(例如分布式文件系统上的位置)可访问的位置。

常问问题

我应该为我的工作中的所有算子分配 ID 吗?

根据经验,是的。严格地说,仅通过该 uid 方法将 ID 分配给作业中的有状态 算子就足够了。保存点仅包含这些 算子的状态,无状态 算子不是保存点的一部分。

在实践中,建议将其分配给所有 算子,因为 Flink 的一些内置 算子(如 Window 算子)也是有状态的,并且不清楚哪些内置 算子实际上是有状态的,哪些不是。如果您完全确定算子是无状态的,则可以跳过该 uid 方法。

如果我在工作中添加一个需要状态的新算子,会发生什么?

当您向作业添加新算子时,它将在没有任何状态的情况下进行初始化。保存点包含每个有状态 算子的状态。无状态 算子根本不属于保存点。新 算子的行为类似于无状态 算子。

如果我删除一个有我工作状态的算子,会发生什么?

默认情况下,保存点还原将尝试将所有状态匹配回还原的作业。如果从包含已删除算子状态的保存点还原,则会因此失败。

您可以通过使用 run 命令设置 --allowNonRestoredState (short :) 来允许非恢复状态 -n

$ bin/flink run -s :savepointPath -n [:runArgs]

如果我在工作中重新排序有状态 算子会发生什么?

如果您为这些 算子分配了 ID,它们将照常恢复。

如果您没有分配 ID,则重新排序后,有状态 算子的自动生成 ID 很可能会更改。这将导致您无法从以前的保存点恢复。

如果我添加,删除或重新排序在我的工作中没有状态的 算子会发生什么?

如果为有状态 算子分配了 ID,则无状态 算子不会影响保存点还原。

如果您没有分配 ID,则重新排序后,有状态 算子的自动生成 ID 很可能会更改。这将导致您无法从以前的保存点恢复。

当我在恢复时更改程序的并行性时会发生什么?

如果使用 Flink> = 1.2.0 触发保存点并且不使用弃用状态 API Checkpointed ,则只需从保存点恢复程序并指定新的并行度即可。

如果从 Flink <1.2.0 触发的保存点恢复或使用现已弃用的 API,则首先必须将作业和保存点迁移到 Flink> = 1.2.0,然后才能更改并行度。请参阅 升级作业和 Flink 版本指南

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。