- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
批处理示例
以下示例程序展示了 Flink 的不同应用程序,从简单的字数统计到图形算法。代码示例说明了 Flink 的 DataSet API 的使用 。
可以在 Flink 源存储库的 flink-examples-batch 模块中找到以下和更多示例的完整源代码。
运行一个例子
为了运行 Flink 示例,我们假设您有一个正在运行的 Flink 实例。导航中的“快速入门”和“设置”选项卡描述了启动 Flink 的各种方法。
最简单的方法是运行 ./bin/start-cluster.sh
,默认情况下启动一个带有一个 JobManager 和一个 TaskManager 的本地集群。
Flink 的每个二进制版本都包含一个 examples
目录,其中包含此页面上每个示例的 jar 文件。
要运行 WordCount 示例,请发出以下命令:
./bin/flink run ./examples/batch/WordCount.jar
其他示例可以以类似的方式启动。
请注意,通过使用内置数据,许多示例在不传递任何参数的情况下运行。要使用实际数据运行 WordCount,您必须将路径传递给数据:
./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
请注意,非本地文件系统需要模式前缀,例如 hdfs://
。
字数
WordCount 是大数据处理系统的“Hello World”。它计算文本集合中单词的频率。该算法分两步进行:首先,文本将文本分成单个单词。其次,对单词进行分组和计数。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("/path/to/file");
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
counts.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
该 字计数示例 实现上述算法的输入参数: --input <path> --output <path>
。作为测试数据,任何文本文件都可以。
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data val text = env.readTextFile("/path/to/file")
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.writeAsCsv(outputPath, "\n", " ")
该 字计数示例 实现上述算法的输入参数: --input <path> --output <path>
。作为测试数据,任何文本文件都可以。
网页排名
PageRank 算法计算链接定义的图形中页面的“重要性”,链接指向一个页面到另一个页面。它是一种迭代图算法,这意味着它重复应用相同的计算。在每次迭代中,每个页面在其所有邻居上分配其当前等级,并将其新等级计算为从其邻居接收的等级的纳税总和。PageRank 算法由 Google 搜索引擎推广,该搜索引擎利用网页的重要性对搜索查询的结果进行排名。
在这个简单的例子中,PageRank 通过 批量迭代 和固定数量的迭代来实现。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
.types(Long.class, Double.class)
// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
DataSet<Tuple2<Long, Double>> newRanks = iteration
// join pages with outgoing edges and distribute rank
.join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
// collect and sum ranks
.groupBy(0).sum(1)
// apply dampening factor
.map(new Dampener(DAMPENING_FACTOR, numPages));
DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
newRanks,
newRanks.join(iteration).where(0).equalTo(0)
// termination condition
.filter(new EpsilonFilter()));
finalPageRanks.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
public static final class JoinVertexWithEdgesMatch
implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
Tuple2<Long, Double>> {
@Override
public void join(<Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
Collector<Tuple2<Long, Double>> out) {
Long[] neighbors = adj.f1;
double rank = page.f1;
double rankToDistribute = rank / ((double) neigbors.length);
for (int i = 0; i < neighbors.length; i++) {
out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
}
}
}
public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
private final double dampening, randomJump;
public Dampener(double dampening, double numVertices) {
this.dampening = dampening;
this.randomJump = (1 - dampening) / numVertices;
}
@Override
public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
value.f1 = (value.f1 * dampening) + randomJump;
return value;
}
}
public static final class EpsilonFilter
implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
@Override
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
}
}
所述 的 PageRank 程序 实现上述实施例。它需要运行以下参数: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
。
// User-defined types case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
// set up execution environment val env = ExecutionEnvironment.getExecutionEnvironment
// read the pages and initial ranks by parsing a CSV file val pages = env.readCsvFile[Page](pagesInputPath)
// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids)) val links = env.readCsvFile[Link](linksInputPath)
// assign initial ranks to pages val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
// build adjacency list from link input val adjacencyLists = links
// initialize lists
.map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
// concatenate lists
.groupBy("sourceId").reduce {
(l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
}
// start iteration val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
currentRanks =>
val newRanks = currentRanks
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
for (targetId <- adjacent.targetIds) {
out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
}
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
// apply dampening factor
.map { p =>
Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
}
// terminate if no rank update was significant
val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
(current, next, out: Collector[Int]) =>
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
(newRanks, termination)
}
val result = finalRanks
// emit result result.writeAsCsv(outputPath, "\n", " ")
he PageRank program implements the above example. It requires the following parameters to run: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
.
输入文件是纯文本文件,必须格式如下:
- 页面表示为由新行字符分隔的(长)ID。
- 例如,
"1\n2\n12\n42\n63\n"
给出五个页面 ID 为 1,2,12,42 和 63 的页面。
- 例如,
- 链接表示为由空格字符分隔的页面 ID 对。链接由换行符分隔:
- 例如,
"1 2\n2 12\n1 12\n42 63\n"
给出四个(定向)链接(1) - >(2),(2) - >(12),(1) - >(12)和(42) - >(63)。
- 例如,
对于这个简单的实现,要求每个页面至少有一个传入链接和一个传出链接(页面可以指向自身)。
连接组件
连通分量算法通过为同一连接部分中的所有顶点分配相同的组件 ID 来识别较大图形的部分。与 PageRank 类似,Connected Components 是一种迭代算法。在每个步骤中,每个顶点将其当前组件 ID 传播到其所有邻居。如果顶点小于其自己的组件 ID,则顶点接受来自邻居的组件 ID。
此实现使用 增量迭代 :未更改其组件 ID 的顶点不参与下一步。这会产生更好的性能,因为后面的迭代通常只处理一些异常值顶点。
// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
// join with the edges
.join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
// select the minimum neighbor component ID
.groupBy(0).aggregate(Aggregations.MIN, 1)
// update if the component ID of the candidate is smaller
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.flatMap(new ComponentIdFilter());
// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
// emit result
result.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
@Override
public Tuple2<T, T> map(T vertex) {
return new Tuple2<T, T>(vertex, vertex);
}
}
public static final class UndirectEdge
implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
@Override
public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
invertedEdge.f0 = edge.f1;
invertedEdge.f1 = edge.f0;
out.collect(edge);
out.collect(invertedEdge);
}
}
public static final class NeighborWithComponentIDJoin
implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
}
}
public static final class ComponentIdFilter
implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
Tuple2<Long, Long>> {
@Override
public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
Collector<Tuple2<Long, Long>> out) {
if (value.f0.f1 < value.f1.f1) {
out.collect(value.f0);
}
}
}
该 ConnectedComponents 程序 实现上述实施例。它需要运行以下参数: --vertices <path> --edges <path> --output <path> --iterations <n>
。
// set up execution environment val env = ExecutionEnvironment.getExecutionEnvironment
// read vertex and edge data
// assign the initial components (equal to the vertex id) val vertices = getVerticesDataSet(env).map { id => (id, id) }
// undirected edges by emitting for each input edge the input edges itself and an inverted
// version val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
// open a delta iteration val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
(s, ws) =>
// apply the step logic: join with the edges
val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
(edge._2, vertex._2)
}
// select the minimum neighbor
val minNeighbors = allNeighbors.groupBy(0).min(1)
// update if the component of the candidate is smaller
val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
(newVertex, oldVertex, out: Collector[(Long, Long)]) =>
if (newVertex._2 < oldVertex._2) out.collect(newVertex)
}
// delta and new workset are identical
(updatedComponents, updatedComponents)
}
verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
这个 PageRank 程序 实现了上面的例子。它需要运行以下参数: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
。
输入文件是纯文本文件,必须格式如下:
- 顶点表示为 ID 并用换行符分隔。
- 例如,
"1\n2\n12\n42\n63\n"
给出五个顶点(1),(2),(12),(42)和(63)。
- 例如,
- 边缘表示为由空格字符分隔的顶点 ID 的对。边线由换行符分隔:
- 例如,
"1 2\n2 12\n1 12\n42 63\n"
给出四个(无向)链路(1) - (2),(2) - (12),(1) - (12)和(42) - (63)。
- 例如,
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论