返回介绍

API 迁移指南

发布于 2025-05-02 18:19:18 字数 15046 浏览 0 评论 0 收藏

从 Flink 1.2 迁移到 Flink 1.3

自 Flink 1.2 以来,有一些 API 已被更改。大多数更改都记录在其特定文档中。以下是 API 更改的综合列表以及升级到 Flink 1.3 时迁移详细信息的链接。

TypeSerializer 界面变化

这主要适用 TypeSerializer 于为其状态实施自定义的用户。

从 Flink 1.3 开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 处理序列化程序升级和兼容性

ProcessFunction 总是一个 RichFunction

在 Flink 1.2 中,引入了 ProcessFunction 其丰富的变体 RichProcessFunction 。自 Flink 1.3 以来, RichProcessFunction 已被删除, ProcessFunction 现在始终 RichFunction 可以访问生命周期方法和运行时上下文。

Flink CEP 库 API 更改

Flink 1.3 中的 CEP 库附带了许多新函数,这些函数导致 API 发生了一些变化。有关详细信息,请访问 CEP 迁移文档

从 Flink 核心工件中删除了 Logger 依赖项

在 Flink 1.3 中,为了确保用户可以使用他们自己的自定义日志记录框架,核心 Flink 工件现在可以清除特定的记录器依赖项。

示例和快速入门原型已经指定了记录器,不应受到影响。对于其他自定义项目,请确保添加记录器依赖项。例如,在 Maven 中 pom.xml ,您可以添加:

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.7</version>
</dependency>

<dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.17</version>
</dependency>

从 Flink 1.1 迁移到 Flink 1.2

状态文档 中所述,Flink 有两种类型的状态: 被 Keys 化 状态 和 非被 Keys 化 状态(也称为 算子 状态)。这两种类型都可用于 算子和用户定义的函数。本文档将指导您完成将 Flink 1.1 函数代码迁移到 Flink 1.2 的过程,并介绍 Flink 1.2 中引入的一些重要内部更改,这些更改涉及 Flink 1.1 中对齐窗口 算子的弃用(请参阅 对齐处理时间窗口 算子 )。

迁移过程将有两个目标:

  1. 允许您的函数利用 Flink 1.2 中引入的新函数,例如重新缩放,
  2. 确保您的新 Flink 1.2 作业能够从其 Flink 1.1 前身生成的保存点恢复执行。

按照本指南中的步骤 算子操作后,您可以将正在运行的作业从 Flink 1.1 迁移到 Flink 1.2,只需在 Flink 1.1 作业中使用 保存点 并将其作为起点提供给 Flink 1.2 作业。这将允许 Flink 1.2 作业从其 Flink 1.1 前任中断的位置恢复执行。

示例用户函数

作为本文档其余部分的运行示例,我们将使用 CountMapperBufferingSink 函数。第一个是具有 被 Keys 化 状态的函数的示例,而第二个是具有 非被 Keys 化 状态的函数。Flink 1.1 中上述两个函数的代码如下:

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

  private transient ValueState<Integer> counter;

  private final int numberElements;

  public CountMapper(int numberElements) {
    this.numberElements = numberElements;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    counter = getRuntimeContext().getState(
      new ValueStateDescriptor<>("counter", Integer.class, 0));
  }

  @Override
  public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    int count = counter.value() + 1;
    counter.update(count);

    if (count % numberElements == 0) {
      out.collect(Tuple2.of(value.f0, count));
      counter.update(0); // reset to 0
    }
  }
}

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
  Checkpointed<ArrayList<Tuple2<String, Integer>>> {

  private final int threshold;

  private ArrayList<Tuple2<String, Integer>> bufferedElements;

  BufferingSink(int threshold) {
    this.threshold = threshold;
    this.bufferedElements = new ArrayList<>();
  }

  @Override
  public void invoke(Tuple2<String, Integer> value) throws Exception {
    bufferedElements.add(value);
    if (bufferedElements.size() == threshold) {
      for (Tuple2<String, Integer> element: bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear();
    }
  }

  @Override
  public ArrayList<Tuple2<String, Integer>> snapshotState(
    long checkpointId, long checkpointTimestamp) throws Exception {
    return bufferedElements;
  }

  @Override
  public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    bufferedElements.addAll(state);
  }
}

CountMapper 是一个 RichFlatMapFunction 假定按表格分组的输入流 (word, 1) 。该函数为每个传入的键( ValueState&lt;Integer&gt; counter )保存一个计数器,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。

BufferingSink 是一个 SinkFunction 接收元件(的潜在的输出 CountMapper ),并直到达到某个用户指定的阈值,将它们发射到最终水槽之前缓冲它们。这是避免对数据库或外部存储系统进行许多昂贵调用的常用方法。为了以容错方式进行缓冲,缓冲数据元保存在列表( bufferedElements )中,该列表定期检查点。

