- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
执行配置
该 StreamExecutionEnvironment
包含 ExecutionConfig
允许为运行时设置工作的具体配置值。要更改影响所有作业的默认值,请参阅 配置 。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
val env = StreamExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig
可以使用以下配置选项:(默认为粗体)
enableClosureCleaner()
/disableClosureCleaner()
。默认情况下启用闭包清理器。闭包清理器删除 Flink 程序中对周围类匿名函数的不需要的引用。禁用闭包清除程序后,可能会发生匿名用户函数引用周围的类(通常不是 Serializable)。这将导致序列化程序出现异常。getParallelism()
/setParallelism(int parallelism)
设置作业的默认并行度。getMaxParallelism()
/setMaxParallelism(int parallelism)
设置作业的默认最大并行度。此设置确定最大并行度并指定动态缩放的上限。getNumberOfExecutionRetries()
/setNumberOfExecutionRetries(int numberOfExecutionRetries)
设置重新执行失败任务的次数。值为零可有效禁用容错。值为-1
表示应使用系统默认值(在配置中定义)。这已弃用,请改用 重启策略 。getExecutionRetryDelay()
/setExecutionRetryDelay(long executionRetryDelay)
设置在重新执行作业之前系统在作业失败后等待的延迟(以毫秒为单位)。在 TaskManagers 上成功停止所有任务后,延迟开始,一旦延迟过去,任务就会重新启动。此参数对于延迟重新执行非常有用,以便在尝试重新执行之前让某些超时相关故障完全浮出水面(例如尚未完全超时的断开连接),并且由于同样的问题而再次立即失败。仅当执行重试次数为一次或多次时,此参数才有效。这已弃用,请改用 重启策略 。getExecutionMode()
/setExecutionMode()
。默认执行模式为 PIPELINED。设置执行模式以执行程序。执行模式定义数据交换是以批处理还是以流水线方式执行。enableForceKryo()
/disableForceKryo
。Kryo 默认不会被迫。强制 GenericTypeInformation 将 Pryo 序列化程序用于 POJO,即使我们可以将它们分析为 POJO。在某些情况下,这可能更可取。例如,当 Flink 的内部序列化程序无法正确处理 POJO 时。enableForceAvro()
/disableForceAvro()
。默认情况下不会强制使用 Avro。强制 Flink AvroTypeInformation 使用 Avro 序列化程序而不是 Kryo 来序列化 Avro POJO。enableObjectReuse()
/disableObjectReuse()
默认情况下,对象不会在 Flink 中重复使用。启用对象重用模式将指示运行时重用用户对象以获得更好的性能。请记住,当 算子操作的用户代码函数不知道此行为时,这可能会导致错误。enableSysoutLogging()
/disableSysoutLogging()
JobManager 状态更新System.out
默认打印到。此设置允许禁用此行为。getGlobalJobParameters()
/setGlobalJobParameters()
此方法允许用户将自定义对象设置为作业的全局配置。由于ExecutionConfig
可以在所有用户定义的函数中访问,因此这是一种在作业中全局可用的配置的简单方法。addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)
为给定的注册 Kryo 序列化程序实例type
。addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
为给定的注册 Kryo 序列化程序类type
。registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)
使用 Kryo 注册给定类型并为其指定序列化程序。通过使用 Kryo 注册类型,类型的序列化将更加高效。registerKryoType(Class<?> type)
如果类型最终被 Kryo 序列化,那么它将在 Kryo 注册以确保只写入标签(整数 ID)。如果某个类型未在 Kryo 中注册,则其整个类名将与每个实例序列化,从而导致更高的 I / O 成本。registerPojoType(Class<?> type)
使用序列化堆栈注册给定类型。如果类型最终被序列化为 POJO,则该类型将在 POJO 序列化程序中注册。如果类型最终被 Kryo 序列化,那么它将在 Kryo 注册以确保只写入标签。如果某个类型未在 Kryo 中注册,则其整个类名将与每个实例序列化,从而导致更高的 I / O 成本。
请注意,注册的类型 registerKryoType()
不适用于 Flink 的 Kryo 序列化程序实例。
disableAutoTypeRegistration()
默认情况下启用自动类型注册。自动类型注册是使用 Kryo 和 POJO 序列化器注册用户代码使用的所有类型(包括子类型)。setTaskCancellationInterval(long interval)
设置在连续尝试取消正在运行的任务之间等待的间隔(以毫秒为单位)。取消interrupt()
任务时,如果任务线程未在特定时间内终止,则创建新线程,该线程定期调用任务线程。此参数是指连续呼叫之间的时间interrupt()
,默认设置为 30000 毫秒或 30 秒 。
的 RuntimeContext
是在可访问的 Rich*
通过的函数的 getRuntimeContext()
方法还允许访问 ExecutionConfig
中的所有用户定义的函数。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论