- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
任务生命周期
Flink 中的任务是执行的基本单位。它是执行 算子的每个并行实例的位置。例如,并行度为 5 的 算子将使每个实例由单独的任务执行。
它 StreamTask
是 Flink 流处理中所有不同任务子类型的基础。本文档 StreamTask
介绍了生命周期中的不同阶段,并描述了代表这些阶段的主要方法。
简而言之,算子生命周期
因为任务是执行 算子的并行实例的实体,所以它的生命周期与 算子的生命周期紧密集成。因此,我们将简要提及代表算子生命周期的基本方法,然后再深入了解算子的生命周期 StreamTask
。下面按照调用每个方法的顺序列出了该列表。假设 算子可以具有用户定义的函数( UDF ),则在每个 算子方法下面,我们还会在它调用的 UDF 的生命周期中呈现(缩进)方法。如果 算子扩展了这些方法,则这些方法可用 AbstractUdfStreamOperator
,这是执行 UDF 的所有 算子的基本类。
// initialization phase
OPERATOR::setup
UDF::setRuntimeContext
OPERATOR::initializeState
OPERATOR::open
UDF::open
// processing phase (called on every element/watermark)
OPERATOR::processElement
UDF::run
OPERATOR::processWatermark
// checkpointing phase (called asynchronously on every checkpoint)
OPERATOR::snapshotState
// termination phase
OPERATOR::close
UDF::close
OPERATOR::dispose
简而言之, setup()
调用它来初始化一些特定于算子的机制,例如它 RuntimeContext
及其度量集合数据结构。在此之后, initializeState()
为算子提供其初始状态,并且该 open()
方法执行任何特定于算子的初始化,例如在该情况下打开用户定义的函数 AbstractUdfStreamOperator
。
注意的 initializeState()
同时包含它的初始执行过程中初始化 算子操作者的状态的逻辑( 例如 寄存器任何键入的状态),并且还从在发生故障后的检查点检索其状态的逻辑。有关此内容的更多信息,请参见本页的其余部
现在一切都已设置,算子已准备好处理传入的数据。传入数据元可以是以下之一:输入数据元,水印和检查点障碍。它们中的每一个都有一个特殊的数据元来处理它。数据元由 processElement()
方法处理,水印由 processWatermark()
和检查点障碍触发一个检查点,该检查点调用(异步) snapshotState()
方法,我们在下面描述。对于每个传入数据元,根据其类型调用上述方法之一。请注意, processElement()
它也是调用 UDF 逻辑的位置, 例如 map()
您的方法 MapFunction
。
最后,在算子正常,无故障终止的情况下( 例如, 如果流是有限的并且到达其结束), close()
则调用该方法以执行算子逻辑所需的任何最终副本记录 算子操作( 例如, 关闭任何连接)或者在算子执行期间打开的 I / O 流), dispose()
之后调用它以释放算子持有的任何资源( 例如 算子数据所持有的本机内存)。
在由于故障或由于手动取消而终止的情况下,执行直接跳转到 dispose()
并且跳过算子在故障发生时所处的阶段与之间的任何中间阶段 dispose()
。
检查点:snapshotState()
只要收到检查点障碍,就会调用算子的方法与上述其他方法异步。检查点在处理阶段执行, 即 在算子打开之后和关闭之前执行。此方法的职责是将算子的当前状态存储到指定的 状态后台 ,当作业在失败后恢复执行时将从该 后台 检索该状态。下面我们将简要介绍 Flink 的检查点机制,有关 Flink 中检查点的原则的更详细讨论,请阅读相应的文档: Data Streaming Fault Tolerance 。
任务生命周期
在简要介绍了算子的主要阶段之后,本节将更详细地描述任务如何在群集上执行期间调用相应的方法。这里描述的阶段的顺序主要包括在该类的 invoke()
方法中 StreamTask
。本文档的其余部分分为两个小节,一个描述在任务的常规无故障执行期间的阶段(参见 正常执行 ),以及(更短的)一个描述在任务被取消时遵循的不同序列(请参阅“ 中断执行” ,手动或由于某些其他原因, 例如 执行期间抛出的异常。
正常执行
执行任务直到完成而不被中断的步骤如下所示:
TASK::setInitialState
TASK::invoke
create basic utils (config, etc) and load the chain of operators
setup-operators
task-specific-init
initialize-operator-states
open-operators
run
close-operators
dispose-operators
task-specific-cleanup
common-cleanup
如上所示,在恢复任务配置并初始化一些重要的运行时参数之后,该任务的第一步是检索其初始的任务范围状态。这是在 setInitialState()
,这在两种情况下尤其重要:
- 当任务从故障中恢复并从上一个成功检查点重新启动时
- 从 保存点 恢复时。
如果是第一次执行任务,则初始任务状态为空。
恢复任何初始状态后,任务进入其 invoke()
方法。在那里,它首先通过调用 setup()
它们中的每一个的方法来初始化参与本地计算的 算子,然后通过调用本地 init()
方法来执行其任务特定的初始化。通过特定的任务,我们的意思是根据任务(类型 SourceTask
, OneInputStreamTask
或 TwoInputStreamTask
等),这个步骤可能会有所不同,但在任何情况下,这里是必要的任务范围内获取资源。作为示例, OneInputStreamTask
表示期望具有单个输入流的任务,将连接初始化为与本地任务相关的输入流的不同分区的位置。
获得必要的资源后,不同的算子和用户定义的函数就可以从上面检索的任务范围状态获取其各自的状态。这是在 initializeState()
方法中完成的,该方法调用 initializeState()
每个单独的 算子。每个有状态 算子都应该覆盖此方法,并且应该包含状态初始化逻辑,这两者都是第一次执行作业,也适用于任务从故障中恢复或使用保存点时的情况。
现在任务中的所有算子都已初始化, open()
每个单独的算子的 openAllOperators()
方法都由该方法调用 StreamTask
。此方法执行所有 算子操作初始化,例如使用计时器服务注册任何检索到的计时器。单个任务可能正在执行多个 算子,其中一个 算子消耗其前任的输出。在这种情况下, open()
从最后一个 算子( 即 其输出也是任务本身的输出的 算子)调用该方法到第一个 算子。这样做是为了当第一个算子开始处理任务的输入时,所有下游算子都准备好接收其输出。
注意任务中的连续算子从最后一个到第一个打开。
现在任务可以恢复执行,算子可以开始处理新的输入数据。这 run()
是调用特定于任务的方法的位置。此方法将一直运行,直到没有更多输入数据(有限流),或任务被取消(手动或不手动)。这是 调用 算子特定 processElement()
和 processWatermark()
方法的位置。
在运行直到完成的情况下, 即 没有更多的输入数据要处理,在退出该 run()
方法之后,任务进入其关闭过程。最初,定时器服务停止注册任何新的定时器( 例如, 从正在执行的触发定时器),清除所有尚未启动的定时器,并等待当前正在执行的定时器的完成。然后 closeAllOperators()
尝试通过调用 close()
每个 算子的方法来优雅地关闭计算中涉及的运算符。然后,刷新任何缓冲的输出数据,以便下游任务可以处理它们,最后任务尝试通过调用它来清除算子持有的所有资源。 dispose()
每个人的方法。打开不同的算子时,我们提到订单是从最后一个到第一个。结束以相反的方式发生,从头到尾。
注意任务中的连续算子从第一个到最后一个关闭。
最后,当所有算子都已关闭并释放所有资源时,任务会关闭其计时器服务,执行特定于任务的清理, 例如 清除其所有内部缓冲区,然后执行其通用任务清理,其中包括关闭所有 算子操作输出通道和清理任何输出缓冲区。
检查点: 以前我们看到过,在 initializeState()
从故障中恢复的过程中,任务及其所有算子和函数检索在故障之前的最后一个成功检查点期间持久保存到稳定存储的状态。Flink 中的检查点是根据用户指定的间隔定期执行的,并且由与主任务线程不同的线程执行。这就是为什么它们不包含在任务生命周期的主要阶段中。简而言之,调用的特殊数据元 CheckpointBarriers
由输入数据流中的作业的源任务定期注入,并随实际数据从源传递到接收器。源任务在运行模式后注入这些障碍,并假设为 CheckpointCoordinator
也在运行。每当任务接收到这样的障碍时,它就会调度由检查点线程执行的任务,该线程调用 snapshotState()
任务中的 算子。在执行检查点时,任务仍然可以接收输入数据,但是数据被缓冲并且仅在检查点成功完成后处理并向下游发射。
执行中断
在前面的部分中,我们描述了一直运行到完成的任务的生命周期。如果任务在任何时候被取消,则正常执行被中断,从那一点开始执行的唯一 算子操作是定时器服务关闭,特定于任务的清理,算子的处理以及一般任务清理,如如上所述。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论