返回介绍

数据类型和序列化

发布于 2025-05-02 18:19:16 字数 9681 浏览 0 评论 0 收藏

Apache Flink 以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。本文档描述了它们背后的概念和基本原理。

Flink 中的类型处理

Flink 尝试推断有关在分布式计算期间交换和存储的数据类型的大量信息。可以把它想象成一个推断表格架构的数据库。在大多数情况下,Flink 自己无缝地推断所有必要的信息。拥有类型信息允许 Flink 做一些很酷的事情:

  • 使用 POJO 类型并通过引用字段名称(如 dataSet.keyBy("username") )来分组/Join/聚合它们。类型信息允许 Flink 提前检查(用于拼写错误和类型兼容性),而不是稍后在运行时失败。
  • Flink 对数据类型的了解越多,序列化和数据布局方案就越好。这对于 Flink 中的内存使用范例非常重要(尽可能处理堆内部/外部的序列化数据并使序列化非常便宜)。
  • 最后,它还使大多数情况下的用户免于担心序列化框架和必须注册类型。

一般来说,在需要有关数据类型的信息 飞行前阶段 -也就是,当程序的调用 DataStreamDataSet 制成,任何调用之前 execute()print()count() ,或 collect()

最常见的问题

用户需要与 Flink 的数据类型处理进行交互的最常见问题是:

  • 注册子类型: 如果函数签名仅描述超类型,但实际上它们在执行期间使用了这些类型的子类型,则可能会大大提高性能,使 Flink 了解这些子类型。为此,呼吁 .registerType(clazz)StreamExecutionEnvironment 或者 ExecutionEnvironment 每个亚型。
  • 注册自定义序列化程序: Flink 会回退到 Kryo ,因为它本身无法透明地处理这些类型。并非所有类型都由 Kryo(以及 Flink)无缝处理。例如,默认情况下,许多 Google Guava 集合类型无法正常工作。解决方案是为导致问题的类型注册其他序列化程序。打电话 .getConfig().addDefaultKryoSerializer(clazz, serializer)StreamExecutionEnvironmentExecutionEnvironment 。许多库中都提供了其他 Kryo 序列化程序。有关使用 自定义序列化 程序的详细信息,请参阅 自定义序列 化程
  • 添加类型提示: 有时,尽管有所有技巧,但 Flink 无法推断泛型类型时,用户必须传递 类型提示 。这通常只在 Java API 中是必需的。该 类型提示章节 描述了更多的细节。
  • 手动创建 TypeInformation 这对于某些 API 调用可能是必要的,因为由于 Java 泛型类型擦除,Flink 无法推断数据类型。有关 详细信息,请参阅 创建 TypeInformation 或 TypeSerializer

Flink 的 TypeInformation 类

TypeInformation 是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器,并且在专业化中,可以生成类型的比较器。( 注意 Flink 中的比较器不仅仅是定义一个顺序 - 它们基本上是处理键的实用程序 )

在内部,Flink 在类型之间做出以下区分:

  • 基本类型:所有的 Java 原语及其盒装形式,加 voidStringDateBigDecimal ,和 BigInteger
  • 基元数组和对象数组
  • 复合类型
    • Flink Java Tuples(Flink Java API 的一部分):最多 25 个字段,不支持空字段
    • Scala Case Class (包括 Scala 元组):最多 22 个字段,不支持空字段
    • Row:具有任意数量字段的元组并支持空字段
    • POJO:遵循某种类似 bean 的模式的类
  • 辅助类型(选项,任一,列表,Map,......)
  • 通用类型:这些不会被 Flink 本身序列化,而是由 Kryo 序列化。

POJO 特别令人感兴趣,因为它们支持复杂类型的创建以及在键的定义中使用字段名称: dataSet.join(another).where("name").equalTo("personName") 。它们对运行时也是透明的,并且可以由 Flink 非常有效地处理。

POJO 类型的规则

