DataStream Transformations
Transformation: Map DataStream → DataStream
Description: Takes one element and produces one element. A map function that doubles the values of the input stream:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
Transformation: FlatMap DataStream → DataStream
Description: Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
Transformation: Filter DataStream → DataStream
Description: Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
Transformation: KeyBy DataStream → KeyedStream
Description: Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Attention: A type cannot be a key if:
- it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation;
- it is an array of any type.
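As a hedged illustration of keying by something other than a field name or tuple index, the sketch below uses a KeySelector; the WordCount event type is hypothetical and not part of the examples above:
// Hypothetical event type used only for this sketch (public fields and a
// no-argument constructor make it a valid Flink POJO).
public class WordCount {
public String word;
public int count;
public WordCount() {}
public WordCount(String word, int count) { this.word = word; this.count = count; }
}

DataStream<WordCount> wordCounts = //...
// Key by a value computed per record with a KeySelector.
KeyedStream<WordCount, String> keyed = wordCounts.keyBy(new KeySelector<WordCount, String>() {
@Override
public String getKey(WordCount wc) throws Exception {
return wc.word;
}
});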
Transformation: Reduce KeyedStream → DataStream
Description: A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
Transformation: Fold KeyedStream → DataStream
Description: A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
Transformation: Aggregations KeyedStream → DataStream
Description: Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
Transformation: Window KeyedStream → WindowedStream
Description: Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
Transformation: WindowAll DataStream → AllWindowedStream
Description: Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
Transformation: Window Apply WindowedStream → DataStream, AllWindowedStream → DataStream
Description: Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple,
Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> t : values) {
sum += t.f1;
}
out.collect(sum);
}
});
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public void apply (Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> t : values) {
sum += t.f1;
}
out.collect(sum);
}
});
Transformation: Window Reduce WindowedStream → DataStream
Description: Applies a functional reduce function to the window and returns the reduced value.
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});
Transformation: Window Fold WindowedStream → DataStream
Description: Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":
windowedStream.fold("start", new FoldFunction<Integer, String>() {
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
Transformation: Aggregations on windows WindowedStream → DataStream
Description: Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
Transformation: Union DataStream* → DataStream
Description: Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.
dataStream.union(otherStream1, otherStream2, ...);
Transformation: Window Join DataStream, DataStream → DataStream
Description: Joins two data streams on a given key and a common window.
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
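A hedged sketch of the same join with concrete key selectors and join function, assuming two hypothetical streams of Tuple2<String, Integer> that share a String key in field 0:
DataStream<Tuple2<String, Integer>> orders = //...
DataStream<Tuple2<String, Integer>> payments = //...

DataStream<String> joined = orders.join(payments)
.where(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) { return value.f0; }
})
.equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) { return value.f0; }
})
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public String join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
// Emitted once per pair that shares the key and falls into the same 3-second window.
return first.f0 + ": " + (first.f1 + second.f1);
}
});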
Transformation: Interval Join KeyedStream, KeyedStream → DataStream
Description: Joins two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});
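A hedged, more concrete variant of the same interval join, assuming two hypothetical keyed streams of Tuple2<String, Integer>; in the Java API the join logic can be expressed as a ProcessJoinFunction:
KeyedStream<Tuple2<String, Integer>, String> left = //...
KeyedStream<Tuple2<String, Integer>, String> right = //...

DataStream<String> joined = left.intervalJoin(right)
.between(Time.milliseconds(-2), Time.milliseconds(2))
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public void processElement(Tuple2<String, Integer> l,
Tuple2<String, Integer> r,
Context ctx,
Collector<String> out) {
// One result per pair whose timestamps fall within the given bounds.
out.collect(l.f0 + ": " + (l.f1 + r.f1));
}
});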
Transformation: Window CoGroup DataStream, DataStream → DataStream
Description: Cogroups two data streams on a given key and a common window.
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
Transformation: Connect DataStream, DataStream → ConnectedStreams
Description: "Connects" two data streams retaining their types. Connect allows for shared state between the two streams.
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
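A hedged sketch of the shared-state aspect, assuming a hypothetical use case where one stream carries per-key threshold updates and the other carries readings to be filtered against the latest threshold; both streams are keyed before connecting so that keyed state can be used:
DataStream<Tuple2<String, Integer>> thresholds = //... (key, new threshold)
DataStream<Tuple2<String, Integer>> readings = //... (key, measured value)

DataStream<Tuple2<String, Integer>> alerts = thresholds
.keyBy(0)
.connect(readings.keyBy(0))
.flatMap(new RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private transient ValueState<Integer> threshold;

@Override
public void open(Configuration parameters) {
threshold = getRuntimeContext().getState(
new ValueStateDescriptor<>("threshold", Integer.class));
}

@Override
public void flatMap1(Tuple2<String, Integer> update, Collector<Tuple2<String, Integer>> out) throws Exception {
threshold.update(update.f1); // remember the latest threshold for this key
}

@Override
public void flatMap2(Tuple2<String, Integer> reading, Collector<Tuple2<String, Integer>> out) throws Exception {
Integer t = threshold.value(); // both inputs see the same per-key state
if (t != null && reading.f1 > t) {
out.collect(reading);
}
}
});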
Transformation: CoMap, CoFlatMap ConnectedStreams → DataStream
Description: Similar to map and flatMap on a connected data stream
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
Transformation: Split DataStream → SplitStream
Description: Splits the stream into two or more streams according to some criterion.
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
Transformation: Select SplitStream → DataStream
Description: Selects one or more streams from a split stream.
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
Transformation: Iterate DataStream → IterativeStream → DataStream
Description: Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
Transformation: Extract Timestamps DataStream → DataStream
Description: Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.
stream.assignTimestamps (new TimeStampExtractor() {...});
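A hedged, more concrete sketch using the assignTimestampsAndWatermarks variant with a bounded-out-of-orderness extractor; the MyEvent type and its getCreationTime() accessor are hypothetical:
DataStream<MyEvent> stream = //...

DataStream<MyEvent> withTimestamps = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
// Event time is taken from the record itself; watermarks lag behind by 10 seconds.
return element.getCreationTime();
}
});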
The transformations above use the Java DataStream API; the following shows the corresponding Scala API.
Transformation: Map DataStream → DataStream
Description: Takes one element and produces one element. A map function that doubles the values of the input stream:
dataStream.map { x => x * 2 }
Transformation: FlatMap DataStream → DataStream
Description: Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:
dataStream.flatMap { str => str.split(" ") }
Transformation: Filter DataStream → DataStream
Description: Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter { _ != 0 }
Transformation: KeyBy DataStream → KeyedStream
Description: Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.
dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple
Transformation: Reduce KeyedStream → DataStream
Description: A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce { _ + _ }
Transformation: Fold KeyedStream → DataStream
Description: A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
Transformation: Aggregations KeyedStream → DataStream
Description: Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Transformation: Window KeyedStream → WindowedStream
Description: Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Transformation: WindowAll DataStream → AllWindowedStream
Description: Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Transformation: Window Apply WindowedStream → DataStream, AllWindowedStream → DataStream
Description: Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Transformation: Window Reduce WindowedStream → DataStream
Description: Applies a functional reduce function to the window and returns the reduced value.
windowedStream.reduce { _ + _ }
Transformation: Window Fold WindowedStream → DataStream
Description: Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
Transformation: Aggregations on windows WindowedStream → DataStream
Description: Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Transformation: Union DataStream* → DataStream
Description: Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.
dataStream.union(otherStream1, otherStream2, ...)
Transformation: Window Join DataStream, DataStream → DataStream
Description: Joins two data streams on a given key and a common window.
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Transformation: Window CoGroup DataStream, DataStream → DataStream
Description: Cogroups two data streams on a given key and a common window.
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
Transformation: Connect DataStream, DataStream → ConnectedStreams
Description: "Connects" two data streams retaining their types, allowing for shared state between the two streams.
val someStream: DataStream[Int] = ...
val otherStream: DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
Transformation: CoMap, CoFlatMap ConnectedStreams → DataStream
Description: Similar to map and flatMap on a connected data stream
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(num : Int) => List(num.toString),
(str : String) => str.split(" ").toList
)
Transformation: Split DataStream → SplitStream
Description: Splits the stream into two or more streams according to some criterion.
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case _ => List("odd")
}
)
Transformation: Select SplitStream → DataStream
Description: Selects one or more streams from a split stream.
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Transformation: Iterate DataStream → IterativeStream → DataStream
Description: Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
Transformation: Extract Timestamps DataStream → DataStream
Description: Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.
stream.assignTimestamps { timestampExtractor }
Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
}
is not supported by the API out-of-the-box. To use this feature, you should use a Scala API extension.
The following transformations are available on data streams of Tuples:
Transformation: Project DataStream → DataStream
Description: Selects a subset of fields from the tuples.
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);