返回介绍

为 Flink 程序注册自定义序列化程序

发布于 2025-05-02 18:19:16 字数 3117 浏览 0 评论 0 收藏

如果您在 Flink 程序中使用自定义类型无法通过 Flink 类型序列化程序进行序列化,则 Flink 将回退到使用通用 Kryo 序列化程序。您可以注册自己的序列化程序或序列化系统,如 Google Protobuf 或 Apache Thrift with Kryo。为此,只需在 ExecutionConfig Flink 程序中注册类型类和序列化程序即可。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);

// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);

请注意,您的自定义序列化程序必须扩展 Kryo 的 Serializer 类。对于 Google Protobuf 或 Apache Thrift,已经为您完成了这项工作:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

要使上述示例起作用,您需要在 Maven 项目文件(pom.xml)中包含必要的依赖项。在依赖项部分中,为 Apache Thrift 添加以下内容:

<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>chill-thrift</artifactId>
  <version>0.5.2</version>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libthrift</artifactId>
  <version>0.6.1</version>
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
    </exclusion>
  </exclusions>
</dependency>

对于 Google Protobuf,您需要以下 Maven 依赖项:

<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>chill-protobuf</artifactId>
  <version>0.5.2</version>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>2.5.0</version>
</dependency>

请根据需要调整两个库的版本。

使用 Kryo JavaSerializer 的问题

如果您 JavaSerializer 为自定义类型注册 Kryo ,即使您的自定义类型类包含在提交的用户代码 jar 中,您也可能遇到 ClassNotFoundException。这是由于 Kryo 的已知问题 JavaSerializer ,可能会错误地使用错误的类加载器。

在这种情况下,您应该使用它 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 来解决问题。这是 JavaSerializer 在 Flink 中重新实现的,确保使用用户代码类加载器。

有关详细信息,请参阅 FLINK-6025

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。