状态 API 迁移

要利用 Flink 1.2 的新函数,应修改上面的代码以使用新的状态抽象。完成这些更改后,您将能够更改作业的并行度(向上或向下扩展),并确保您的新版本的作业将从其前任停止的位置开始。

Keys 状态: 在深入研究迁移过程的细节之前需要注意的是,如果您的函数 只有被 Keys 化状态 ,那么 Flink 1.1 的完全相同的代码也适用于 Flink 1.2,完全支持新函数和完全向后兼容性。可以仅针对更好的代码组织进行更改,但这仅仅是一种风格问题。

如上所述,本节的其余部分重点介绍 非被 Keys 化状态

重新缩放和新状态抽象

第一个修改是从旧 Checkpointed&lt;T extends Serializable&gt; 状态接口到新状态接口的转换。在 Flink 1.2 中,有状态函数可以实现更通用的 CheckpointedFunction 接口或 ListCheckpointed&lt;T extends Serializable&gt; 接口,它在语义上更接近旧接口 Checkpointed

在这两种情况中,非键合状态被预期是一个 List 的 序列化的 对象,彼此独立的,因而在重新缩放获再分配。换句话说,这些对象是可以重新分区非被 Keys 化状态的最细粒度。例如,如果并行性 1 BufferingSink 包含数据元的检查点状态, (test1, 2) 并且 (test2, 2) 当将并行性增加到 2 时, (test1, 2) 可能最终在任务 0 中,而 (test2, 2) 将进入任务 1。

有关被 Keys 化状态和非被 Keys 化状态重新缩放的原则的更多详细信息,请参阅 状态文档

ListCheckpointed

ListCheckpointed 接口需要实现两种方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

它们的语义与旧 Checkpointed 界面中的对应物相同。唯一的区别是现在 snapshotState() 应该将对象列表返回到检查点,如前所述,并且 restoreState 必须在恢复时处理此列表。如果状态不是重新分区,可以随时返回 Collections.singletonList(MY_STATE)snapshotState() 。更新的代码 BufferingSink 包括在下面:

public class BufferingSinkListCheckpointed implements
    SinkFunction<Tuple2<String, Integer>>,
    ListCheckpointed<Tuple2<String, Integer>>,
    CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

  private final int threshold;

  private transient ListState<Tuple2<String, Integer>> checkpointedState;

  private List<Tuple2<String, Integer>> bufferedElements;

  public BufferingSinkListCheckpointed(int threshold) {
    this.threshold = threshold;
    this.bufferedElements = new ArrayList<>();
  }

  @Override
  public void invoke(Tuple2<String, Integer> value) throws Exception {
    this.bufferedElements.add(value);
    if (bufferedElements.size() == threshold) {
      for (Tuple2<String, Integer> element: bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear();
    }
  }

  @Override
  public List<Tuple2<String, Integer>> snapshotState(
      long checkpointId, long timestamp) throws Exception {
    return this.bufferedElements;
  }

  @Override
  public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
    if (!state.isEmpty()) {
      this.bufferedElements.addAll(state);
    }
  }

  @Override
  public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    // this is from the CheckpointedRestoring interface.
    this.bufferedElements.addAll(state);
  }
}

如代码所示,更新的函数也实现了 CheckpointedRestoring 接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。

CheckpointedFunction

CheckpointedFunction 接口需要再次执行两种方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

与在 Flink 1.1 中一样, snapshotState() 每当执行检查点时都会调用它,但是每次初始化用户定义的函数时,都会调用 initializeState() 它(它的对应部分 restoreState() ),而不是仅在我们从故障中恢复的情况下。鉴于此, initializeState() 不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。 CheckpointedFunction 接口的实现 BufferingSink 如下所示。

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
    CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

  private final int threshold;

  private transient ListState<Tuple2<String, Integer>> checkpointedState;

  private List<Tuple2<String, Integer>> bufferedElements;

  public BufferingSink(int threshold) {
    this.threshold = threshold;
    this.bufferedElements = new ArrayList<>();
  }

  @Override
  public void invoke(Tuple2<String, Integer> value) throws Exception {
    bufferedElements.add(value);
    if (bufferedElements.size() == threshold) {
      for (Tuple2<String, Integer> element: bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear();
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkpointedState.clear();
    for (Tuple2<String, Integer> element : bufferedElements) {
      checkpointedState.add(element);
    }
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    checkpointedState = context.getOperatorStateStore().
      getSerializableListState("buffered-elements");

    if (context.isRestored()) {
      for (Tuple2<String, Integer> element : checkpointedState.get()) {
        bufferedElements.add(element);
      }
    }
  }

  @Override
  public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    // this is from the CheckpointedRestoring interface.
    this.bufferedElements.addAll(state);
  }
}

