返回介绍

迭代

发布于 2025-05-02 18:19:15 字数 6527 浏览 0 评论 0 收藏

迭代算法出现在许多数据分析领域,例如 机器学习 或 图形分析 。这些算法对于实现大数据从数据中提取有意义信息的承诺至关重要。随着越来越有兴趣在非常大的数据集上运行这些算法,需要以大规模并行方式执行迭代。

Flink 程序通过定义 步进函数 并将其嵌入到特殊的迭代 算子中来实现迭代算法。此 算子有两种变体: IterateDelta Iterate 。两个 算子在当前迭代状态下重复调用步进函数,直到达到某个终止条件。

在这里,我们提供两种算子变体的背景并概述它们的用法。该 节目指南 介绍了如何实现算子在这两个 Scala 和 Java。我们还通过 Flink 的图形处理 API Gelly 支持以 顶点为中心和聚集求和的迭代

下表提供了两个 算子的概述:

 迭代Delta 迭代
迭代输入部分解决方案工作集解决方案集
步函数任意数据流
状态更新一部分解决方案下一个工作集
  对解决方案集的更改
迭代结果最后部分解决方案上次迭代后的解决方案设置状态
终止最大迭代次数 (默认)最大迭代次数或空工作集 (默认)
 自定义聚合器收敛自定义聚合器收敛

迭代 算子

迭代 算子 覆盖所述 迭代简单形式 :在每次迭代中, 阶梯函数 消耗 整个输入 (在 先前的迭代的结果 ,或在 初始数据集 ),并且计算 该部分解决方案的下一个版本 (例如 mapreducejoin ,等等。)。

迭代 算子

  1. 迭代输入 :来自 数据源 或 先前 算子 的 第一次迭代的 初始输入。
  2. 步骤函数 :步进函数将在每次迭代中执行。它是由像算子的任意数据流 mapreducejoin 等,取决于手头的特定任务。
  3. Next Partial Solution :在每次迭代中,step 函数的输出将反馈到 下一次迭代 。
  4. 迭代结果 : 最后一次迭代的 输出被写入 数据接收器 或用作 以下 算子的 输入。

有多个选项可指定迭代的 终止条件

  • 最大迭代次数 :没有任何其他条件,迭代将执行多次。
  • 自定义聚合器收敛 :迭代允许指定 自定义聚合器 和 收敛标准, 如 sum 聚合发出的记录数(聚合器),如果此数字为零则终止(收敛标准)。

您还可以考虑伪代码中的迭代 算子:

IterationState state = getInitialState();

while (!terminationCriterion()) {
  state = step(state);
}

setFinalState(state);

有关详细信息和代码示例,请参阅 编程指南

示例:递增数字

在以下示例中,我们 迭代地递增一组数字

迭代 算子示例

  1. 迭代输入 :初始输入从数据源读取和由五个单字段记录(整数 15 )。
  2. 步进函数 :步进函数是单个 map 算子,它将整数字段从 i 增加到 i+1 。它将应用于输入的每个记录。
  3. Next Partial Solution :step 函数的输出将是 map 算子的输出,即具有递增整数的记录。
  4. 迭代结果 :经过十次迭代,最初的数字将被增加十倍,造成整数 1115
// 1st       2nd             10th
map(1) -> 2    map(2) -> 3    ...    map(10) -> 11
map(2) -> 3    map(3) -> 4    ...    map(11) -> 12
map(3) -> 4    map(4) -> 5    ...    map(12) -> 13
map(4) -> 5    map(5) -> 6    ...    map(13) -> 14
map(5) -> 6    map(6) -> 7    ...    map(14) -> 15

需要注意的是 12 ,和 4 可以是任意的数据流。

Delta 迭代算子

增量迭代算子 覆盖的情况下, 增量迭代 。增量迭代 有选择地修改解决方案的** 数据元**并演化解决方案,而不是完全重新计算它。

