- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
状态后台
用 Data Stream API 编写的程序通常以各种形式保存状态:
- Windows 会在触发数据元或聚合之前收集数据元或聚合
- 转换函数可以使用键/值状态接口来存储值
- 转换函数可以实现
CheckpointedFunction
接口以使其局部变量具有容错能力
另请参阅流 API 指南中的 状态部分 。
激活检查点时,检查点会持续保持此类状态,以防止数据丢失并始终如一地恢复。状态如何在内部表示,以及在检查点上如何以及在何处持续取决于所选择的 状态后台 。
可用的状态后台
开箱即用,Flink 捆绑了这些状态后台:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
如果没有配置其他任何内容,系统将使用 MemoryStateBackend。
MemoryStateBackend
该 MemoryStateBackend 保存数据在内部作为 Java 堆的对象。键/值状态和窗口 算子包含存储值,触发器等的哈希表。
在检查点时,此状态后台将对状态进行 SNAPSHOT,并将其作为检查点确认消息的一部分发送到 JobManager(主服务器),JobManager 也将其存储在其堆上。
可以将 MemoryStateBackend 配置为使用异步 SNAPSHOT。虽然我们强烈建议使用异步 SNAPSHOT 来避免阻塞管道,但请注意,默认情况下,此函数目前处于启用状态。要禁用此函数,用户可以 MemoryStateBackend
在构造函数中将相应的布尔标志实例化为 false
(这应该仅用于调试),例如:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
MemoryStateBackend 的局限性:
- 默认情况下,每个状态的大小限制为 5 MB。可以在 MemoryStateBackend 的构造函数中增加此值。
- 无论配置的最大状态大小如何,状态都不能大于 akka 帧大小(请参阅 配置 )。
- 聚合状态必须适合 JobManager 内存。
鼓励 MemoryStateBackend 用于:
- 本地开发和调试
- 几乎没有状态的作业,例如仅包含一次记录函数的作业(Map,FlatMap,Filter,...)。Kafka 消费者需要很少的状态。
FsStateBackend
所述 FsStateBackend 配置有文件系统 URL(类型,地址,路径),如“HDFS://名称节点:40010 /Flink/检查点”或“文件:///数据/Flink/检查点”。
FsStateBackend 将正在运行的数据保存在 TaskManager 的内存中。在检查点时,它将状态 SNAPSHOT 写入配置的文件系统和目录中的文件。最小元数据存储在 JobManager 的内存中(或者,在高可用性模式下,存储在元数据检查点中)。
FsStateBackend 默认 使用 异步 SNAPSHOT, 以避免在编写状态检查点时阻塞处理管道。要禁用此函数,用户可以 FsStateBackend
在构造函数集中使用相应的布尔标志来实例化 a false
,例如:
new FsStateBackend(path, false);
鼓励 FsStateBackend:
- 具有大状态,长窗口,大键/值状态的作业。
- 所有高可用性设置。
RocksDBStateBackend
所述 RocksDBStateBackend 配置有文件系统 URL(类型,地址,路径),如“HDFS://名称节点:40010 /Flink/检查点”或“文件:///数据/Flink/检查点”。
RocksDBStateBackend 将 RocksDB 数据库中的飞行中数据保存在(默认情况下)存储在 TaskManager 数据目录中。在检查点时,整个 RocksDB 数据库将被检查点到配置的文件系统和目录中。最小元数据存储在 JobManager 的内存中(或者,在高可用性模式下,存储在元数据检查点中)。
RocksDBStateBackend 始终执行异步 SNAPSHOT。
RocksDBStateBackend 的局限性:
- 由于 RocksDB 的 JNI 桥接 API 基于 byte [],因此每个 Keys 和每个值的最大支持大小为 2 ^ 31 个字节。重要提示:在 RocksDB 中使用合并 算子操作的状态(例如 ListState)可以静默地累积> 2 ^ 31 字节的值大小,然后在下次检索时失败。这是目前 RocksDB JNI 的一个限制。
我们鼓励 RocksDBStateBackend:
- 具有非常大的状态,长窗口,大键/值状态的作业。
- 所有高可用性设置。
请注意,您可以保存的状态量仅受可用磁盘空间量的限制。与将状态保持在内存中的 FsStateBackend 相比,这允许保持非常大的状态。但是,这也意味着使用此状态后台可以实现的最大吞吐量更低。对此后台的所有读/写都必须通过去/序列化来检索/存储状态对象,这比使用堆基表示正在进行的堆上表示更昂贵。
RocksDBStateBackend 是目前唯一提供增量检查点的后台(见 这里 )。
配置状态后台
如果您不指定任何内容,则默认状态后台是 JobManager。如果要为群集上的所有作业建立不同的默认值,可以通过在 flink-conf.yaml 中 定义新的默认状态后台来 实现 。可以基于每个作业覆盖默认状态后台,如下所示。
设置每个作业状态后台
每个作业状态后台 StreamExecutionEnvironment
在作业上设置,如下例所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
设置默认状态后台
可以 flink-conf.yaml
使用配置 Keys 在配置中配置默认状态后台 state.backend
。
config 条目的可能值包括 jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend),或实现状态后台工厂 FsStateBackendFactory 的类的完全限定类名,例如 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
RocksDBStateBackend。
该 state.checkpoints.dir
选项定义所有后台写入检查点数据和元数据文件的目录。您可以 在此处 找到有关检查点目录结构的更多详细信息。
配置文件中的示例部分可能如下所示:
# The backend that will be used to store operator state checkpoints
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
state.backend: filesystem
# Directory for storing checkpoints
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论