initializeState 把参数作为一个 FunctionInitializationContext 。这用于初始化非被 Keys 化状态“容器”。这是一个类型的容器, ListState 其中非被 Keys 化状态对象将在检查点存储:

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

在初始化容器之后,我们使用 isRestored() 上下文的方法来检查我们是否在失败后恢复。如果是这样 true , 即 我们正在恢复,则应用恢复逻辑。

如修改的代码所示,在状态初始化期间恢复的 BufferingSink 这个 ListState 被保存在类变量中以供将来使用 snapshotState() 。在那里, ListState 被清除由先前的检查点包含的所有对象,然后填充我们要设置检查点新的。

作为旁注,被 Keys 化状态也可以在 initializeState() 方法中初始化。这可以使用 FunctionInitializationContext 给定的参数来完成,而不是 RuntimeContext Flink 1.1 的情况。如果 CheckpointedFunction 要在 CountMapper 示例中使用该接口,则 open() 可以删除旧方法,new snapshotState()initializeState() 方法如下所示:

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
    implements CheckpointedFunction {

  private transient ValueState<Integer> counter;

  private final int numberElements;

  public CountMapper(int numberElements) {
    this.numberElements = numberElements;
  }

  @Override
  public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    int count = counter.value() + 1;
    counter.update(count);

    if (count % numberElements == 0) {
      out.collect(Tuple2.of(value.f0, count));
      counter.update(0); // reset to 0
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // all managed, nothing to do.
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    counter = context.getKeyedStateStore().getState(
      new ValueStateDescriptor<>("counter", Integer.class, 0));
  }
}

请注意,该 snapshotState() 方法为空,因为 Flink 本身负责在检查点时 SNAPSHOT 托管的被 Keys 化状态。

向后兼容 Flink 1.1

到目前为止,我们已经看到如何修改我们的函数以利用 Flink 1.2 引入的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从 Flink 1.1 运行的作业停止的位置开始吗?”。

答案是肯定的,而这样做的方式非常简单。对于被 Keys 化状态,您必须什么都不做。Flink 将负责从 Flink 1.1 恢复状态。对于非被 Keys 化状态,新函数必须实现 CheckpointedRestoring 接口,如上面的代码所示。这有一个方法,熟悉 Flink 1.1 restoreState() 的旧 Checkpointed 接口。如修改后的代码所示 BufferingSink ,该 restoreState() 方法与其前身相同。

对齐处理时间窗口 算子

在 Flink 1.1 中,只有在没有指定的逐出器或触发器的 处理时间 上运行 时 , timeWindow() 被 Key 化的数据流上的命令才会实例化特殊类型 WindowOperator 。这可以是一个 AggregatingProcessingTimeWindowOperator 或一个 AccumulatingProcessingTimeWindowOperator 。这两个 算子都被称为 对齐 窗口 算子,因为它们假设它们的输入数据元按顺序到达。这在处理时间内有效,因为数据元在到达窗口算子时获得挂钟时间的时间戳。这些 算子仅限于使用内存状态后台,并且具有优化的数据结构,用于存储利用有序输入数据元到达的每窗口数据元。

在 Flink 1.2 中,不推荐使用对齐的窗口 算子,并且所有窗口 算子操作都通过泛型 WindowOperator 。此迁移不需要更改 Flink 1.1 作业的代码,因为 Flink 将透明地读取 Flink 1.1 保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式 WindowOperator ,并使用通用的 WindowOperator

注意尽管已弃用,但您仍然可以使用 Flink 1.2 中的对齐窗口 算子,通过特殊 WindowAssigners 介绍来实现此目的。这些 assigners 是 SlidingAlignedProcessingTimeWindowsTumblingAlignedProcessingTimeWindows assigners,滑动和翻滚分别窗口。使用对齐窗口的 Flink 1.2 作业必须是一项新工作,因为在使用这些 算子时无法从 Flink 1.1 保存点恢复执行。

注意对齐的窗口 算子不提供 重新缩放 函数, 也不 提供与 Flink 1.1 的 向后兼容性

在 Flink 1.2 中使用对齐窗口 算子的代码如下所示:

// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
  .keyBy(0)
  .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
  .apply(your-function)

// for sliding windows
DataStream<Tuple2<String, Integer>> window1 = source
  .keyBy(0)
  .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
  .apply(your-function)
// for tumbling windows val window1 = source
  .keyBy(0)
  .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
  .apply(your-function)

// for sliding windows val window2 = source
  .keyBy(0)
  .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
  .apply(your-function)

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。