在适用的情况下,这会导致 更高效的算法 ,因为解决方案集中的每个数据元都不会在每次迭代中发生变化。这样可以 专注于 解决方案 的热部件 ,并 保持冷部件不受影响 。通常,大多数解决方案相对较快地冷却,后来的迭代仅在一小部分数据上运行。

Delta 迭代算子

  1. 迭代输入 :从 数据源 或 先前的 算子 读取初始工作集和解决方案集作为第一次迭代的输入。
  2. 步骤函数 :步进函数将在每次迭代中执行。它是由像算子的任意数据流 mapreducejoin 等,取决于手头的特定任务。
  3. 一个工作集 /更新解决方案集 : 下一个工作集 驱动迭代计算,并将反馈到 下一个迭代 。此外,解决方案集将被更新并隐式转发(不需要重建)。两个数据集都可以由步进函数的不同 算子更新。
  4. 迭代结果 :在 最后一次迭代之后 , 解决方案集 被写入 数据接收器 或用作 以下 算子的 输入。

delta 迭代的默认 终止条件空工作集收敛标准最大迭代次数指定 。当生成的 下一个工作集 为空或达到最大迭代次数时,迭代将终止。还可以指定 自定义聚合器收敛标准

您还可以考虑伪代码中的迭代 算子:

IterationState workset = getInitialState();
IterationState solution = getInitialSolution();

while (!terminationCriterion()) {
  (delta, workset) = step(workset, solution);

  solution.update(delta)
}

setFinalState(solution);

有关详细信息和代码示例,请参阅 编程指南

示例:在图表中传播最小值

在以下示例中,每个顶点都有一个 ID 和一个 着色 。每个顶点将其顶点 ID 传播到相邻顶点。该 目标 是 最小 ID 分配给子图的每个顶点 。如果接收的 ID 小于当前的 ID,则它将变为具有接收到的 ID 的顶点的颜色。其中一个应用可以在 社区分析 或 连通组件 计算中找到。

Delta 迭代 算子示例

初始输入 被设置为 两个工作集和溶液组。 在上图中,颜色可视化 解决方案集演变 。每次迭代时,最小 ID 的颜色在相应的子图中展开。同时,每次迭代,工作量(交换和比较顶点 ID)都会 Reduce。这对应于 工作集的大小减小 ,其在三次迭代之后从所有七个顶点变为零,此时迭代终止。在 重要的观察 是, 较低的子收敛上半之前 不和增量迭代能够与工作集抽象捕捉到这一点。

在上部子图 ID 1 ( 橙色 )中是 最小 ID 。在第 一次迭代中 ,它将传播到顶点 2,随后将其颜色更改为橙色。顶点 3 和 4 将接收 ID 2 ( 黄色 )作为其当前最小 ID 并更改为黄色。因为 顶点 1 的颜色在第一次迭代中没有改变,所以可以在下一个工作集中跳过它。

在较低的子图中, ID 5 ( 青色 )是 最小 ID 。下子图的所有顶点将在第一次迭代中接收它。同样,我们可以跳过下一个工作集的未更改顶点( 顶点 5 )。

第二次迭代中 ,工作集大小已经从七个数据元 Reduce 到五个数据元(顶点 2,3,4,6 和 7)。这些是迭代的一部分,并进一步传播其当前的最小 ID。在此迭代之后,下部子图已经收敛(图的 冷部分 ),因为它在工作集中没有数据元,而上半部分需要对剩余的两个工作集数据元(顶点)进行进一步迭代(图的 热部分 ) 3 和 4)。

第三次迭代 后工作集为空时,迭代 终止

超级同步

我们将迭代 算子的阶梯函数的每次执行称为 单次迭代 。在并行设置中,在迭代状态的不同分区上 并行评估步骤函数的多个实例 。在许多设置中,对所有并行实例的步骤函数的一个评估形成所谓的 超级步骤 ,其也是同步的粒度。因此,迭代的 所有 并行任务都需要在初始化下一个超级步骤之前完成超级步骤。 终止标准 也将在超级障碍评估。

超级步

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。