- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
数据类型和序列化
Apache Flink 以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。本文档描述了它们背后的概念和基本原理。
Flink 中的类型处理
Flink 尝试推断有关在分布式计算期间交换和存储的数据类型的大量信息。可以把它想象成一个推断表格架构的数据库。在大多数情况下,Flink 自己无缝地推断所有必要的信息。拥有类型信息允许 Flink 做一些很酷的事情:
- 使用 POJO 类型并通过引用字段名称(如
dataSet.keyBy("username")
)来分组/Join/聚合它们。类型信息允许 Flink 提前检查(用于拼写错误和类型兼容性),而不是稍后在运行时失败。 - Flink 对数据类型的了解越多,序列化和数据布局方案就越好。这对于 Flink 中的内存使用范例非常重要(尽可能处理堆内部/外部的序列化数据并使序列化非常便宜)。
- 最后,它还使大多数情况下的用户免于担心序列化框架和必须注册类型。
一般来说,在需要有关数据类型的信息 飞行前阶段 -也就是,当程序的调用 DataStream
和 DataSet
制成,任何调用之前 execute()
, print()
, count()
,或 collect()
。
最常见的问题
用户需要与 Flink 的数据类型处理进行交互的最常见问题是:
- 注册子类型: 如果函数签名仅描述超类型,但实际上它们在执行期间使用了这些类型的子类型,则可能会大大提高性能,使 Flink 了解这些子类型。为此,呼吁
.registerType(clazz)
对StreamExecutionEnvironment
或者ExecutionEnvironment
每个亚型。 - 注册自定义序列化程序: Flink 会回退到 Kryo ,因为它本身无法透明地处理这些类型。并非所有类型都由 Kryo(以及 Flink)无缝处理。例如,默认情况下,许多 Google Guava 集合类型无法正常工作。解决方案是为导致问题的类型注册其他序列化程序。打电话
.getConfig().addDefaultKryoSerializer(clazz, serializer)
给StreamExecutionEnvironment
或ExecutionEnvironment
。许多库中都提供了其他 Kryo 序列化程序。有关使用 自定义序列化 程序的详细信息,请参阅 自定义序列 化程 - 添加类型提示: 有时,尽管有所有技巧,但 Flink 无法推断泛型类型时,用户必须传递 类型提示 。这通常只在 Java API 中是必需的。该 类型提示章节 描述了更多的细节。
- 手动创建
TypeInformation
: 这对于某些 API 调用可能是必要的,因为由于 Java 泛型类型擦除,Flink 无法推断数据类型。有关 详细信息,请参阅 创建 TypeInformation 或 TypeSerializer 。
Flink 的 TypeInformation 类
类 TypeInformation 是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器,并且在专业化中,可以生成类型的比较器。( 注意 Flink 中的比较器不仅仅是定义一个顺序 - 它们基本上是处理键的实用程序 )
在内部,Flink 在类型之间做出以下区分:
- 基本类型:所有的 Java 原语及其盒装形式,加
void
,String
,Date
,BigDecimal
,和BigInteger
。 - 基元数组和对象数组
- 复合类型
- Flink Java Tuples(Flink Java API 的一部分):最多 25 个字段,不支持空字段
- Scala Case Class (包括 Scala 元组):最多 22 个字段,不支持空字段
- Row:具有任意数量字段的元组并支持空字段
- POJO:遵循某种类似 bean 的模式的类
- 辅助类型(选项,任一,列表,Map,......)
- 通用类型:这些不会被 Flink 本身序列化,而是由 Kryo 序列化。
POJO 特别令人感兴趣,因为它们支持复杂类型的创建以及在键的定义中使用字段名称: dataSet.join(another).where("name").equalTo("personName")
。它们对运行时也是透明的,并且可以由 Flink 非常有效地处理。
POJO 类型的规则
如果满足以下条件,Flink 会将数据类型识别为 POJO 类型(并允许“按名称”字段引用):
- 该类是公共的和独立的(没有非静态内部类)
- 该类有一个公共的无参数构造函数
- 类(以及所有超类)中的所有非静态非瞬态字段都是公共的(和非最终的)或者具有公共 getter 和 setter 方法,该方法遵循 getter 和 setter 的 Java bean 命名约定。
请注意,当用户定义的数据类型无法识别为 POJO 类型时,必须将其作为 GenericType 处理并使用 Kryo 进行序列化。
创建 TypeInformation 或 TypeSerializer
要为类型创建 TypeInformation 对象,请使用特定于语言的方式:
因为 Java 通常会擦除泛型类型信息,所以需要将类型传递给 TypeInformation 构造:
对于非泛型类型,您可以传递类:
TypeInformation<String> info = TypeInformation.of(String.class);
对于泛型类型,您需要通过以下方式“捕获”泛型类型信息 TypeHint
:
TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
在内部,这将创建 TypeHint 的匿名子类,捕获通用信息以将其保存到运行时。
In Scala, Flink uses macros that runs at compile time and captures all generic type information while it is still available.
// important: this import is needed to access the 'createTypeInformation' macro function import org.apache.flink.streaming.api.scala._
val stringInfo: TypeInformation[String] = createTypeInformation[String]
val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
You can still use the same method as in Java as a fallback.
要创建一个 TypeSerializer
,只需调用 typeInfo.createSerializer(config)
该 TypeInformation
对象即可。
该 config
参数是类型 ExecutionConfig
和保存有关该计划的注册的自定义序列化的信息。尽可能尝试将程序传递给 ExecutionConfig。您通常可以通过电话 DataStream
或 DataSet
通过电话获取 getExecutionConfig()
。在内部函数(如 MapFunction
)中,您可以通过使函数成为和调用来获得它 getRuntimeContext().getExecutionConfig()
。
在 Scala API 中类型信息
尽管 类型清单 和 类标记, Scala 对运行时类型信息有非常精细的概念。通常,类型和方法可以访问其泛型参数的类型 - 因此,Scala 程序不会像 Java 程序那样遭受类型擦除。
此外,Scala 允许通过 Scala 宏在 Scala 编译器中运行自定义代码 - 这意味着无论何时编译针对 Flink 的 Scala API 编写的 Scala 程序,都会执行一些 Flink 代码。
我们使用宏来查看编译期间所有用户函数的参数类型和返回类型 - 这是当然所有类型信息都完全可用的时间点。在宏中,我们为函数的返回类型(或参数类型)创建一个 TypeInformation ,并使其成为 算子操作的一部分。
证据参数误差无隐含值
在无法创建 TypeInformation 的情况下,程序无法编译,并显示 “无法找到 TypeInformation 类型的证据参数的隐式值” 的错误。
如果尚未导入生成 TypeInformation 的代码的常见原因。确保导入整个 flink.api.scala 包。
import org.apache.flink.api.scala._
另一个常见原因是通用方法,可以按照以下部分所述进行修复。
通用方法
请考虑以下情况:
def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
input.map { v => v._1 }
}
val data : DataSet[(String, Long) = ...
val result = selectFirst(data)
对于这样的通用方法,函数参数和返回类型的数据类型对于每个调用可能不相同,并且在定义方法的站点处不知道。上面的代码将导致错误,即没有足够的隐式证据可用。
在这种情况下,必须在调用站点生成类型信息并将其传递给方法。Scala 为此提供 隐式参数 。
以下代码告诉 Scala 将 T 的类型信息带入函数。然后,将在调用方法的站点生成类型信息,而不是在定义方法的位置生成类型信息。
def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
input.map { v => v._1 }
}
在 Java API 中类型信息
在一般情况下,Java 会擦除泛型类型信息。Flink 尝试使用 Java 保存的少量位(主要是函数签名和子类信息)通过反射重建尽可能多的类型信息。对于函数的返回类型取决于其输入类型的情况,此逻辑还包含一些简单的类型推断:
public class AppendOne<T> extends MapFunction<T, Tuple2<T, Long>> {
public Tuple2<T, Long> map(T value) {
return new Tuple2<T, Long>(value, 1L);
}
}
在某些情况下,Flink 无法重建所有通用类型信息。在这种情况下,用户必须通过 类型提示 帮助。
在 Java API 中类型提示
在 Flink 无法重建已擦除的泛型类型信息的情况下,Java API 提供所谓的 类型提示 。类型提示告诉系统函数生成的数据流或数据集的类型:
DataSet<SomeType> result = dataSet
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
该 returns
语句指定生成的类型,在本例中通过类。提示支持通过类型定义
- 用于非参数化类型的类(无泛型)
- TypeHints 的形式
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
。该TypeHint
班可以捕获泛型类型信息,并保存它运行时(通过一个匿名子类)。
Java 8 lambdas 的类型提取
Java 8 lambdas 的类型提取与非 lambdas 的工作方式不同,因为 lambda 不与扩展函数接口的实现类相关联。
目前,Flink 试图找出实现 lambda 的方法,并使用 Java 通用签名来确定参数类型和返回类型。但是,并非所有编译器都为 lambda 生成这些签名(从 4.5 开始,Eclipse JDT 编译器只能可靠地编写本文档)。
POJO 类型的序列化
PojoTypeInformation 正在为 POJO 中的所有字段创建序列化器。标准类型(如 int,long,String 等)由我们随 Flink 提供的序列化程序处理。对于所有其他类型,我们回到 Kryo。
如果 Kryo 无法处理该类型,您可以要求 PojoTypeInfo 使用 Avro 序列化 POJO。要这样做,你必须打电话
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();
请注意,Flink 会使用 Avro 序列化程序自动序列化 Avro 生成的 POJO。
如果您希望 Kryo 序列化程序处理 整个 POJO 类型,请进行设置
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();
如果 Kryo 无法序列化您的 POJO,您可以使用自定义序列化程序添加到 Kryo
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
这些方法有不同的变体可供选择。
禁用 Kryo 回退
在某些情况下,程序可能希望明确避免将 Kryo 用作泛型类型的回退。最常见的是希望确保通过 Flink 自己的序列化程序或通过用户定义的自定义序列化程序有效地序列化所有类型。
每当遇到将通过 Kryo 的数据类型时,下面的设置将引发异常:
env.getConfig().disableGenericTypes();
使用工厂定义类型信息
类型信息工厂允许将用户定义的类型信息插入 Flink 类型系统。您必须实现 org.apache.flink.api.common.typeinfo.TypeInfoFactory
以返回自定义类型信息。如果相应的类型已使用 @org.apache.flink.api.common.typeinfo.TypeInfo
注释进行注释,则在类型提取阶段调用工厂。
类型信息工厂可以在 Java 和 Scala API 中使用。
在类型的层次结构中,在向上遍历时将选择最接近的工厂,但是,内置工厂具有最高优先级。工厂的优先级也高于 Flink 的内置类型,因此您应该知道自己在做什么。
以下示例说明如何 MyTuple
使用 Java 中的工厂注释自定义类型并为其提供自定义类型信息。
带注释的自定义类型:
@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
public T0 myfield0;
public T1 myfield1;
}
工厂提供自定义类型信息:
public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
@Override
public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
}
}
该方法 createTypeInfo(Type, Map<String, TypeInformation<?>>)
为工厂所针对的类型创建类型信息。参数提供有关类型本身的其他信息以及类型的泛型类型参数(如果可用)。
如果您的类型包含可能需要从 Flink 函数的输入类型派生的泛型参数,请确保还实现 org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters
泛型参数到类型信息的双向映射。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论