- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
API 迁移指南
从 Flink 1.2 迁移到 Flink 1.3
自 Flink 1.2 以来,有一些 API 已被更改。大多数更改都记录在其特定文档中。以下是 API 更改的综合列表以及升级到 Flink 1.3 时迁移详细信息的链接。
TypeSerializer
界面变化
这主要适用 TypeSerializer
于为其状态实施自定义的用户。
从 Flink 1.3 开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 处理序列化程序升级和兼容性 。
ProcessFunction
总是一个 RichFunction
在 Flink 1.2 中,引入了 ProcessFunction
其丰富的变体 RichProcessFunction
。自 Flink 1.3 以来, RichProcessFunction
已被删除, ProcessFunction
现在始终 RichFunction
可以访问生命周期方法和运行时上下文。
Flink CEP 库 API 更改
Flink 1.3 中的 CEP 库附带了许多新函数,这些函数导致 API 发生了一些变化。有关详细信息,请访问 CEP 迁移文档 。
从 Flink 核心工件中删除了 Logger 依赖项
在 Flink 1.3 中,为了确保用户可以使用他们自己的自定义日志记录框架,核心 Flink 工件现在可以清除特定的记录器依赖项。
示例和快速入门原型已经指定了记录器,不应受到影响。对于其他自定义项目,请确保添加记录器依赖项。例如,在 Maven 中 pom.xml
,您可以添加:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
从 Flink 1.1 迁移到 Flink 1.2
如 状态文档 中所述,Flink 有两种类型的状态: 被 Keys 化 状态 和 非被 Keys 化 状态(也称为 算子 状态)。这两种类型都可用于 算子和用户定义的函数。本文档将指导您完成将 Flink 1.1 函数代码迁移到 Flink 1.2 的过程,并介绍 Flink 1.2 中引入的一些重要内部更改,这些更改涉及 Flink 1.1 中对齐窗口 算子的弃用(请参阅 对齐处理时间窗口 算子 )。
迁移过程将有两个目标:
- 允许您的函数利用 Flink 1.2 中引入的新函数,例如重新缩放,
- 确保您的新 Flink 1.2 作业能够从其 Flink 1.1 前身生成的保存点恢复执行。
按照本指南中的步骤 算子操作后,您可以将正在运行的作业从 Flink 1.1 迁移到 Flink 1.2,只需在 Flink 1.1 作业中使用 保存点 并将其作为起点提供给 Flink 1.2 作业。这将允许 Flink 1.2 作业从其 Flink 1.1 前任中断的位置恢复执行。
示例用户函数
作为本文档其余部分的运行示例,我们将使用 CountMapper
和 BufferingSink
函数。第一个是具有 被 Keys 化 状态的函数的示例,而第二个是具有 非被 Keys 化 状态的函数。Flink 1.1 中上述两个函数的代码如下:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> counter;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
}
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
Checkpointed<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private ArrayList<Tuple2<String, Integer>> bufferedElements;
BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public ArrayList<Tuple2<String, Integer>> snapshotState(
long checkpointId, long checkpointTimestamp) throws Exception {
return bufferedElements;
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
bufferedElements.addAll(state);
}
}
这 CountMapper
是一个 RichFlatMapFunction
假定按表格分组的输入流 (word, 1)
。该函数为每个传入的键( ValueState<Integer> counter
)保存一个计数器,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。
的 BufferingSink
是一个 SinkFunction
接收元件(的潜在的输出 CountMapper
),并直到达到某个用户指定的阈值,将它们发射到最终水槽之前缓冲它们。这是避免对数据库或外部存储系统进行许多昂贵调用的常用方法。为了以容错方式进行缓冲,缓冲数据元保存在列表( bufferedElements
)中,该列表定期检查点。
状态 API 迁移
要利用 Flink 1.2 的新函数,应修改上面的代码以使用新的状态抽象。完成这些更改后,您将能够更改作业的并行度(向上或向下扩展),并确保您的新版本的作业将从其前任停止的位置开始。
Keys 状态: 在深入研究迁移过程的细节之前需要注意的是,如果您的函数 只有被 Keys 化状态 ,那么 Flink 1.1 的完全相同的代码也适用于 Flink 1.2,完全支持新函数和完全向后兼容性。可以仅针对更好的代码组织进行更改,但这仅仅是一种风格问题。
如上所述,本节的其余部分重点介绍 非被 Keys 化状态 。
重新缩放和新状态抽象
第一个修改是从旧 Checkpointed<T extends Serializable>
状态接口到新状态接口的转换。在 Flink 1.2 中,有状态函数可以实现更通用的 CheckpointedFunction
接口或 ListCheckpointed<T extends Serializable>
接口,它在语义上更接近旧接口 Checkpointed
。
在这两种情况中,非键合状态被预期是一个 List
的 序列化的 对象,彼此独立的,因而在重新缩放获再分配。换句话说,这些对象是可以重新分区非被 Keys 化状态的最细粒度。例如,如果并行性 1 BufferingSink
包含数据元的检查点状态, (test1, 2)
并且 (test2, 2)
当将并行性增加到 2 时, (test1, 2)
可能最终在任务 0 中,而 (test2, 2)
将进入任务 1。
有关被 Keys 化状态和非被 Keys 化状态重新缩放的原则的更多详细信息,请参阅 状态文档 。
ListCheckpointed
该 ListCheckpointed
接口需要实现两种方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
它们的语义与旧 Checkpointed
界面中的对应物相同。唯一的区别是现在 snapshotState()
应该将对象列表返回到检查点,如前所述,并且 restoreState
必须在恢复时处理此列表。如果状态不是重新分区,可以随时返回 Collections.singletonList(MY_STATE)
的 snapshotState()
。更新的代码 BufferingSink
包括在下面:
public class BufferingSinkListCheckpointed implements
SinkFunction<Tuple2<String, Integer>>,
ListCheckpointed<Tuple2<String, Integer>>,
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSinkListCheckpointed(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
this.bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public List<Tuple2<String, Integer>> snapshotState(
long checkpointId, long timestamp) throws Exception {
return this.bufferedElements;
}
@Override
public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
if (!state.isEmpty()) {
this.bufferedElements.addAll(state);
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
如代码所示,更新的函数也实现了 CheckpointedRestoring
接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。
CheckpointedFunction
该 CheckpointedFunction
接口需要再次执行两种方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
与在 Flink 1.1 中一样, snapshotState()
每当执行检查点时都会调用它,但是每次初始化用户定义的函数时,都会调用 initializeState()
它(它的对应部分 restoreState()
),而不是仅在我们从故障中恢复的情况下。鉴于此, initializeState()
不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。 CheckpointedFunction
接口的实现 BufferingSink
如下所示。
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore().
getSerializableListState("buffered-elements");
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
在 initializeState
把参数作为一个 FunctionInitializationContext
。这用于初始化非被 Keys 化状态“容器”。这是一个类型的容器, ListState
其中非被 Keys 化状态对象将在检查点存储:
this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");
在初始化容器之后,我们使用 isRestored()
上下文的方法来检查我们是否在失败后恢复。如果是这样 true
, 即 我们正在恢复,则应用恢复逻辑。
如修改的代码所示,在状态初始化期间恢复的 BufferingSink
这个 ListState
被保存在类变量中以供将来使用 snapshotState()
。在那里, ListState
被清除由先前的检查点包含的所有对象,然后填充我们要设置检查点新的。
作为旁注,被 Keys 化状态也可以在 initializeState()
方法中初始化。这可以使用 FunctionInitializationContext
给定的参数来完成,而不是 RuntimeContext
Flink 1.1 的情况。如果 CheckpointedFunction
要在 CountMapper
示例中使用该接口,则 open()
可以删除旧方法,new snapshotState()
和 initializeState()
方法如下所示:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
implements CheckpointedFunction {
private transient ValueState<Integer> counter;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// all managed, nothing to do.
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
counter = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
}
请注意,该 snapshotState()
方法为空,因为 Flink 本身负责在检查点时 SNAPSHOT 托管的被 Keys 化状态。
向后兼容 Flink 1.1
到目前为止,我们已经看到如何修改我们的函数以利用 Flink 1.2 引入的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从 Flink 1.1 运行的作业停止的位置开始吗?”。
答案是肯定的,而这样做的方式非常简单。对于被 Keys 化状态,您必须什么都不做。Flink 将负责从 Flink 1.1 恢复状态。对于非被 Keys 化状态,新函数必须实现 CheckpointedRestoring
接口,如上面的代码所示。这有一个方法,熟悉 Flink 1.1 restoreState()
的旧 Checkpointed
接口。如修改后的代码所示 BufferingSink
,该 restoreState()
方法与其前身相同。
对齐处理时间窗口 算子
在 Flink 1.1 中,只有在没有指定的逐出器或触发器的 处理时间 上运行 时 , timeWindow()
被 Key 化的数据流上的命令才会实例化特殊类型 WindowOperator
。这可以是一个 AggregatingProcessingTimeWindowOperator
或一个 AccumulatingProcessingTimeWindowOperator
。这两个 算子都被称为 对齐 窗口 算子,因为它们假设它们的输入数据元按顺序到达。这在处理时间内有效,因为数据元在到达窗口算子时获得挂钟时间的时间戳。这些 算子仅限于使用内存状态后台,并且具有优化的数据结构,用于存储利用有序输入数据元到达的每窗口数据元。
在 Flink 1.2 中,不推荐使用对齐的窗口 算子,并且所有窗口 算子操作都通过泛型 WindowOperator
。此迁移不需要更改 Flink 1.1 作业的代码,因为 Flink 将透明地读取 Flink 1.1 保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式 WindowOperator
,并使用通用的 WindowOperator
。
注意尽管已弃用,但您仍然可以使用 Flink 1.2 中的对齐窗口 算子,通过特殊 WindowAssigners
介绍来实现此目的。这些 assigners 是 SlidingAlignedProcessingTimeWindows
和 TumblingAlignedProcessingTimeWindows
assigners,滑动和翻滚分别窗口。使用对齐窗口的 Flink 1.2 作业必须是一项新工作,因为在使用这些 算子时无法从 Flink 1.1 保存点恢复执行。
注意对齐的窗口 算子不提供 重新缩放 函数, 也不 提供与 Flink 1.1 的 向后兼容性 。
在 Flink 1.2 中使用对齐窗口 算子的代码如下所示:
// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
.apply(your-function)
// for sliding windows
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(your-function)
// for tumbling windows val window1 = source
.keyBy(0)
.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
.apply(your-function)
// for sliding windows val window2 = source
.keyBy(0)
.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(your-function)
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论