返回介绍

调整检查点和大状态

发布于 2025-05-02 18:19:19 字数 8924 浏览 0 评论 0 收藏

此页面提供了如何配置和调整使用大状态的应用程序的指南。

概览

要使 Flink 应用程序可以大规模可靠运行,必须满足两个条件:

  • 应用程序需要能够可靠地获取检查点
  • 在发生故障后,资源需要足以赶上输入数据流

第一部分讨论如何大规模获得良好的检查点。最后一节介绍了有关计划使用多少资源的一些最佳实践。

监测状态和检查点

监视检查点行为的最简单方法是通过 UI 的检查点部分。 检查点监视 的文档显示了如何访问可用的检查点度量标准。

扩大检查点时特别感兴趣的两个数字是:

  • 算子启动检查点的时间:此时间目前尚未直接公开,但对应于:

    checkpoint_start_delay = end_to_end_duration - synchronous_duration - asynchronous_duration

    当触发检查点的时间一直非常高时,这意味着 检查点障碍 需要很长时间才能从源头移动到算子。这通常表明系统在恒定的背压下运行。

  • 在对齐期间缓冲的数据量。对于一次性语义,Flink 在接收多个输入流的 算子处 对齐 流,为该对齐缓冲一些数据。理想情况下缓冲的数据量较低 - 较高的数量意味着在不同输入流的非常不同的时间接收检查点障碍。

请注意,当存在瞬态背压,数据偏斜或网络问题时,此处指示的数字偶尔会很高。但是,如果数字一直很高,则意味着 Flink 将许多资源放入检查点。

调整检查点

应用程序可以定期触发检查点。当检查点比检查点间隔花费更长时间时,在正在进行的检查点完成之前不会触发下一个检查点。默认情况下,一旦正在进行的检查点完成,将立即触发下一个检查点。

当检查点最终频繁占用超过基准时间间隔时(例如因为状态增长超过计划,或者存储检查点的存储暂时变慢),系统会不断地检查点(一旦完成,新系统会立即启动) 。这可能意味着在检查点上经常捆绑太多资源,而且算子的进展太少。此行为对使用异步检查点状态的流应用程序的影响较小,但可能仍会对整体应用程序性能产生影响。

为防止出现这种情况,应用程序可以定义 检查点之间 的 最短持续时间 :

StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

此持续时间是在最新检查点结束和下一个检查点开始之间必须经过的最小时间间隔。下图说明了这对检查点的影响。

插图检查点之间的最小时间参数如何影响检查点行为。

注意: 可以配置应用程序(通过 CheckpointConfig )以允许多个检查点同时进行。对于 Flink 中具有大状态的应用程序,这通常会将太多资源绑定到检查点。当手动触发保存点时,它可能正在与正在进行的检查点同时进行。

调整网络缓冲区

在 Flink 1.3 之前,网络缓冲区数量的增加也导致检查点时间增加,因为保存更多的飞行数据意味着检查点障碍被延迟。从 Flink 1.3 开始,每个传出/传入通道使用的网络缓冲区数量是有限的,因此可以配置网络缓冲区而不会影响检查点时间(请参阅 网络缓冲区配置 )。

尽可能使状态检查点异步

当状态是 异步_SNAPSHOT 时,检查点比 同步_SNAPSHOT 状态时更好地扩展。特别是在具有多个连接,协同函数或窗口的更复杂的流应用程序中,这可能会产生深远的影响。

要异步创建状态,应用程序必须做两件事:

  1. 使用 由 Flink 管理的状态:托管状态表示 Flink 提供存储状态的数据结构。目前,这是真正的 被 Keys 化状态 ,这就好比接口背后抽象 ValueStateListStateReducingState ,...
  2. 使用支持异步 SNAPSHOT 的状态后台。在 Flink 1.2 中,只有 RocksDB 状态后台使用完全异步 SNAPSHOT。从 Flink 1.3 开始,基于堆的状态后台也支持异步 SNAPSHOT。

