- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
DataStream API 教程
在本指南中,我们将从头开始,从设置 Flink 项目到在 Flink 集群上运行流分析程序。
Wikipedia 提供了一个 IRC 频道,其中记录了对 Wiki 的所有编辑。我们将在 Flink 中读取此通道,并计算每个用户在给定时间窗口内编辑的字节数。这很容易使用 Flink 在几分钟内实现,但它将为您提供一个良好的基础,从而开始自己构建更复杂的分析程序。
设置 Maven 项目
我们将使用 Flink Maven Archetype 来创建我们的项目结构。有关此内容的更多详细信息,请参阅 Java API 快速入门 。出于我们的目的,运行命令是这样的:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
-DarchetypeVersion=1.7-SNAPSHOT \
-DgroupId=wiki-edits \
-DartifactId=wiki-edits \
-Dversion=0.1 \
-Dpackage=wikiedits \
-DinteractiveMode=false
注意 :对于 Maven 3.0 或更高版本,不再可以通过命令行指定存储库(-DarchetypeCatalog)。如果要使用 SNAPSHOT 存储库,则需要向 settings.xml 添加存储库条目。有关此更改的详细信息,请参阅 Maven 官方文档
您可以编辑 groupId
, artifactId
而 package
如果你喜欢。使用上面的参数,Maven 将创建一个如下所示的项目结构:
$ tree wiki-edits
wiki-edits/
├── pom.xml
└── src
└── main
├── java
│ └── wikiedits
│ ├── BatchJob.java
│ ├── SocketTextStreamWordCount.java
│ ├── StreamingJob.java
│ └── WordCount.java
└── resources
└── log4j.properties
我们的 pom.xml
文件已经在根目录中添加了 Flink 依赖项,并且有几个示例 Flink 程序 src/main/java
。我们可以删除示例程序,因为我们将从头开始:
$ rm wiki-edits/src/main/java/wikiedits/*.java
作为最后一步,我们需要将 Flink Wikipedia 连接器添加为依赖关系,以便我们可以在我们的程序中使用它。编辑它的 dependencies
部分 pom.xml
,使它看起来像这样:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
注意 flink-connector-wikiedits_2.11
添加的依赖项。(此示例和 Wikipedia 连接器的灵感来自 Apache Samza 的 Hello Samza 示例。)
编写 Flink 程序
这是编码时间。启动您喜欢的 IDE 并导入 Maven 项目或打开文本编辑器并创建文件 src/main/java/wikiedits/WikipediaAnalysis.java
:
package wikiedits;
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception {
}
}
该计划现在非常基础,但我们会尽力填写。请注意,我不会在此处提供 import 语句,因为 IDE 可以自动添加它们。在本节结束时,如果您只想跳过并在编辑器中输入,我将使用 import 语句显示完整的代码。
Flink 程序的第一步是创建一个 StreamExecutionEnvironment
(或者 ExecutionEnvironment
如果您正在编写批处理作业)。这可用于设置执行参数并创建从外部系统读取的源。所以让我们继续把它添加到 main 方法:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
接下来,我们将创建一个从 Wikipedia IRC 日志中读取的源:
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
这创建了一个我们可以进一步处理 DataStream
的 WikipediaEditEvent
数据元。出于本示例的目的,我们感兴趣的是确定每个用户在特定时间窗口中添加或删除的字节数,比如说五秒。为此,我们首先必须指定我们要在用户名上键入流,也就是说此流上的 算子操作应考虑用户名。在我们的例子中,窗口中编辑的字节的总和应该是每个唯一的用户。对于键入流,我们必须提供一个 KeySelector
,如下所示:
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
这为我们提供了一个 WikipediaEditEvent
具有 String
Keys 的用户名。我们现在可以指定我们希望在此流上加上窗口,并根据这些窗口中的数据元计算结果。窗口指定要在其上执行计算的 Stream 片。在无限的数据元流上计算聚合时需要 Windows。在我们的例子中,我们将说我们想要每五秒聚合一次编辑的字节总和:
DataStream<Tuple2<String, Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
第一个调用, .timeWindow()
指定我们想要有五秒钟的翻滚(非重叠)窗口。第二个调用为每个唯一键指定每个窗口切片的 折叠变换 。在我们的例子中,我们从一个初始值开始, ("", 0L)
并在其中为用户添加该时间窗口中每个编辑的字节差异。生成的 Stream 现在包含 Tuple2<String, Long>
每五秒钟发出一次的用户。
剩下要做的就是将流打印到控制台并开始执行:
result.print();
see.execute();
最后一次调用是启动实际 Flink 作业所必需的。所有 算子操作(例如创建源,转换和接收器)仅构建内部 算子操作的图形。只有在 execute()
被调用时 才会在集群上抛出或在本地计算机上执行此 算子操作图。
到目前为止完整的代码是这样的:
package wikiedits;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
DataStream<Tuple2<String, Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
result.print();
see.execute();
}
}
您可以使用 Maven 在 IDE 或命令行上运行此示例:
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
第一个命令构建我们的项目,第二个命令执行我们的主类。输出应该类似于:
1> (Fenix down,114)
6> (AnomieBOT,155)
8> (BD2412bot,-3690)
7> (IgnorantArmies,49)
3> (Ckh3111,69)
5> (Slade360,0)
7> (Narutolovehinata5,2195)
6> (Vuyisa2001,79)
4> (Ms Sarah Welch,269)
4> (KasparBot,-245)
每行前面的数字告诉您输出生成的打印接收器的哪个并行实例。
这应该让您开始编写自己的 Flink 程序。要了解更多信息,您可以查看我们的 基本概念 指南和 DataStream API 。如果您想了解如何在自己的机器上设置 Flink 群集并将结果写入 Kafka ,请坚持参加奖励练习。
奖金练习:在群集上运行并写入 Kafka
请按照我们的 本地安装教程 在您的机器上设置 Flink 分发,并 在继续 算子操作之前参考 Kafka 快速入门 以设置 Kafka 安装。
作为第一步,我们必须添加 Flink Kafka 连接器作为依赖关系,以便我们可以使用 Kafka 接收器。将其添加到 pom.xml
依赖项部分中的文件:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
接下来,我们需要修改我们的程序。我们将移除 print()
水槽,而是使用 Kafka 水槽。新代码如下所示:
result
.map(new MapFunction<Tuple2<String,Long>, String>() {
@Override
public String map(Tuple2<String, Long> tuple) {
return tuple.toString();
}
})
.addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
还需要导入相关的类:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
注意我们是如何第一个转换的流 Tuple2<String, Long>
来流 String
使用 MapFunction。我们这样做是因为将简单字符串写入 Kafka 更容易。然后,我们创建了一个 Kafka 水槽。您可能必须使主机名和端口适应您的设置。 "wiki-result"
是运行我们的程序之前我们将要创建的 Kafka 流的名称。使用 Maven 构建项目因为我们需要 jar 文件在集群上运行:
$ mvn clean package
生成的 jar 文件将位于 target
子文件夹中: target/wiki-edits-0.1.jar
。我们稍后会用到它。
现在我们准备启动 Flink 集群并运行写入 Kafka 的程序。转到安装 Flink 的位置并启动本地群集:
$ cd my/flink/directory
$ bin/start-cluster.sh
我们还必须创建 Kafka 主题,以便我们的程序可以写入它:
$ cd my/kafka/directory
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results
现在我们准备在本地 Flink 集群上运行我们的 jar 文件:
$ cd my/flink/directory
$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
如果一切按计划进行,那么该命令的输出应该与此类似:
03/08/2016 15:09:27 Job execution switched to status RUNNING.
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
您可以看到各个算子如何开始运行。只有两个,因为出于性能原因,窗口之后的 算子操作被折叠成一个 算子操作。在 Flink,我们称之为 链接 。
您可以通过使用 Kafka 控制台使用者检查 Kafka 主题来观察程序的输出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result
您还可以查看应在 http:// localhost:8081 上 运行的 Flink 仪表板。您将获得群集资源和正在运行的作业的概述:
如果单击正在运行的作业,您将获得一个视图,您可以在其中检查各个 算子操作,例如,查看已处理数据元的数量:
这就结束了我们对 Flink 的小游览。如果您有任何疑问,请随时询问我们的 邮件列表 。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论