如果满足以下条件,Flink 会将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公共的和独立的(没有非静态内部类)
  • 该类有一个公共的无参数构造函数
  • 类(以及所有超类)中的所有非静态非瞬态字段都是公共的(和非最终的)或者具有公共 getter 和 setter 方法,该方法遵循 getter 和 setter 的 Java bean 命名约定。

请注意,当用户定义的数据类型无法识别为 POJO 类型时,必须将其作为 GenericType 处理并使用 Kryo 进行序列化。

创建 TypeInformation 或 TypeSerializer

要为类型创建 TypeInformation 对象,请使用特定于语言的方式:

因为 Java 通常会擦除泛型类型信息,所以需要将类型传递给 TypeInformation 构造:

对于非泛型类型,您可以传递类:

TypeInformation<String> info = TypeInformation.of(String.class);

对于泛型类型,您需要通过以下方式“捕获”泛型类型信息 TypeHint

TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

在内部,这将创建 TypeHint 的匿名子类,捕获通用信息以将其保存到运行时。

In Scala, Flink uses macros that runs at compile time and captures all generic type information while it is still available.

// important: this import is needed to access the 'createTypeInformation' macro function import org.apache.flink.streaming.api.scala._

val stringInfo: TypeInformation[String] = createTypeInformation[String]

val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

You can still use the same method as in Java as a fallback.

要创建一个 TypeSerializer ,只需调用 typeInfo.createSerializer(config)TypeInformation 对象即可。

config 参数是类型 ExecutionConfig 和保存有关该计划的注册的自定义序列化的信息。尽可能尝试将程序传递给 ExecutionConfig。您通常可以通过电话 DataStreamDataSet 通过电话获取 getExecutionConfig() 。在内部函数(如 MapFunction )中,您可以通过使函数成为和调用来获得它 getRuntimeContext().getExecutionConfig()



在 Scala API 中类型信息

尽管 类型清单 和 类标记, Scala 对运行时类型信息有非常精细的概念。通常,类型和方法可以访问其泛型参数的类型 - 因此,Scala 程序不会像 Java 程序那样遭受类型擦除。

此外,Scala 允许通过 Scala 宏在 Scala 编译器中运行自定义代码 - 这意味着无论何时编译针对 Flink 的 Scala API 编写的 Scala 程序,都会执行一些 Flink 代码。

我们使用宏来查看编译期间所有用户函数的参数类型和返回类型 - 这是当然所有类型信息都完全可用的时间点。在宏中,我们为函数的返回类型(或参数类型)创建一个 TypeInformation ,并使其成为 算子操作的一部分。

证据参数误差无隐含值

在无法创建 TypeInformation 的情况下,程序无法编译,并显示 “无法找到 TypeInformation 类型的证据参数的隐式值” 的错误。

如果尚未导入生成 TypeInformation 的代码的常见原因。确保导入整个 flink.api.scala 包。

import org.apache.flink.api.scala._

另一个常见原因是通用方法,可以按照以下部分所述进行修复。

通用方法

请考虑以下情况:

def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}

val data : DataSet[(String, Long) = ...

val result = selectFirst(data)

对于这样的通用方法,函数参数和返回类型的数据类型对于每个调用可能不相同,并且在定义方法的站点处不知道。上面的代码将导致错误,即没有足够的隐式证据可用。

在这种情况下,必须在调用站点生成类型信息并将其传递给方法。Scala 为此提供 隐式参数 。

以下代码告诉 Scala 将 T 的类型信息带入函数。然后,将在调用方法的站点生成类型信息,而不是在定义方法的位置生成类型信息。

def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}


在 Java API 中类型信息

在一般情况下,Java 会擦除泛型类型信息。Flink 尝试使用 Java 保存的少量位(主要是函数签名和子类信息)通过反射重建尽可能多的类型信息。对于函数的返回类型取决于其输入类型的情况,此逻辑还包含一些简单的类型推断:

public class AppendOne<T> extends MapFunction<T, Tuple2<T, Long>> {