以上两点意味着大状态通常应保持为被 Keys 化状态,而不是算子状态。

调整 RocksDB

许多大型 Flink 流应用程序的状态存储主力是 RocksDB State 后台 。后台远远超出主存储器,可靠地存储大 被 Keys 化状态

不幸的是,RocksDB 的性能可能因配置而异,并且几乎没有关于如何正确调整 RocksDB 的文档。例如,默认配置是针对 SSD 定制的,并且在旋转磁盘上执行次优。

增量检查点

与完整检查点相比,增量检查点可以显着缩短检查点时间,但代价是(可能)更长的恢复时间。核心思想是增量检查点仅记录对先前完成的检查点的所有更改,而不是生成状态后台的完整,自包含备份。像这样,增量检查点建立在先前的检查点上。Flink 以一种随时间自我整合的方式利用 RocksDB 的内部备份机制。因此,Flink 中的增量检查点历史记录不会无限增长,并且最终会将旧检查点包含在内并自动修剪。`

虽然我们强烈建议对大型状态使用增量检查点,但请注意,这是一项新函数,目前默认情况下未启用。要启用此函数,用户可以 RocksDBStateBackend 在构造函数集中使用相应的布尔标志来实例化 a true ,例如:

 RocksDBStateBackend backend =
    new RocksDBStateBackend(filebackend, true);

RocksDB 计时器

对于 RocksDB,用户可以选择计时器是存储在堆上(默认)还是存储在 RocksDB 中。基于堆的定时器可以为较少数量的定时器提供更好的性能,而在 RocksDB 中存储定时器可提供更高的可扩展性,因为 RocksDB 中的定时器数量可能超过可用的主内存(溢出到磁盘)。

当使用 RockDB 作为状态后台时,可以通过 Flink 的配置通过选项键选择定时器存储的类型 state.backend.rocksdb.timer-service.factory 。可能的选择是 heap (在堆上存储定时器,默认)和 rocksdb (在 RocksDB 中存储定时器)。

注意 RocksDB 状态后台/增量检查点/基于堆的定时器的组合当前不支持定时器状态的异步 SNAPSHOT。其他状态如被 Keys 化状态仍然是异步 SNAPSHOT。请注意,这不是以前版本的回归,将通过解决 FLINK-10026

将选项传递给 RocksDB

RocksDBStateBackend.setOptions(new MyOptions());

public class MyOptions implements OptionsFactory {

  @Override
  public DBOptions createDBOptions() {
    return new DBOptions()
      .setIncreaseParallelism(4)
      .setUseFsync(false)
      .setDisableDataSync(true);
  }

  @Override
  public ColumnFamilyOptions createColumnOptions() {

    return new ColumnFamilyOptions()
      .setTableFormatConfig(
        new BlockBasedTableConfig()
          .setBlockCacheSize(256 * 1024 * 1024)  // 256 MB
          .setBlockSize(128 * 1024));      // 128 KB
  }
}

预定义选项

Flink 为 RocksDB 提供了一些预定义的选项集合,用于不同的设置,例如可以设置 RocksDBStateBacked.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)

我们希望随着时间的推移积累更多这样的配置文件 当您发现一组运行良好且对某些工作负载具有代表性的选项时,请随意提供此类预定义选项配置文件。

注意 RocksDB 是一个本机库,它直接从进程分配内存,而不是从 JVM 分配内存。您必须考虑分配给 RocksDB 的任何内存,通常是将 TaskManagers 的 JVM 堆大小 Reduce 相同的量。不这样做可能导致 YARN / Mesos / etc 终止 JVM 进程以分配比配置更多的内存。

容量规划

本节讨论如何确定应该使用多少资源来使 Flink 作业可靠地运行。容量规划的基本经验法则是:

  • 正常运行应具有足够的容量,以便在恒定的 背压下 不运行。有关如何检查应用程序是否在背压下运行的详细信息,请参阅 背压监测
  • 在无故障时间内无需背压即可运行程序所需的资源之上提供一些额外资源。需要这些资源来“赶上”在应用程序恢复期间累积的输入数据。这应该取决于恢复 算子操作通常需要多长时间(这取决于故障转移时需要加载到新 TaskManagers 中的状态的大小)以及该方案需要多快才能恢复。

    重要事项 :应该在激活检查点的情况下建立基线,因为检查点会占用一些资源(例如网络带宽)。

  • 临时背压通常是可以的,并且是在负载峰值期间,追赶阶段期间或外部系统(写入水槽中)表现出暂时减速时执行流控制的重要部分。
  • 某些 算子操作(如大窗口)会导致下游算子出现尖峰负载:对于 Windows,下游算子在构建窗口时可能没什么可做的,并且在窗口发出时有负载。下游并行度的计划需要考虑窗口发射的程度以及需要处理这种尖峰的速度。

要点: 为了以后允许添加资源,请确保将数据流程序的 最大并行 度设置为合理的数字。最大并行度定义了在重新缩放程序时(通过保存点)设置程序并行度的高度。

Flink 的内部副本记录以最大并行度 - 许多 关键组 的粒度跟踪并行状态。即使执行低并行度的程序,Flink 的设计也力求使其具有非常高的最大并行度值。

压缩

Flink 为所有检查点和保存点提供可选压缩(默认:关闭)。目前,压缩总是使用 snappy 压缩算法(版本 1.1.4), 但我们计划在将来支持自定义压缩算法。压缩适用于被 Keys 化状态下的键组的粒度,即每个键组可以单独解压缩,这对于重新缩放很重要。

压缩可以通过以下方式激活 ExecutionConfig

 ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setUseSnapshotCompression(true);

注意压缩选项对增量 SNAPSHOT 没有影响,因为它们使用的是 RocksDB 的内部格式,它始终使用开箱即用的快速压缩。

任务本地恢复

愿景

在 Flink 的检查点中,每个任务都会生成其状态的 SNAPSHOT,然后将其写入分布式存储。每个任务通过发送描述分布式存储中状态位置的句柄来确认成功将状态写入 JobManager。反过来,JobManager 从所有任务中收集句柄并将它们捆绑到检查点对象中。

在恢复的情况下,JobManager 打开最新的检查点对象并将句柄发送回相应的任务,然后可以从分布式存储中恢复其状态。使用分布式存储来存储状态有两个重要的优点。首先,存储是容错的,其次,分布式存储中的所有状态都可以被所有节点访问,并且可以容易地重新分配(例如,用于重新分级)。

但是,使用远程分布式存储也有一个很大的缺点:所有任务必须通过网络从远程位置读取其状态。在许多情况下,恢复可以将失败的任务重新安排到与上一次运行相同的 TaskManager(当然还有例如机器故障),但我们仍然必须读取远程状态。即使单台机器上只有很小的故障,这也可能导致 大型状态的恢复时间过长 。

方法

任务本地状态恢复完全针对这个长恢复时间的问题,主要思想如下:对于每个检查点,每个任务不仅将任务状态写入分布式存储,而且还保存 状态 SNAPSHOT 的辅助副本。任务本地的存储 (例如,在本地磁盘或内存中)。请注意,SNAPSHOT 的主存储必须仍然是分布式存储,因为本地存储不能确保节点故障下的持久性,也不能为其他节点提供访问以重新分发状态,此函数仍需要主副本。

但是,对于可以重新安排到先前位置进行恢复的每个任务,我们可以从辅助本地副本恢复状态,并避免远程读取状态的成本。鉴于 许多故障不是节点故障,并且节点故障通常一次只影响一个或很少的节点, 很可能在恢复中大多数任务可以返回到它们先前的位置并且找到它们的本地状态。这使得本地恢复有效 Reduce 恢复时间。

请注意,根据所选的状态后台和检查点策略,每个检查点可能会产生一些额外费用,用于创建和存储辅助本地状态副本。例如,在大多数情况下,实现将简单地将对分布式存储的写入复制到本地文件。

检查点的例证与任务地方恢复的。

主(分布式存储)和辅助(任务 - 本地)状态 SNAPSHOT 的关系

任务本地状态始终被视为辅助副本,检查点状态的基本事实是分布式存储中的主副本。这对于检查点和恢复期间本地状态的问题有影响:

  • 对于检查点, 主副本必须成功, 并且生成 辅助本地副本的失败不会 使检查点 失败 。如果无法创建主副本,则检查点将失败,即使已成功创建辅助副本也是如此。
  • 只有主副本由 JobManager 确认和管理,辅助副本由 TaskManager 拥有,其生命周期可以独立于其主副本。例如,可以将 3 个最新检查点的历史记录保存为主副本,并仅保存最新检查点的任务本地状态。
  • 对于恢复,如果匹配的辅助副本可用,Flink 将始终首先 尝试从任务本地状态恢复 。如果从辅助副本恢复期间出现任何问题,Flink 将 透明地重试从主副本恢复任务 。如果主要和(可选)辅助副本失败,则恢复仅失败。在这种情况下,根据配置,Flink 仍然可以回退到较旧的检查点。
  • 任务本地副本可能仅包含完整任务状态的一部分(例如,在写入一个本地文件时出现异常)。在这种情况下,Flink 将首先尝试在本地恢复本地部分,从主副本恢复非本地状态。主状态必须始终完整,并且是 任务本地状态 的 超集 。
  • 任务本地状态可以具有与主状态不同的格式,它们不需要是字节相同的。例如,任务本地状态甚至可能是由堆对象组成的内存中,而不是存储在任何文件中。
  • 如果 TaskManager 丢失,则其所有任务的本地状态将丢失。

配置任务本地恢复

默认情况下 ,任务本地恢复已 取消激活 ,可以通过 Flink 的配置使用 state.backend.local-recovery 指定的 Keys 激活 CheckpointingOptions.LOCAL_RECOVERY 。此设置的值可以为 true ,也可以为 false (默认值)以禁用本地恢复。

有关不同状态后台的任务本地恢复的详细信息

限制 :目前,任务本地恢复仅涵盖被 Keys 化状态后台。被 Keys 化状态通常是该状态最大的部分。在不久的将来,我们还将涵盖算子的状态和计时器。

以下状态后台可以支持任务本地恢复。

  • FsStateBackend:被 Keys 化状态支持任务本地恢复。实现将将状态复制到本地文件。这可能会引入额外的写入成本并占用本地磁盘空间。将来,我们还可能提供一种将任务本地状态保存在内存中的实现。
  • RocksDBStateBackend:被 Keys 化状态支持任务本地恢复。对于 完整检查点 ,状态将复制到本地文件。这可能会引入额外的写入成本并占用本地磁盘空间。对于 增量 SNAPSHOT ,本地状态基于 RocksDB 的本机检查点机制。此机制也用作创建主副本的第一步,这意味着在这种情况下,不会引入额外的成本来创建辅助副本。我们只是保存原生检查点目录,而不是在上传到分布式商店后删除它。此本地副本可以与 RocksDB 的工作目录共享活动文件(通过硬链接),因此对于活动文件,也不会为使用增量 SNAPSHOT 的任务本地恢复消耗额外的磁盘空间。

分配保存调度

任务本地恢复假设在失败时保存分配任务调度,其工作如下。每个任务都会记住其先前的分配,并 请求完全相同的插槽 在恢复时重新启动。如果此插槽不可用,则任务将从资源管理器请求 新的新插槽 。这样,如果 TaskManager 不再可用,则无法返回其先前位置的 任务将不会驱动其先前插槽中的其他恢复任务 。我们的理由是,当 TaskManager 不再可用时,前一个插槽只能消失,在这种情况下是 一些 任务必须要求新的插槽。通过我们的调度策略,我们可以为最大数量的任务提供从本地状态恢复的机会,并避免任务窃取其先前插槽之间的级联效应。

分配保存调度不适用于 Flink 的传统模式。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。