- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
迭代
迭代算法出现在许多数据分析领域,例如 机器学习 或 图形分析 。这些算法对于实现大数据从数据中提取有意义信息的承诺至关重要。随着越来越有兴趣在非常大的数据集上运行这些算法,需要以大规模并行方式执行迭代。
Flink 程序通过定义 步进函数 并将其嵌入到特殊的迭代 算子中来实现迭代算法。此 算子有两种变体: Iterate 和 Delta Iterate 。两个 算子在当前迭代状态下重复调用步进函数,直到达到某个终止条件。
在这里,我们提供两种算子变体的背景并概述它们的用法。该 节目指南 介绍了如何实现算子在这两个 Scala 和 Java。我们还通过 Flink 的图形处理 API Gelly 支持以 顶点为中心和聚集求和的迭代 。
下表提供了两个 算子的概述:
迭代 | Delta 迭代 | |
---|---|---|
迭代输入 | 部分解决方案 | 工作集 和 解决方案集 |
步函数 | 任意数据流 | |
状态更新 | 下 一部分解决方案 | 下一个工作集 |
对解决方案集的更改 | ||
迭代结果 | 最后部分解决方案 | 上次迭代后的解决方案设置状态 |
终止 | 最大迭代次数 (默认) | 最大迭代次数或空工作集 (默认) |
自定义聚合器收敛 | 自定义聚合器收敛 |
迭代 算子
在 迭代 算子 覆盖所述 迭代简单形式 :在每次迭代中, 阶梯函数 消耗 整个输入 (在 先前的迭代的结果 ,或在 初始数据集 ),并且计算 该部分解决方案的下一个版本 (例如 map
, reduce
, join
,等等。)。
- 迭代输入 :来自 数据源 或 先前 算子 的 第一次迭代的 初始输入。
- 步骤函数 :步进函数将在每次迭代中执行。它是由像算子的任意数据流
map
,reduce
,join
等,取决于手头的特定任务。 - Next Partial Solution :在每次迭代中,step 函数的输出将反馈到 下一次迭代 。
- 迭代结果 : 最后一次迭代的 输出被写入 数据接收器 或用作 以下 算子的 输入。
有多个选项可指定迭代的 终止条件 :
- 最大迭代次数 :没有任何其他条件,迭代将执行多次。
- 自定义聚合器收敛 :迭代允许指定 自定义聚合器 和 收敛标准, 如 sum 聚合发出的记录数(聚合器),如果此数字为零则终止(收敛标准)。
您还可以考虑伪代码中的迭代 算子:
IterationState state = getInitialState();
while (!terminationCriterion()) {
state = step(state);
}
setFinalState(state);
有关详细信息和代码示例,请参阅 编程指南 。
示例:递增数字
在以下示例中,我们 迭代地递增一组数字 :
- 迭代输入 :初始输入从数据源读取和由五个单字段记录(整数
1
到5
)。 - 步进函数 :步进函数是单个
map
算子,它将整数字段从i
增加到i+1
。它将应用于输入的每个记录。 - Next Partial Solution :step 函数的输出将是 map 算子的输出,即具有递增整数的记录。
- 迭代结果 :经过十次迭代,最初的数字将被增加十倍,造成整数
11
来15
。
// 1st 2nd 10th
map(1) -> 2 map(2) -> 3 ... map(10) -> 11
map(2) -> 3 map(3) -> 4 ... map(11) -> 12
map(3) -> 4 map(4) -> 5 ... map(12) -> 13
map(4) -> 5 map(5) -> 6 ... map(13) -> 14
map(5) -> 6 map(6) -> 7 ... map(14) -> 15
需要注意的是 1 , 2 ,和 4 可以是任意的数据流。
Delta 迭代算子
该 增量迭代算子 覆盖的情况下, 增量迭代 。增量迭代 有选择地修改 其 解决方案的** 数据元**并演化解决方案,而不是完全重新计算它。
在适用的情况下,这会导致 更高效的算法 ,因为解决方案集中的每个数据元都不会在每次迭代中发生变化。这样可以 专注于 解决方案 的热部件 ,并 保持冷部件不受影响 。通常,大多数解决方案相对较快地冷却,后来的迭代仅在一小部分数据上运行。
- 迭代输入 :从 数据源 或 先前的 算子 读取初始工作集和解决方案集作为第一次迭代的输入。
- 步骤函数 :步进函数将在每次迭代中执行。它是由像算子的任意数据流
map
,reduce
,join
等,取决于手头的特定任务。 - 下 一个工作集 /更新解决方案集 : 下一个工作集 驱动迭代计算,并将反馈到 下一个迭代 。此外,解决方案集将被更新并隐式转发(不需要重建)。两个数据集都可以由步进函数的不同 算子更新。
- 迭代结果 :在 最后一次迭代之后 , 解决方案集 被写入 数据接收器 或用作 以下 算子的 输入。
delta 迭代的默认 终止条件 由 空工作集收敛标准 和 最大迭代次数指定 。当生成的 下一个工作集 为空或达到最大迭代次数时,迭代将终止。还可以指定 自定义聚合器 和 收敛标准 。
您还可以考虑伪代码中的迭代 算子:
IterationState workset = getInitialState();
IterationState solution = getInitialSolution();
while (!terminationCriterion()) {
(delta, workset) = step(workset, solution);
solution.update(delta)
}
setFinalState(solution);
有关详细信息和代码示例,请参阅 编程指南 。
示例:在图表中传播最小值
在以下示例中,每个顶点都有一个 ID 和一个 着色 。每个顶点将其顶点 ID 传播到相邻顶点。该 目标 是 最小 ID 分配给子图的每个顶点 。如果接收的 ID 小于当前的 ID,则它将变为具有接收到的 ID 的顶点的颜色。其中一个应用可以在 社区分析 或 连通组件 计算中找到。
的 初始输入 被设置为 两个工作集和溶液组。 在上图中,颜色可视化 解决方案集 的 演变 。每次迭代时,最小 ID 的颜色在相应的子图中展开。同时,每次迭代,工作量(交换和比较顶点 ID)都会 Reduce。这对应于 工作集的大小减小 ,其在三次迭代之后从所有七个顶点变为零,此时迭代终止。在 重要的观察 是, 较低的子收敛上半之前 不和增量迭代能够与工作集抽象捕捉到这一点。
在上部子图 ID 1 ( 橙色 )中是 最小 ID 。在第 一次迭代中 ,它将传播到顶点 2,随后将其颜色更改为橙色。顶点 3 和 4 将接收 ID 2 ( 黄色 )作为其当前最小 ID 并更改为黄色。因为 顶点 1 的颜色在第一次迭代中没有改变,所以可以在下一个工作集中跳过它。
在较低的子图中, ID 5 ( 青色 )是 最小 ID 。下子图的所有顶点将在第一次迭代中接收它。同样,我们可以跳过下一个工作集的未更改顶点( 顶点 5 )。
在 第二次迭代中 ,工作集大小已经从七个数据元 Reduce 到五个数据元(顶点 2,3,4,6 和 7)。这些是迭代的一部分,并进一步传播其当前的最小 ID。在此迭代之后,下部子图已经收敛(图的 冷部分 ),因为它在工作集中没有数据元,而上半部分需要对剩余的两个工作集数据元(顶点)进行进一步迭代(图的 热部分 ) 3 和 4)。
当 第三次迭代 后工作集为空时,迭代 终止 。
超级同步
我们将迭代 算子的阶梯函数的每次执行称为 单次迭代 。在并行设置中,在迭代状态的不同分区上 并行评估步骤函数的多个实例 。在许多设置中,对所有并行实例的步骤函数的一个评估形成所谓的 超级步骤 ,其也是同步的粒度。因此,迭代的 所有 并行任务都需要在初始化下一个超级步骤之前完成超级步骤。 终止标准 也将在超级障碍评估。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论