- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
配置
对于单节点设置,Flink 已准备好开箱即用,您无需更改默认配置即可开始使用。
开箱即用的配置将使用您的默认 Java 安装.您可以手动设置环境变量 JAVA_HOME 或配置项 env.java.home 中 conf/flink-conf.yaml ,如果你想手动覆盖 Java 运行时使用。
此页面列出了设置性能良好(分布式)安装通常所需的最常用选项.此外,此处还列出了所有可用配置参数的完整列表。
所有配置都已完成 conf/flink-conf.yaml ,预计将是具有格式的 YAML 键值对 的扁平集合 key: value .
系统和运行脚本在启动时解析配置.对配置文件的更改需要重新启动 Flink JobManager 和 TaskManagers.
TaskManagers 的配置文件可能不同,Flink 不承担集群中的统一机器。
常见选项
| 键 | 默认 | 描述 |
|---|---|---|
jobmanager.heap.size | “1024m” | JobManager 的 JVM 堆大小。 |
taskmanager.heap.size | “1024m” | TaskManagers 的 JVM 堆大小,它是系统的并行工作者.在 YARN 设置中,此值自动配置为 TaskManager 的 YARN 容器的大小,减去一定的容差值。 |
parallelism.default | 1 | |
taskmanager.numberOfTaskSlots | 1 | 单个 TaskManager 可以运行的并行算子或用户函数实例的数量.如果此值大于 1,则单个 TaskManager 将获取函数或 算子的多个实例.这样,TaskManager 可以使用多个 CPU 内核,但同时,可用内存在不同的算子或函数实例之间划分.此值通常与 TaskManager 的计算机具有的物理 CPU 核心数成比例(例如,等于核心数,或核心数的一半). |
state.backend | (none) | 状态后台用于存储和检查点状态。 |
state.checkpoints.dir | (none) | 用于在 Flink 支持的文件系统中存储检查点的数据文件和元数据的默认目录.必须可以从所有参与的进程/节点(即所有 TaskManagers 和 JobManagers)访问存储路径。 |
state.savepoints.dir | (none) | 保存点的默认目录.由将后台写入文件系统的状态后台(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)使用。 |
high-availability | “no / not” | 定义用于群集执行的高可用性模式.要启用高可用性,请将此模式设置为“ZOOKEEPER”. |
high-availability.storageDir | (none) | 文件系统路径(URI)Flink 在高可用性设置中持久保存元数据。 |
security.ssl.internal.enabled | false | 打开 SSL 以进行内部网络通信.可选地,特定组件可以通过它们自己的设置(rpc,数据传输,REST 等)覆盖它。 |
security.ssl.rest.enabled | false | 通过 REST 端点打开 SSL 以进行外部通信。 |
完整参考
HDFS
注意: 不推荐使用这些 Keys,建议使用环境变量配置 Hadoop 路径 HADOOP_CONF_DIR .
这些参数配置 Flink 使用的默认 HDFS.未指定 HDFS 配置的设置必须指定 HDFS 文件的完整路径( hdfs://address:port/path/to/files )文件也将使用默认 HDFS 参数(块大小,复制因子)编写。
fs.hdfs.hadoopconf:Hadoop 文件系统(HDFS)配置 目录 的绝对路径(可选值).指定此值允许程序使用短 URI 引用 HDFS 文件(hdfs:///path/to/files不包括文件 URI 中 NameNode 的地址和端口).如果没有此选项,则可以访问 HDFS 文件,但需要完全限定的 URIhdfs://address:port/path/to/files.此选项还会导致文件编写者获取 HDFS 的块大小和复制因子的默认值.Flink 将在指定目录中查找“core-site.xml”和“hdfs-site.xml”文件。fs.hdfs.hdfsdefault:Hadoop 自己的配置文件“hdfs-default.xml”的绝对路径(DEFAULT:null).fs.hdfs.hdfssite:Hadoop 自己的配置文件“hdfs-site.xml”的绝对路径(DEFAULT:null).
核心
| 键 | 默认 | 描述 |
|---|---|---|
classloader.parent-first-patterns.additional | (none) | 一个(以分号分隔的)模式列表,指定应始终首先通过父 ClassLoader 解析哪些类.模式是一个简单的前缀,它根据完全限定的类名进行检查.这些模式附加到“classloader.parent-first-patterns.default”. |
classloader.parent-first-patterns.default | “java .; <wbr>scala .; <wbr>org.apache.flink .; <wbr>com.esotericsoftware.kryo; <wbr>org.apache.hadoop .; <wbr>javax.annotation .; <wbr>org.slf4j; <wbr>org.apache.log4j; <wbr>org.apache.logging; <wbr>org. apache.commons.logging; <wbr>ch.qos.logback“ | 一个(以分号分隔的)模式列表,指定应始终首先通过父 ClassLoader 解析哪些类.模式是一个简单的前缀,它根据完全限定的类名进行检查.通常不应修改此设置.要添加其他模式,我们建议使用“classloader.parent-first-patterns.additional”. |
classloader.resolve-order | "child-first" | 从用户代码加载类时定义类解析策略,这意味着是首先检查用户代码 jar(“child-first”)还是应用程序类路径(“parent-first”).默认设置指示首先从用户代码 jar 加载类,这意味着用户代码 jar 可以包含和加载不同于 Flink 使用的(依赖)依赖项。 |
io.tmp.dirs | YARN 上的'LOCAL_DIRS'.Mesos 上的'_FLINK_TMP_DIR'.独立的 System.getProperty(“java.io.tmpdir”). | |
mode | "new" | 切换到选择执行模式.可能的值为“new”和“legacy”. |
parallelism.default | 1 |
JobManager
| 键 | 默认 | 描述 |
|---|---|---|
jobmanager.archive.fs.dir | (none) | |
jobmanager.execution.attempts-history-size | 16 | 历史记录中保存的最大执行尝试次数。 |
jobmanager.execution.failover-strategy | full | 此选项指定作业计算如何从任务失败中恢复.可接受的值是: |
| 'full':重新启动所有任务。 | ||
| 'individual':仅重新启动失败的任务.仅当所有任务都是独立组件时才应使用。 | ||
| 'region':重新启动可能受任务失败影响的所有任务。 | ||
jobmanager.heap.size | “1024m” | JobManager 的 JVM 堆大小。 |
jobmanager.resourcemanager.reconnect-interval | 2000 | 此选项指定在与资源管理器的连接丢失时触发资源管理器重新连接的时间间隔.此选项仅供内部使用。 |
jobmanager.rpc.address | (none) | config 参数定义要连接的网络地址以与 JobManager 进行通信.此值仅在具有静态名称或地址的单个 JobManager 存在的设置中解释(简单的独立设置或具有动态服务名称解析的容器设置).当使用 Leader 选举服务(如 ZooKeeper)从潜在的多个 Slave JobManagers 中选择和发现 JobManagerLeader 时,它不会在许多高可用性设置中使用。 |
jobmanager.rpc.port | 6123 | config 参数定义要连接的网络端口以与 JobManager 进行通信.与 jobmanager.rpc.address 一样,此值仅在设置中解释,其中存在具有静态名称/地址和端口的单个 JobManager(简单的独立设置或具有动态服务名称解析的容器设置).当使用 Leader 选举服务(如 ZooKeeper)从潜在的多个 Slave JobManagers 中选择和发现 JobManagerLeader 时,此配置选项不会用于许多高可用性设置。 |
jobstore.cache-size | 52428800 | 作业存储缓存大小(以字节为单位),用于将已完成的作业保存在内存中。 |
jobstore.expiration-time | 3600 | 完成作业到期并从作业库中清除的时间(以秒为单位). |
slot.idle.timeout | 50000 | Slot Pool 中空闲槽的超时时间(以 ms 为单位). |
slot.request.timeout | 300000 | 从 Slot Pool 请求插槽的超时(以 ms 为单位). |
TaskManager
| 键 | 默认 | 描述 |
|---|---|---|
task.cancellation.interval | 30000 | 两次连续任务取消尝试之间的时间间隔(以 ms 为单位). |
task.cancellation.timeout | 180000 | 超时(以 ms 为单位),在此之后任务取消超时并导致致命的 TaskManager 错误.值为 0 将禁用看门狗。 |
task.cancellation.timers.timeout | 7500 | |
task.checkpoint.alignment.max-size | -1 | 检查点对齐可以缓冲的最大字节数.如果检查点对齐缓冲超过配置的数据量,则中止检查点(跳过).值-1 表示没有限制。 |
taskmanager.data.port | 0 | TaskManager 的端口用于数据交换 算子操作。 |
taskmanager.data.ssl.enabled | true | 为 taskmanager 数据传输启用 SSL 支持.仅当内部 SSL 的全局标志(security.ssl.internal.enabled)设置为 true 时,此选项才适用 |
taskmanager.debug.memory.log | false | 指示是否启动线程的标志,该线程重复记录 JVM 的内存使用情况。 |
taskmanager.debug.memory.log-interval | 5000 | 日志线程记录当前内存使用情况的时间间隔(以 ms 为单位). |
taskmanager.exit-on-fatal-akka-error | false | 是否应启动 TaskManager 的隔离监视器.如果隔离监视器检测到它已隔离另一个 actor 系统或者它已被另一个 actor 系统隔离,则会关闭该 actor 系统。 |
taskmanager.heap.size | “1024m” | TaskManagers 的 JVM 堆大小,它是系统的并行工作者.在 YARN 设置中,此值自动配置为 TaskManager 的 YARN 容器的大小,减去一定的容差值。 |
taskmanager.host | (none) | TaskManager 绑定到的网络接口的主机名.默认情况下,TaskManager 搜索可以连接到 JobManager 和其他 TaskManagers 的网络接口.如果该策略由于某种原因失败,则此选项可用于定义主机名.由于不同的 TaskManagers 需要此选项的不同值,因此通常在其他非共享的特定于 TaskManager 的配置文件中指定。 |
taskmanager.jvm-exit-on-oom | false | 是否在任务线程抛出 OutOfMemoryError 时终止 TaskManager. |
taskmanager.memory.fraction | 0.7 | TaskManager 为排序,哈希表和中间结果的缓存预留的相对内存量(在减去网络缓冲区使用的内存量之后).例如,值“0.8”表示 TaskManager 为内部数据缓冲区保存 80%的内存,为 TaskManager 的堆留下 20%的可用内存,用于由用户定义的函数创建的对象.仅当未设置 taskmanager.memory.size 时,才会评估此参数。 |
taskmanager.memory.off-heap | false | 内存分配方法(JVM 堆或堆外),用于 TaskManager 的托管内存以及网络缓冲区。 |
taskmanager.memory.preallocate | false | 在 TaskManager 启动时是否应预先分配 TaskManager 托管内存。 |
taskmanager.memory.segment-size | “32KB” | 网络堆栈和内存管理器使用的内存缓冲区的大小。 |
taskmanager.memory.size | “0” | TaskManager 的内存管理器分配的内存量.如果未设置,将分配相对分数。 |
taskmanager.network.detailed-metrics | false | 布尔标志,用于启用/禁用有关入站/出站网络队列长度的更详细指标。 |
taskmanager.network.memory.buffers-per-channel | 2 | 每个传出/传入通道(子分区/输入通道)使用的最大网络缓冲区数.在基于信用的流量控制模式下,这表示每个输入通道中有多少信用.它应配置至少 2 以获得良好的性能.1 个缓冲区用于接收子分区中的飞行中数据,1 个缓冲区用于并行序列化。 |
taskmanager.network.memory.floating-buffers-per-gate | 8 | 每个输出/输入门(结果分区/输入门)使用的额外网络缓冲区数.在基于信用的流量控制模式中,这表示在所有输入通道之间共享多少浮动信用.浮动缓冲区基于积压(子分区中的实时输出缓冲区)反馈来分布,并且可以帮助减轻由子分区之间的不平衡数据分布引起的背压.如果节点之间的往返时间较长和/或群集中的机器数量较多,则应增加此值。 |
taskmanager.network.memory.fraction | 0.1 | 用于网络缓冲区的 JVM 内存的分数.这决定了 TaskManager 可以同时拥有多少流数据交换通道以及通道缓冲的程度.如果作业被拒绝或者您收到系统没有足够缓冲区的警告,请增加此值或下面的最小/最大值.另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此分数。 |
taskmanager.network.memory.max | “1GB” | 网络缓冲区的最大内存大小。 |
taskmanager.network.memory.min | “64MB” | 网络缓冲区的最小内存大小。 |
taskmanager.network.request-backoff.initial | 100 | 输入通道的分区请求的最小退避。 |
taskmanager.network.request-backoff.max | 10000 | 输入通道的分区请求的最大退避。 |
taskmanager.numberOfTaskSlots | 1 | 单个 TaskManager 可以运行的并行算子或用户函数实例的数量.如果此值大于 1,则单个 TaskManager 将获取函数或 算子的多个实例.这样,TaskManager 可以使用多个 CPU 内核,但同时,可用内存在不同的算子或函数实例之间划分.此值通常与 TaskManager 的计算机具有的物理 CPU 核心数成比例(例如,等于核心数,或核心数的一半). |
taskmanager.registration.initial-backoff | “500ms” | 两次连续注册尝试之间的初始注册退避.每次新注册尝试的退避加倍,直到达到最大注册退避。 |
taskmanager.registration.max-backoff | “30s" | 两次连续注册尝试之间的最大注册退避.最大注册退避需要时间单位指定符(ms / s / min / h / d). |
taskmanager.registration.refused-backoff | “10s" | 注册后的退避已被作业管理员拒绝,然后重试连接。 |
taskmanager.registration.timeout | “5m" | 定义 TaskManager 注册的超时.如果在未成功注册的情况下超过持续时间,则 TaskManager 将终止。 |
taskmanager.rpc.port | “0” | TaskManager 的 IPC 端口.接受端口列表(“50100,50101”),范围(“50100-50200”)或两者的组合.建议在同一台计算机上运行多个 TaskManagers 时设置一系列端口以避免冲突。 |
分布式协调(通过 Akka)
| 键 | 默认 | 描述 |
|---|---|---|
akka.ask.timeout | “10s" | 超时用于所有期货并阻止 Akka 通话.如果 Flink 由于超时而失败,那么您应该尝试增加此值.超时可能是由于机器速度慢或网络拥挤造成的.超时值需要时间单位指定符(ms / s / min / h / d). |
akka.client-socket-worker-pool.pool-size-factor | 1.0 | 池大小因子用于使用以下公式确定线程池大小:ceil(可用处理器*因子).然后,结果大小由 pool-size-min 和 pool-size-max 值限制。 |
akka.client-socket-worker-pool.pool-size-max | 2 | 将基于因子的数量限制为的最大线程数。 |
akka.client-socket-worker-pool.pool-size-min | 1 | 将基于因子的数量限制为的最小线程数。 |
akka.client.timeout | “60s" | 客户端的所有阻塞调用超时。 |
akka.fork-join-executor.parallelism-factor | 2.0 | 并行因子用于使用以下公式确定线程池大小:ceil(可用处理器*因子).然后,得到的大小受 parallelism-min 和 parallelism-max 值的限制。 |
akka.fork-join-executor.parallelism-max | 64 | 将基于因子的并行数量限制为的最大线程数。 |
akka.fork-join-executor.parallelism-min | 8 | 将基于因子的并行数量限制为的最小线程数。 |
akka.framesize | “10485760b” | 在 JobManager 和 TaskManager 之间发送的消息的最大大小.如果 Flink 由于消息超出此限制而失败,那么您应该增加它.邮件大小需要大小单位说明符。 |
akka.jvm-exit-on-fatal-error | true | 退出 JVM 致命的 Akka 错误。 |
akka.log.lifecycle.events | false | 打开 Akka 远程记录事件.在调试时将此值设置为“true”. |
akka.lookup.timeout | “10s" | 用于查找 JobManager 的超时.超时值必须包含时间单位说明符(ms / s / min / h / d). |
akka.retry-gate-closed-for | 50 | 断开远程连接后,应关闭门的 ms 数。 |
akka.server-socket-worker-pool.pool-size-factor | 1.0 | 池大小因子用于使用以下公式确定线程池大小:ceil(可用处理器*因子).然后,结果大小由 pool-size-min 和 pool-size-max 值限制。 |
akka.server-socket-worker-pool.pool-size-max | 2 | 将基于因子的数量限制为的最大线程数。 |
akka.server-socket-worker-pool.pool-size-min | 1 | 将基于因子的数量限制为的最小线程数。 |
akka.ssl.enabled | true | 为 Akka 的远程通信打开 SSL.仅当全局 ssl 标志 security.ssl.enabled 设置为 true 时,这才适用。 |
akka.startup-timeout | (none) | 超时之后,远程组件的启动被视为失败。 |
akka.tcp.timeout | “20s" | 所有出站连接超时.如果由于网络速度较慢而导致连接到 TaskManager 时遇到问题,则应增加此值。 |
akka.throughput | 15 | 在将线程返回到池之前批处理的消息数.较低的值表示公平的调度,而较高的值可以以不公平为代价来提高性能。 |
akka.transport.heartbeat.interval | “1000s" | Akka 传输故障检测器的心跳间隔.由于 Flink 使用 TCP,因此不需要检测器.因此,通过将间隔设置为非常高的值来禁用检测器.如果您需要传输故障检测器,请将间隔设置为某个合理的值.间隔值需要时间单位指定符(ms / s / min / h / d). |
akka.transport.heartbeat.pause | “6000s" | Akka 的传输故障检测器可接受的心跳暂停.由于 Flink 使用 TCP,因此不需要检测器.因此,通过将暂停设置为非常高的值来禁用检测器.如果您需要传输故障检测器,请将暂停设置为某个合理的值.暂停值需要时间单位指定符(ms / s / min / h / d). |
akka.transport.threshold | 300.0 | 传输故障检测器的阈值.由于 Flink 使用 TCP,因此检测器不是必需的,因此阈值被设置为高值。 |
akka.watch.heartbeat.interval | “10s" | Akka 的 DeathWatch 机制检测死亡 TaskManagers 的心跳间隔.如果由于心跳消息丢失或延迟而导致 TaskManagers 被错误地标记为死亡,那么您应该减小此值或增加 akka.watch.heartbeat.pause.可在 此处 找到 Akka 的 DeathWatch 的详尽描述 |
akka.watch.heartbeat.pause | “60s" | Akka 的 DeathWatch 机制可接受的心跳暂停.较低的值不允许心律不齐.如果由于心跳消息丢失或延迟而导致 TaskManagers 被错误地标记为死亡,那么您应该增加此值或 Reduceakka.watch.heartbeat.interval.较高的值会增加检测死的 TaskManager 的时间.可在 此处 找到 Akka 的 DeathWatch 的详尽描述 |
akka.watch.threshold | 12 | DeathWatch 故障检测器的阈值.较低的值容易出现误报,而较高的值会增加检测死的 TaskManager 的时间.可在 此处 找到 Akka 的 DeathWatch 的详尽描述 |
REST
| 键 | 默认 | 描述 |
|---|---|---|
rest.address | (none) | 客户端应该用于连接到服务器的地址。 |
rest.await-leader-timeout | 30000 | 客户端等待 Leader 地址的时间(以 ms 为单位),例如 Dispatcher 或 WebMonitorEndpoint |
rest.bind-address | (none) | 服务器绑定自身的地址。 |
rest.client.max-content-length | 104857600 | 客户端将处理的最大内容长度(以字节为单位). |
rest.connection-timeout | 15000 | 客户端建立 TCP 连接的最长时间(以 ms 为单位). |
rest.port | 8081 | 服务器侦听的端口/客户端连接到的端口。 |
rest.retry.delay | 3000 | 客户端在重试之间等待的时间(以 ms 为单位)(另请参阅“rest.retry.max-attempts”). |
rest.retry.max-attempts | 20 | 如果可重试 算子操作失败,客户端将尝试重试的次数。 |
rest.server.max-content-length | 104857600 | 服务器将处理的最大内容长度(以字节为单位). |
Blob 服务器
| 键 | 默认 | 描述 |
|---|---|---|
blob.fetch.backlog | 1000 | config 参数定义 JobManager 上 BLOB 提取的积压。 |
blob.fetch.num-concurrent | 50 | config 参数定义 JobManager 服务的最大并发 BLOB 提取数。 |
blob.fetch.retries | 5 | config 参数定义失败的 BLOB 提取的退出次数。 |
blob.offload.minsize | 1048576 | 要卸载到 BlobServer 的消息的最小大小。 |
blob.server.port | “0” | config 参数定义 blob 服务的服务器端口。 |
blob.service.cleanup.interval | 3600 | TaskManager 中 blob 缓存的清理间隔(以秒为单位). |
blob.service.ssl.enabled | true | 用于覆盖 blob 服务传输的 ssl 支持的标志。 |
blob.storage.directory | (none) | config 参数,用于定义 blob 服务器使用的存储目录。 |
心跳管理器
| 键 | 默认 | 描述 |
|---|---|---|
heartbeat.interval | 10000 | 从发送方请求心跳的时间间隔。 |
heartbeat.timeout | 50000 | 为发送方和接收方双方请求和接收心跳的超时。 |
SSL 设置
| 键 | 默认 | 描述 |
|---|---|---|
security.ssl.algorithms | “TLS_RSA_WITH_AES_128_CBC_SHA” | 要支持的标准 SSL 算法的逗号分隔列表。 在这里 阅读更多 |
security.ssl.internal.enabled | false | 打开 SSL 以进行内部网络通信.可选地,特定组件可以通过它们自己的设置(rpc,数据传输,REST 等)覆盖它。 |
security.ssl.internal.key-password | (none) | 解密 Keys 库中 Flink 内部端点(rpc,数据传输,blob 服务器)Keys 的密钥。 |
security.ssl.internal.keystore | (none) | 带有 SSLKeys 和证书的 JavaKeys 库文件,用于 Flink 的内部端点(rpc,数据传输,blob 服务器). |
security.ssl.internal.keystore-password | (none) | 为 Flink 的内部端点(rpc,数据传输,blob 服务器)解密 Flink 的 Keys 库文件的密钥。 |
security.ssl.internal.truststore | (none) | 包含公共 CA 证书的信任库文件,用于验证 Flink 内部端点(rpc,数据传输,blob 服务器)的对等方。 |
security.ssl.internal.truststore-password | (none) | 用于解密 Flink 内部端点(rpc,数据传输,blob 服务器)的信任库的密码。 |
security.ssl.key-password | (none) | 解密 Keys 库中的服务器 Keys 的密钥。 |
security.ssl.keystore | (none) | flink 端点用于其 SSLKeys 和证书的 JavaKeys 库文件。 |
security.ssl.keystore-password | (none) | 解密 Keys 库文件的密钥。 |
security.ssl.protocol | “TLSv1.2” | ssl 传输支持的 SSL 协议版本.请注意,它不支持以逗号分隔的列表。 |
security.ssl.rest.enabled | false | 通过 REST 端点打开 SSL 以进行外部通信。 |
security.ssl.rest.key-password | (none) | 解密 Flink 外部 REST 端点的 Keys 库中的 Keys 的密钥。 |
security.ssl.rest.keystore | (none) | 带有 SSLKeys 和证书的 JavaKeys 库文件,用于 Flink 的外部 REST 端点。 |
security.ssl.rest.keystore-password | (none) | 为 Flink 的外部 REST 端点解密 Flink 的 Keys 库文件的密钥。 |
security.ssl.rest.truststore | (none) | 包含公共 CA 证书的信任库文件,用于验证 Flink 的外部 REST 端点的对等方。 |
security.ssl.rest.truststore-password | (none) | 用于解密 Flink 外部 REST 端点的信任库的密码。 |
security.ssl.truststore | (none) | 信任库文件,包含 flink 端点用于验证对等方证书的公共 CA 证书。 |
security.ssl.truststore-password | (none) | 解密信任库的秘诀。 |
security.ssl.verify-hostname | true | 标记以在 ssl 握手期间启用对等方的主机名验证。 |
网络通讯(通过 Netty)
这些参数允许高级调整.在大型群集上运行并发高吞吐量作业时,默认值就足够了。
| 键 | 默认 | 描述 |
|---|---|---|
taskmanager.network.netty.client.connectTimeoutSec | 120 | Netty 客户端连接超时。 |
taskmanager.network.netty.client.numThreads | -1 | Netty 客户端线程的数量。 |
taskmanager.network.netty.num-arenas | -1 | Netty 竞技场的数量。 |
taskmanager.network.netty.sendReceiveBufferSize | 0 | Netty 发送和接收缓冲区大小.这默认为系统缓冲区大小(cat / proc / sys / net / ipv4 / tcp_ [rw] mem),在现代 Linux 中为 4 MiB. |
taskmanager.network.netty.server.backlog | 0 | netty 服务器连接积压。 |
taskmanager.network.netty.server.numThreads | -1 | Netty 服务器线程数。 |
taskmanager.network.netty.transport | “nio” | Netty 传输类型,“nio”或“epoll” |
Web 前端
| 键 | 默认 | 描述 |
|---|---|---|
web.access-control-allow-origin | “*” | |
web.address | (none) | |
web.backpressure.cleanup-interval | 600000 | |
web.backpressure.delay-between-samples | 50 | |
web.backpressure.num-samples | 100 | |
web.backpressure.refresh-interval | 60000 | |
web.checkpoints.history | 10 | |
web.history | 5 | |
web.log.path | (none) | |
web.refresh-interval | 3000 | |
web.ssl.enabled | true | |
web.submit.enable | true | |
web.timeout | 10000 | |
web.tmpdir | System.getProperty( “java.io.tmpdir”) | |
web.upload.dir | (none) |
文件系统
| 键 | 默认 | 描述 |
|---|---|---|
fs.default-scheme | (none) | 默认文件系统方案,用于未明确声明方案的路径.可能包含权限,例如,在 HDFS NameNode 的情况下为 host:port. |
fs.output.always-create-directory | false | 以大于 1 的并行度运行的文件编写器为输出文件路径创建目录,并将不同的结果文件(每个并行编写器任务一个)放入该目录中.如果此选项设置为“true”,则并行度为 1 的编写器也将创建一个目录并将单个结果文件放入其中.如果该选项设置为“false”,则编写器将直接在输出路径上直接创建文件,而不创建包含目录。 |
fs.overwrite-files | false | 指定默认情况下文件输出编写器是否应覆盖现有文件.设置为“true”以默认覆盖,否则设置为“false”. |
编译/优化
| 键 | 默认 | 描述 |
|---|---|---|
compiler.delimited-informat.max-line-samples | 10 | 编译器为分隔输入采用的最大行样本数.样本用于估计记录数.可以使用输入格式的参数覆盖特定输入的此值。 |
compiler.delimited-informat.max-sample-len | 2097152 | 编译器用于分隔输入的行样本的最大长度.如果单个样本的长度超过此值(可能是因为解析器配置错误),则取样将中止.可以使用输入格式的参数覆盖特定输入的此值。 |
compiler.delimited-informat.min-line-samples | 2 | 编译器为分隔输入采用的最小行样本数.样本用于估计记录数.可以使用输入格式的参数覆盖特定输入的此值 |
运行时算法
| 键 | 默认 | 描述 |
|---|---|---|
taskmanager.runtime.hashjoin-bloom-filters | false | 用于在混合散列连接实现中激活/停用 bloom 过滤器的标志.如果散列连接需要溢出到磁盘(数据集大于保存的内存部分),这些布隆过滤器可以大大 Reduce 溢出记录的数量,但需要花费一些 CPU 周期。 |
taskmanager.runtime.max-fan | 128 | 外部合并的最大扇入连接和扇出用于溢出哈希表.限制每个 算子的文件句柄数,但如果设置得太小,可能会导致中间合并/分区。 |
taskmanager.runtime.sort-spilling-threshold | 0.8 | 当这部分内存预算已满时,排序 算子操作开始溢出。 |
Resource Manager
本节中的配置键独立于使用的资源管理框架(YARN,Mesos,Standalone,...)
| 键 | 默认 | 描述 |
|---|---|---|
containerized.heap-cutoff-min | 600 | 作为安全边际,要在容器中删除的最小堆内存量。 |
containerized.heap-cutoff-ratio | 0.25 | 要从容器中删除的堆空间百分比(YARN / Mesos),以补偿其他 JVM 内存使用情况。 |
local.number-resourcemanager | 1 | |
resourcemanager.job.timeout | “5m" | 没有 TaskManager 作为 Leader 的工作超时。 |
resourcemanager.rpc.port | 0 | 定义要连接的网络端口以与资源管理器进行通信.默认情况下,JobManager 的端口,因为使用了相同的 ActorSystem.无法使用此配置键定义端口范围。 |
resourcemanager.taskmanager-timeout | 30000 | 释放空闲 TaskManager 的超时。 |
YARN
| 键 | 默认 | 描述 |
|---|---|---|
yarn.application-attempts | (none) | ApplicationMaster 重启次数.请注意,整个 Flink 群集将重新启动,YARN 客户端将断开连接.此外,JobManager 地址将更改,您需要手动设置 JM 主机:port.建议将此选项保存为 1. |
yarn.application-master.port | “0” | 使用此配置选项,用户可以为 Application Master(和 JobManager)RPC 端口指定端口,一系列端口或端口列表.默认情况下,我们建议使用默认值(0)让 算子操作系统选择适当的端口.特别是当多个 AM 在同一物理主机上运行时,固定端口分配会阻止 AM 启动.例如,在具有限制性防火墙的环境中在 YARN 上运行 Flink 时,此选项允许指定一系列允许的端口。 |
yarn.appmaster.rpc.address | (none) | 应用程序主 RPC 系统正在侦听的主机名或地址。 |
yarn.appmaster.rpc.port | -1 | 应用程序主 RPC 系统正在侦听的端口。 |
yarn.containers.vcores | -1 | 每个 YARN 容器的虚拟核心数(vcores).默认情况下,vcores 的数量设置为每个 TaskManager 的插槽数(如果已设置),或者设置为 1,否则设置为 1.为了使用此参数,您的群集必须启用 CPU 调度.您可以通过设置来完成此 算子操作 org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler . |
yarn.heartbeat-delay | 5 | 使用 ResourceManager 的心跳之间的时间,以秒为单位。 |
yarn.maximum-failed-containers | (none) | 系统在发生故障时将重新分配的最大容器数。 |
yarn.per-job-cluster.include-user-jar | “ORDER” | 定义用户 jar 是否包含在每个作业集群的系统类路径中以及它们在路径中的位置.它们可以位于开头(“FIRST”),末尾(“LAST”),或者根据其名称(“ORDER”)定位.将此参数设置为“DISABLED”会导致 jar 包含在用户类路径中。 |
yarn.properties-file.location | (none) | 将 Flink 作业提交给 YARN 时,JobManager 的主机和可用处理槽的数量将写入属性文件,以便 Flink 客户端能够选择这些详细信息.此配置参数允许更改该文件的默认位置(例如,对于在用户之间共享 Flink 安装的环境). |
yarn.tags | (none) | 要应用于 Flink YARN 应用程序的以逗号分隔的标记列表。 |
Mesos
| 键 | 默认 | 描述 |
|---|---|---|
mesos.failover-timeout | 604800 | Mesos 调度程序的故障转移超时(以秒为单位),之后将自动关闭正在运行的任务。 |
mesos.initial-tasks | 0 | 最初的工人在主人开始时提出来.除非 Flink 处于 传统模式, 否则将忽略此选项。 |
mesos.master | (none) | Mesos 主 URL.该值应采用以下形式之一: |
| 主持人:port | ||
| ZK://主机 1:端口 1,主机 2:端口 2,... /路径 | ||
| ZK://用户名:密码 @主机 1:端口 1,主机 2:端口 2,... /路径 | ||
| 文件:///路径/到/文件 | ||
mesos.maximum-failed-tasks | -1 | 群集失败前失败的最大工作数.可以设置为-1 以禁用此函数.除非 Flink 处于 传统模式, 否则将忽略此选项。 |
mesos.resourcemanager.artifactserver.port | 0 | config 参数定义要使用的 Mesos 工件服务器端口.将端口设置为 0 将允许 算子操作系统选择可用端口。 |
mesos.resourcemanager.artifactserver.ssl.enabled | true | 为 Flink 工件服务器启用 SSL.请注意,security.ssl.enabled 也需要设置为 true 加密才能启用加密。 |
mesos.resourcemanager.framework.name | “Flink” | Mesos 框架名称 |
mesos.resourcemanager.framework.principal | (none) | Mesos 框架主体 |
mesos.resourcemanager.framework.role | “*” | Mesos 框架角色定义 |
mesos.resourcemanager.framework.secret | (none) | Mesos 框架密钥 |
mesos.resourcemanager.framework.user | (none) | Mesos 框架用户 |
mesos.resourcemanager.tasks.port-assignments | (none) | 以逗号分隔的配置键列表,表示可配置端口.所有端口 Keys 将动态获取通过 Mesos 分配的端口。 |
Mesos TaskManager
| 键 | 默认 | 描述 |
|---|---|---|
mesos.constraints.hard.hostattribute | (none) | 基于代理属性在 Mesos 上放置任务的约束.采用逗号分隔的键:值对列表,对应于目标介质代理公开的属性.示例:az:eu-west-1a,系列:t2 |
mesos.resourcemanager.tasks.bootstrap-cmd | (none) | 在 TaskManager 启动之前执行的命令。 |
mesos.resourcemanager.tasks.container.docker.force-pull-image | false | 指示 docker containerizer 强制拉动镜像,而不是重用缓存版本。 |
mesos.resourcemanager.tasks.container.docker.parameters | (none) | 使用 docker 容器时,要传递给 docker run 命令的自定义参数.逗号分隔的“key = value”对列表.“值”可能包含“=”. |
mesos.resourcemanager.tasks.container.image.name | (none) | 用于容器的映像名称。 |
mesos.resourcemanager.tasks.container.type | “mesos” | 使用的集装箱类型:“mesos”或“docker”. |
mesos.resourcemanager.tasks.container.volumes | (none) | 逗号分隔的[host_path:] container_path [:RO | RW]列表.这允许将额外的卷安装到容器中。 |
mesos.resourcemanager.tasks.cpus | 0.0 | 要分配给 Mesos 工作者的 CPU. |
mesos.resourcemanager.tasks.gpus | 0 | 要分配给 Mesos 工作者的 GPU. |
mesos.resourcemanager.tasks.hostname | (none) | 用于定义 TaskManager 主机名的可选值.模式 TASK 由 Mesos 任务的实际 ID 替换.这可用于配置 TaskManager 以使用 Mesos DNS(例如 TASK .flink-service.mesos)进行名称查找。 |
mesos.resourcemanager.tasks.mem | 1024 | 要以 MB 为单位分配给 Mesos worker 的内存。 |
mesos.resourcemanager.tasks.taskmanager-cmd | "$FLINK_HOME/bin/mesos-taskmanager.sh" | |
mesos.resourcemanager.tasks.uris | (none) | 以逗号分隔的自定义工件 URI 列表,这些 URI 将下载到 Mesos 工作者的沙箱中。 |
taskmanager.numberOfTaskSlots | 1 | 单个 TaskManager 可以运行的并行算子或用户函数实例的数量.如果此值大于 1,则单个 TaskManager 将获取函数或 算子的多个实例.这样,TaskManager 可以使用多个 CPU 内核,但同时,可用内存在不同的算子或函数实例之间划分.此值通常与 TaskManager 的计算机具有的物理 CPU 核心数成比例(例如,等于核心数,或核心数的一半). |
高可用性(HA)
| 键 | 默认 | 描述 |
|---|---|---|
high-availability | “NONE” | 定义用于群集执行的高可用性模式.要启用高可用性,请将此模式设置为“ZOOKEEPER”. |
high-availability.cluster-id | “/default” | Flink 集群的 ID,用于将多个 Flink 集群彼此分开.需要为独立群集设置,但在 YARN 和 Mesos 中自动推断。 |
high-availability.job.delay | (none) | 故障转移后 JobManager 之前的时间恢复当前作业。 |
high-availability.jobmanager.port | “0” | JobManager 在高可用性模式下使用的可选端口(范围). |
high-availability.storageDir | (none) | 文件系统路径(URI)Flink 在高可用性设置中持久保存元数据。 |
基于 ZooKeeper 的 HA 模式
| 键 | 默认 | 描述 |
|---|---|---|
high-availability.zookeeper.client.acl | “open” | 定义要在 ZK 节点上配置的 ACL(open | creator).如果 ZooKeeper 服务器配置将“authProvider”属性映射为使用 SASLAuthenticationProvider 并且群集配置为以安全模式(Kerberos)运行,则可以将配置值设置为“creator”. |
high-availability.zookeeper.client.connection-timeout | 15000 | 定义 ZooKeeper 的连接超时(以 ms 为单位). |
high-availability.zookeeper.client.max-retry-attempts | 3 | 定义客户端放弃之前的连接重试次数。 |
high-availability.zookeeper.client.retry-wait | 5000 | 定义以 ms 为单位的连续重试之间的暂停。 |
high-availability.zookeeper.client.session-timeout | 60000 | 以 ms 为单位定义 ZooKeeper 会话的会话超时。 |
high-availability.zookeeper.path.checkpoint-counter | “/checkpoint-counter” | ZooKeeper 根路径(ZNode)用于检查点计数器。 |
high-availability.zookeeper.path.checkpoints | “/checkpoints” | 已完成检查点的 ZooKeeper 根路径(ZNode). |
high-availability.zookeeper.path.jobgraphs | “/jobgraphs” | 作业图的 ZooKeeper 根路径(ZNode) |
high-availability.zookeeper.path.latch | “/leaderlatch” | 定义用于选择 Leader 的 Leader 锁存器的 znode. |
high-availability.zookeeper.path.leader | “/leader” | 定义 Leader 的 znode,其中包含 Leader 的 URL 和当前 Leader 会话 ID. |
high-availability.zookeeper.path.mesos 工 | “/mesos-workers” | ZooKeeper 根路径,用于保存 Mesos 工作者信息。 |
high-availability.zookeeper.path.root | “/flink” | Flink 在 ZooKeeper 中存储其条目的根路径。 |
high-availability.zookeeper.path.running 的注册表 | “/running_job_registry/” | |
high-availability.zookeeper.quorum | (none) | 使用 ZooKeeper 在高可用性模式下运行 Flink 时要使用的 ZooKeeper quorum. |
ZooKeeper 安全
| 键 | 默认 | 描述 |
|---|---|---|
zookeeper.sasl.disable | false | |
zookeeper.sasl.login-context-name | “Client” | |
zookeeper.sasl.service-name | “zookeeper” |
基于 Kerberos 的安全性
| 键 | 默认 | 描述 |
|---|---|---|
security.kerberos.login.contexts | (none) | 以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据(例如, Client,KafkaClient 使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证) |
security.kerberos.login.keytab | (none) | 包含用户凭据的 KerberosKeys 表文件的绝对路径。 |
security.kerberos.login.principal | (none) | 与 keytab 关联的 Kerberos 主体名称。 |
security.kerberos.login.use-ticket-cache | true | 指示是否从 Kerberos 票证缓存中读取。 |
环境
| 键 | 默认 | 描述 |
|---|---|---|
env.hadoop.conf.dir | (none) | hadoop 配置目录的路径.需要读取 HDFS 和/或 YARN 配置.您也可以通过环境变量进行设置。 |
env.java.opts | (none) | |
env.java.opts.jobmanager | (none) | |
env.java.opts.taskmanager | (none) | |
env.log.dir | (none) | 定义保存 Flink 日志的目录.它必须是一条绝对的道路.(默认为 Flink 主页下的日志目录) |
env.log.max | 5 | 要保存的最大旧日志文件数。 |
env.ssh.opts | (none) | 启动或停止 JobManager,TaskManager 和 Zookeeper 服务时,其他命令行选项传递给 SSH 客户端(start-cluster.sh,stop-cluster.sh,start-zookeeper-quorum.sh,stop-zookeeper-quorum.sh). |
env.yarn.conf.dir | (none) | YARN 配置目录的路径.它需要在 YARN 上运行 flink.您也可以通过环境变量进行设置。 |
检查点
| 键 | 默认 | 描述 |
|---|---|---|
state.backend | (none) | 状态后台用于存储和检查点状态。 |
state.backend.async | true | 选择状态后台是否应在可能和可配置的情况下使用异步 SNAPSHOT 方法.某些状态后台可能不支持异步 SNAPSHOT,或者仅支持异步 SNAPSHOT,并忽略此选项。 |
state.backend.fs.memory-threshold | 1024 | 状态数据文件的最小大小.小于该值的所有状态块都内联存储在根检查点元数据文件中。 |
state.backend.incremental | false | 如果可能,选择状态后台是否应创建增量检查点.对于增量检查点,仅存储来自先前检查点的差异,而不是完整的检查点状态.某些状态后台可能不支持增量检查点并忽略此选项。 |
state.backend.local-recovery | false | |
state.checkpoints.dir | (none) | 用于在 Flink 支持的文件系统中存储检查点的数据文件和元数据的默认目录.必须可以从所有参与的进程/节点(即所有 TaskManagers 和 JobManagers)访问存储路径。 |
state.checkpoints.num-retained | 1 | 要保存的已完成检查点的最大数量。 |
state.savepoints.dir | (none) | 保存点的默认目录.由将后台写入文件系统的状态后台(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)使用。 |
taskmanager.state.local.root-dirs | (none) |
可查询状态
| 键 | 默认 | 描述 |
|---|---|---|
query.client.network-threads | 0 | 网络数(Netty 的事件循环)可查询状态客户端的线程。 |
query.proxy.network-threads | 0 | 网络数(Netty 的事件循环)可查询状态代理的线程。 |
query.proxy.ports | “9069” | 可查询状态代理的端口范围.指定范围可以是单个端口:“9123”,一系列端口:“50100-50200”,或范围和端口列表:“50100-50200,50300-50400,51234”. |
query.proxy.query-threads | 0 | 可查询状态代理的查询线程数.如果设置为 0,则使用插槽数。 |
query.server.network-threads | 0 | 网络数(Netty 的事件循环)可查询状态服务器的线程。 |
query.server.ports | “9067” | 可查询状态服务器的端口范围.指定范围可以是单个端口:“9123”,一系列端口:“50100-50200”,或范围和端口列表:“50100-50200,50300-50400,51234”. |
query.server.query-threads | 0 | 可查询状态服务器的查询线程数.如果设置为 0,则使用插槽数。 |
度量
| 键 | 默认 | 描述 |
|---|---|---|
metrics.latency.granularity | “operator” | 定义延迟指标的粒度.可接受的值是: |
| 单一 - 跟踪延迟,无需区分源和子任务。 | ||
| operator - 跟踪延迟,同时区分源,但不区分子任务。 | ||
| 子任务 - 在区分源和子任务时跟踪延迟。 | ||
metrics.latency.history-size | 128 | 定义每个算子维护的测量延迟数。 |
metrics.latency.interval | 0 | 定义从源发出延迟跟踪标记的间隔.如果设置为 0 或负值,则禁用延迟跟踪.启用此函数会显着影响群集的性能。 |
metrics.reporter.<name> .<parameter> | (none) | 为名为<name>的报告器配置参数<parameter>. |
metrics.reporter.<name>.class | (none) | 报告类用于为报告命名<name>. |
metrics.reporter.<name>.interval | (none) | 报告间隔用于报告名为<name>. |
metrics.reporters | (none) | |
metrics.scope.delimiter | “” | |
metrics.scope.jm | “<host> .jobmanager” | 定义应用于作用于 JobManager 的所有度量标准的范围格式字符串。 |
metrics.scope.jm.job | “<host> .jobmanager.<job_name>” | 定义范围格式字符串,该字符串应用于作用于 JobManager 上作业的所有度量标准。 |
metrics.scope.operator | “<host> .taskmanager.<tm_id> <job_name> <operator_name> <subtask_index>” | 定义应用于作用于 算子的所有度量标准的范围格式字符串。 |
metrics.scope.task | “<host> .taskmanager.<tm_id> <job_name> <task_name> <subtask_index>” | 定义应用于作用于任务的所有度量标准的范围格式字符串。 |
metrics.scope.tm | “<host> .taskmanager.<tm_id>” | 定义应用于作用于 TaskManager 的所有度量标准的范围格式字符串。 |
metrics.scope.tm.job | “<host> .taskmanager.<tm_id> <job_name>” | 定义范围格式字符串,该字符串应用于作用于 TaskManager 上作业的所有度量标准。 |
metrics.system-resource | false | |
metrics.system-resource-probing-interval | 5000 |
历史服务器
如果要通过 HistoryServer 的 Web 前端显示它们,则必须进行配置 jobmanager.archive.fs.dir 以存档已终止的作业并将其添加到受监视目录列表中 historyserver.archive.fs.dir .
jobmanager.archive.fs.dir:将有关已终止作业的信息上载到的目录.您必须将此目录添加到历史服务器的受监视目录列表中historyserver.archive.fs.dir.
| 键 | 默认 | 描述 |
|---|---|---|
historyserver.archive.fs.dir | (none) | 以逗号分隔的目录列表,用于从中获取已归档的作业.历史服务器将监视这些目录以获取已存档的作业.您可以将 JobManager 配置为通过 jobmanager.archive.fs.dir 将作业存档到目录。 |
historyserver.archive.fs.refresh-interval | 10000 | 刷新已归档作业目录的时间间隔(以 ms 为单位). |
historyserver.web.address | (none) | HistoryServer 的 Web 界面的地址。 |
historyserver.web.port | 8082 | HistoryServers 的 Web 界面的端口。 |
historyserver.web.refresh-interval | 10000 | |
historyserver.web.ssl.enabled | false | 启用对 HistoryServer Web 前端的 HTTP 访问.仅当全局 SSL 标志 security.ssl.enabled 设置为 true 时,此选项才适用。 |
historyserver.web.tmpdir | (none) | 此配置参数允许定义历史服务器 Web 界面使用的 Flink Web 目录.Web 界面将其静态文件复制到目录中。 |
留存
mode:Flink 的执行模式.可能的值是legacy和new.要启动旧组件,您必须指定legacy(DEFAULT:)new.
背景
配置网络缓冲区
如果您看到异常 java.io.IOException: Insufficient number of network buffers ,则需要调整用于网络缓冲区的内存量,以便程序在您的 TaskManager 上运行。
网络缓冲区是通信层的关键资源.它们用于在通过网络传输之前缓冲记录,并在将传入数据解析为记录并将其传递给应用程序之前缓冲传入数据.足够数量的网络缓冲区对于实现良好的吞吐量至关重要。
从 Flink 1.3 开始,你可以遵循“越多越好”的成语而不会对延迟造成任何惩罚(我们通过限制每个通道使用的实际缓冲区数量来防止每个传出和传入通道中的过度缓冲,即 缓冲膨胀 ) .
通常,将 TaskManager 配置为具有足够的缓冲区,以使您希望同时打开的每个逻辑网络连接都具有专用缓冲区.对于网络上的每个点对点数据交换存在逻辑网络连接,这通常发生在重新分区或广播步骤(混洗阶段).在那些中,TaskManager 中的每个并行任务必须能够与所有其他并行任务进行通信。
注意: 从 Flink 1.5 开始,网络缓冲区将始终在堆外分配,即在 JVM 堆之外,而不管其值是多少 taskmanager.memory.off-heap .这样,我们可以将这些缓冲区直接传递给底层网络堆栈层。
设置内存分数
以前,手动设置网络缓冲区的数量,这成为一个非常容易出错的任务(见下文).从 Flink 1.3 开始,可以使用以下配置参数定义用于网络缓冲区的一小部分内存:
taskmanager.network.memory.fraction:用于网络缓冲区的 JVM 内存的分数(DEFAULT:0.1),taskmanager.network.memory.min:网络缓冲区的最小内存大小(默认值:64MB),taskmanager.network.memory.max:网络缓冲区的最大内存大小(默认值:1GB),和taskmanager.memory.segment-size:内存管理器和网络堆栈使用的内存缓冲区大小(以字节为单位)(默认值:32KB).
直接设置网络缓冲区的数量
注意:不建议使用 这种配置网络缓冲区使用的内存量的方法.请考虑使用上述方法定义要使用的内存部分。
缓冲器的上一个 TaskManager 所要求数量为 总度的平行度 (数的目标)* 节点内并行性 (源在一个 TaskManager 数)× N 与 N 是限定多少 repartitioning-恒定/您希望同时处于活动状态的广播步骤.由于 节点内并行 性通常是核心数量,并且超过 4 个重新分区或广播频道很少并行活动,因此它经常归结为
#slots-per-TM^2 * #TMs * 4
哪里 #slots per TM 是 每个 TaskManager 插槽数量 和 #TMs 是 TaskManager 的总数。
例如,为了支持 20 个 8 插槽机器的集群,您应该使用大约 5000 个网络缓冲区来获得最佳吞吐量。
默认情况下,每个网络缓冲区的大小为 32 KiBytes.在上面的示例中,系统因此将为网络缓冲区分配大约 300 MiBytes.
可以使用以下参数配置网络缓冲区的数量和大小:
taskmanager.network.numberOfBuffers,和taskmanager.memory.segment-size.
配置临时 I / O 目录
虽然 Flink 的目标是尽可能多地处理主内存中的数据,但是需要处理的内存比内存更多的数据并不少见.Flink 的运行时用于将临时数据写入磁盘以处理这些情况。
该 taskmanager.tmp.dirs 参数指定 Flink 写入临时文件的目录列表.目录的路径需要用':'(冒号字符)分隔.Flink 将同时向(从)每个配置的目录写入(或读取)一个临时文件.这样,临时 I / O 可以均匀地分布在多个独立的 I / O 设备(如硬盘)上,以提高性能.要利用快速 I / O 设备(例如,SSD,RAID,NAS),可以多次指定目录。
如果 taskmanager.tmp.dirs 未显式指定参数,Flink 会将临时数据写入 算子操作系统的临时目录,例如 Linux 系统中的 / tmp .
配置 TaskManager 处理槽
Flink 通过将程序拆分为子任务并将这些子任务调度到处理槽来并行执行程序。
每个 Flink TaskManager 都在集群中提供处理插槽.插槽数通常与 每个 TaskManager 的可用 CPU 核心数成比例.作为一般建议,可用的 CPU 核心数量是一个很好的默认值 taskmanager.numberOfTaskSlots .
启动 Flink 应用程序时,用户可以提供用于该作业的默认插槽数.因此调用命令行值 -p (用于并行).此外,可以为整个应用程序和各个算子 设置编程 API 中的插槽数 .

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论