返回介绍

执行配置

发布于 2025-05-02 18:19:16 字数 4349 浏览 0 评论 0 收藏

StreamExecutionEnvironment 包含 ExecutionConfig 允许为运行时设置工作的具体配置值。要更改影响所有作业的默认值,请参阅 配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
val env = StreamExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig

可以使用以下配置选项:(默认为粗体)

  • enableClosureCleaner() / disableClosureCleaner() 。默认情况下启用闭包清理器。闭包清理器删除 Flink 程序中对周围类匿名函数的不需要的引用。禁用闭包清除程序后,可能会发生匿名用户函数引用周围的类(通常不是 Serializable)。这将导致序列化程序出现异常。
  • getParallelism() / setParallelism(int parallelism) 设置作业的默认并行度。
  • getMaxParallelism() / setMaxParallelism(int parallelism) 设置作业的默认最大并行度。此设置确定最大并行度并指定动态缩放的上限。
  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries) 设置重新执行失败任务的次数。值为零可有效禁用容错。值为 -1 表示应使用系统默认值(在配置中定义)。这已弃用,请改用 重启策略
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay) 设置在重新执行作业之前系统在作业失败后等待的延迟(以毫秒为单位)。在 TaskManagers 上成功停止所有任务后,延迟开始,一旦延迟过去,任务就会重新启动。此参数对于延迟重新执行非常有用,以便在尝试重新执行之前让某些超时相关故障完全浮出水面(例如尚未完全超时的断开连接),并且由于同样的问题而再次立即失败。仅当执行重试次数为一次或多次时,此参数才有效。这已弃用,请改用 重启策略
  • getExecutionMode() / setExecutionMode() 。默认执行模式为 PIPELINED。设置执行模式以执行程序。执行模式定义数据交换是以批处理还是以流水线方式执行。
  • enableForceKryo() / disableForceKryo 。Kryo 默认不会被迫。强制 GenericTypeInformation 将 Pryo 序列化程序用于 POJO,即使我们可以将它们分析为 POJO。在某些情况下,这可能更可取。例如,当 Flink 的内部序列化程序无法正确处理 POJO 时。
  • enableForceAvro() / disableForceAvro() 。默认情况下不会强制使用 Avro。强制 Flink AvroTypeInformation 使用 Avro 序列化程序而不是 Kryo 来序列化 Avro POJO。
  • enableObjectReuse() / disableObjectReuse() 默认情况下,对象不会在 Flink 中重复使用。启用对象重用模式将指示运行时重用用户对象以获得更好的性能。请记住,当 算子操作的用户代码函数不知道此行为时,这可能会导致错误。
  • enableSysoutLogging() / disableSysoutLogging() JobManager 状态更新 System.out 默认打印到。此设置允许禁用此行为。
  • getGlobalJobParameters() / setGlobalJobParameters() 此方法允许用户将自定义对象设置为作业的全局配置。由于 ExecutionConfig 可以在所有用户定义的函数中访问,因此这是一种在作业中全局可用的配置的简单方法。
  • addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) 为给定的注册 Kryo 序列化程序实例 type
  • addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) 为给定的注册 Kryo 序列化程序类 type
  • registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) 使用 Kryo 注册给定类型并为其指定序列化程序。通过使用 Kryo 注册类型,类型的序列化将更加高效。
  • registerKryoType(Class<?> type) 如果类型最终被 Kryo 序列化,那么它将在 Kryo 注册以确保只写入标签(整数 ID)。如果某个类型未在 Kryo 中注册,则其整个类名将与每个实例序列化,从而导致更高的 I / O 成本。
  • registerPojoType(Class<?> type) 使用序列化堆栈注册给定类型。如果类型最终被序列化为 POJO,则该类型将在 POJO 序列化程序中注册。如果类型最终被 Kryo 序列化,那么它将在 Kryo 注册以确保只写入标签。如果某个类型未在 Kryo 中注册,则其整个类名将与每个实例序列化,从而导致更高的 I / O 成本。

请注意,注册的类型 registerKryoType() 不适用于 Flink 的 Kryo 序列化程序实例。

  • disableAutoTypeRegistration() 默认情况下启用自动类型注册。自动类型注册是使用 Kryo 和 POJO 序列化器注册用户代码使用的所有类型(包括子类型)。
  • setTaskCancellationInterval(long interval) 设置在连续尝试取消正在运行的任务之间等待的间隔(以毫秒为单位)。取消 interrupt() 任务时,如果任务线程未在特定时间内终止,则创建新线程,该线程定期调用任务线程。此参数是指连续呼叫之间的时间 interrupt() ,默认设置为 30000 毫秒或 30 秒

RuntimeContext 是在可访问的 Rich* 通过的函数的 getRuntimeContext() 方法还允许访问 ExecutionConfig 中的所有用户定义的函数。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。