- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
保存点
概览
保存点是外部存储的自包含检查点,可用于停止和恢复或更新 Flink 程序。他们使用 Flink 的 检查点机制 来创建流程序状态的(非增量)SNAPSHOT,并将检查点数据和元数据写入外部文件系统。
此页面介绍了触发,恢复和处理保存点所涉及的所有步骤。有关 Flink 如何处理状态和故障的更多详细信息,请查看 Streaming Programs 页面中的 State 。
注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关 为算子分配 ID 的 部分。
分配算子 ID
这是 强烈建议 您调整自己的方案,作为为了能够在将来升级你的程序在本节中描述。主要的必要更改是通过该 uid(String)
方法手动指定算子 ID 。这些 ID 用于确定每个 算子的状态。
DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
如果您未手动指定 ID,则会自动生成这些 ID。只要这些 ID 不变,您就可以从保存点自动恢复。生成的 ID 取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些 ID。
保存点状态
您可以将保存点视为 Operator ID -> State
包含每个有状态 算子的映射:
Operator ID | State
------------+------------------------
source-id | State of StatefulSource
mapper-id | State of StatefulMapper
在上面的例子中,打印接收器是无状态的,因此不是保存点状态的一部分。默认情况下,我们尝试将保存点的每个条目映射回新程序。
算子操作
您可以使用 命令行客户端 ,以 触发保存点 , 取消作业用的保存点 , 从保存点恢复 和 处理保存点 。
使用 Flink> = 1.2.0,也可以使用 webui 从保存点恢复 。
触发保存点
触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。可以通过 配置默认目标目录 或使用触发器命令指定自定义目标目录来控制此目录的位置(请参阅 :targetDirectory
参数 )。
注意: 目标目录必须是 JobManager(s)和 TaskManager(例如分布式文件系统上的位置)可访问的位置。
例如,使用 FsStateBackend
或 RocksDBStateBackend
:
# Savepoint target directory
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
/savepoints/
# Savepoint directory
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
/savepoints/savepoint-:shortjobid-:savepointid/
# Savepoint file contains the checkpoint meta data
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
/savepoints/savepoint-:shortjobid-:savepointid/_metadata
# Savepoint state
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
/savepoints/savepoint-:shortjobid-:savepointid/...
注意: 虽然看起来好像可以移动保存点,但由于 _metadata
文件中的绝对路径,目前无法进行保存。请按照 FLINK-5778 了解取消此限制的进度。
请注意,如果使用 MemoryStateBackend
,则元数据 和 保存点状态将存储在 _metadata
文件中。由于它是自包含的,您可以移动文件并从任何位置恢复。
触发保存点
$ bin/flink savepoint :jobId [:targetDirectory]
这将触发具有 ID 的作业的保存点 :jobId
,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。
使用 YARN 触发保存点
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
这将触发具有 ID :jobId
和 YARN 应用程序 ID 的作业的保存点 :yarnAppId
,并返回创建的保存点的路径。
使用 Savepoint 取消作业
$ bin/flink cancel -s [:targetDirectory] :jobId
这将以原子方式触发具有 ID 的作业的保存点 :jobid
并取消作业。此外,您可以指定目标文件系统目录以存储保存点。该目录需要可由 JobManager 和 TaskManager 访问。
从 Savepoints 恢复
$ bin/flink run -s :savepointPath [:runArgs]
这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或 _metadata
文件的路径。
允许未恢复状态
默认情况下,resume 算子操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了 算子,则可以通过 --allowNonRestoredState
(short -n
:) 选项跳过无法映射到新程序的状态:
$ bin/flink run -s :savepointPath -n [:runArgs]
处理保存点
$ bin/flink savepoint -d :savepointPath
这将处理存储的保存点 :savepointPath
。
请注意,也可以通过常规文件系统 算子操作手动删除保存点,而不会影响其他保存点或检查点(请记住,每个保存点都是自包含的)。直到 Flink 1.2,这是一个更乏味的任务,使用上面的 savepoint 命令执行。
配置
您可以通过 state.savepoints.dir
Keys 配置默认保存点目标目录。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅 :targetDirectory
参数 )。
# Default savepoint target directory
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
state.savepoints.dir: hdfs:///flink/savepoints
如果既未配置缺省值也未指定自定义目标目录,则触发保存点将失败。
注意: 目标目录必须是 JobManager(s)和 TaskManager(例如分布式文件系统上的位置)可访问的位置。
常问问题
我应该为我的工作中的所有算子分配 ID 吗?
根据经验,是的。严格地说,仅通过该 uid
方法将 ID 分配给作业中的有状态 算子就足够了。保存点仅包含这些 算子的状态,无状态 算子不是保存点的一部分。
在实践中,建议将其分配给所有 算子,因为 Flink 的一些内置 算子(如 Window 算子)也是有状态的,并且不清楚哪些内置 算子实际上是有状态的,哪些不是。如果您完全确定算子是无状态的,则可以跳过该 uid
方法。
如果我在工作中添加一个需要状态的新算子,会发生什么?
当您向作业添加新算子时,它将在没有任何状态的情况下进行初始化。保存点包含每个有状态 算子的状态。无状态 算子根本不属于保存点。新 算子的行为类似于无状态 算子。
如果我删除一个有我工作状态的算子,会发生什么?
默认情况下,保存点还原将尝试将所有状态匹配回还原的作业。如果从包含已删除算子状态的保存点还原,则会因此失败。
您可以通过使用 run 命令设置 --allowNonRestoredState
(short :) 来允许非恢复状态 -n
:
$ bin/flink run -s :savepointPath -n [:runArgs]
如果我在工作中重新排序有状态 算子会发生什么?
如果您为这些 算子分配了 ID,它们将照常恢复。
如果您没有分配 ID,则重新排序后,有状态 算子的自动生成 ID 很可能会更改。这将导致您无法从以前的保存点恢复。
如果我添加,删除或重新排序在我的工作中没有状态的 算子会发生什么?
如果为有状态 算子分配了 ID,则无状态 算子不会影响保存点还原。
如果您没有分配 ID,则重新排序后,有状态 算子的自动生成 ID 很可能会更改。这将导致您无法从以前的保存点恢复。
当我在恢复时更改程序的并行性时会发生什么?
如果使用 Flink> = 1.2.0 触发保存点并且不使用弃用状态 API Checkpointed
,则只需从保存点恢复程序并指定新的并行度即可。
如果从 Flink <1.2.0 触发的保存点恢复或使用现已弃用的 API,则首先必须将作业和保存点迁移到 Flink> = 1.2.0,然后才能更改并行度。请参阅 升级作业和 Flink 版本指南 。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论