  public Tuple2<T, Long> map(T value) {
    return new Tuple2<T, Long>(value, 1L);
  }
}

在某些情况下,Flink 无法重建所有通用类型信息。在这种情况下,用户必须通过 类型提示 帮助。

在 Java API 中类型提示

在 Flink 无法重建已擦除的泛型类型信息的情况下,Java API 提供所谓的 类型提示 。类型提示告诉系统函数生成的数据流或数据集的类型:

DataSet<SomeType> result = dataSet
  .map(new MyGenericNonInferrableFunction<Long, SomeType>())
    .returns(SomeType.class);

returns 语句指定生成的类型,在本例中通过类。提示支持通过类型定义

  • 用于非参数化类型的类(无泛型)
  • TypeHints 的形式 returns(new TypeHint&lt;Tuple2&lt;Integer, SomeType&gt;&gt;(){}) 。该 TypeHint 班可以捕获泛型类型信息,并保存它运行时(通过一个匿名子类)。

Java 8 lambdas 的类型提取

Java 8 lambdas 的类型提取与非 lambdas 的工作方式不同,因为 lambda 不与扩展函数接口的实现类相关联。

目前,Flink 试图找出实现 lambda 的方法,并使用 Java 通用签名来确定参数类型和返回类型。但是,并非所有编译器都为 lambda 生成这些签名(从 4.5 开始,Eclipse JDT 编译器只能可靠地编写本文档)。

POJO 类型的序列化

PojoTypeInformation 正在为 POJO 中的所有字段创建序列化器。标准类型(如 int,long,String 等)由我们随 Flink 提供的序列化程序处理。对于所有其他类型,我们回到 Kryo。

如果 Kryo 无法处理该类型,您可以要求 PojoTypeInfo 使用 Avro 序列化 POJO。要这样做,你必须打电话

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

请注意,Flink 会使用 Avro 序列化程序自动序列化 Avro 生成的 POJO。

如果您希望 Kryo 序列化程序处理 整个 POJO 类型,请进行设置

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

如果 Kryo 无法序列化您的 POJO,您可以使用自定义序列化程序添加到 Kryo

env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

这些方法有不同的变体可供选择。

禁用 Kryo 回退

在某些情况下,程序可能希望明确避免将 Kryo 用作泛型类型的回退。最常见的是希望确保通过 Flink 自己的序列化程序或通过用户定义的自定义序列化程序有效地序列化所有类型。

每当遇到将通过 Kryo 的数据类型时,下面的设置将引发异常:

env.getConfig().disableGenericTypes();

使用工厂定义类型信息

类型信息工厂允许将用户定义的类型信息插入 Flink 类型系统。您必须实现 org.apache.flink.api.common.typeinfo.TypeInfoFactory 以返回自定义类型信息。如果相应的类型已使用 @org.apache.flink.api.common.typeinfo.TypeInfo 注释进行注释,则在类型提取阶段调用工厂。

类型信息工厂可以在 Java 和 Scala API 中使用。

在类型的层次结构中,在向上遍历时将选择最接近的工厂,但是,内置工厂具有最高优先级。工厂的优先级也高于 Flink 的内置类型,因此您应该知道自己在做什么。

以下示例说明如何 MyTuple 使用 Java 中的工厂注释自定义类型并为其提供自定义类型信息。

带注释的自定义类型:

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
  public T0 myfield0;
  public T1 myfield1;
}

工厂提供自定义类型信息:

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {

  @Override
  public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
  return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
  }
}

该方法 createTypeInfo(Type, Map&lt;String, TypeInformation&lt;?&gt;&gt;) 为工厂所针对的类型创建类型信息。参数提供有关类型本身的其他信息以及类型的泛型类型参数(如果可用)。

如果您的类型包含可能需要从 Flink 函数的输入类型派生的泛型参数,请确保还实现 org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters 泛型参数到类型信息的双向映射。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。