返回介绍

Flink DataStream API 编程指南

发布于 2025-05-02 18:19:13 字数 17998 浏览 0 评论 0 收藏

Flink 中的 DataStream 程序是实现数据流转换的常规程序(例如,Filter,更新状态,定义窗口,聚合)。最初从各种源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地 JVM 中执行,也可以在许多计算机的集群上执行。

有关 Flink API 基本概念 的介绍,请参阅 基本概念

为了创建您自己的 Flink DataStream 程序,我们鼓励您从 Flink 程序的解剖 开始, 逐步添加您自己的 流转换 。其余部分充当其他 算子操作和高级函数的参考。

示例程序

以下程序是流窗口字数统计应用程序的完整工作示例,它在 5 秒窗口中对来自 Web 套接字的单词进行计数。您可以复制并粘贴代码以在本地运行它。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

    dataStream.print();

    env.execute("Window WordCount");
  }

  public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
      for (String word: sentence.split(" ")) {
        out.collect(new Tuple2<String, Integer>(word, 1));
      }
    }
  }

}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val text = env.socketTextStream("localhost", 9999)

  val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    .map { (_, 1) }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)

  counts.print()

  env.execute("Window Stream WordCount")
  }
}

要运行示例程序,首先从终端使用 netcat 启动输入流:

nc -lk 9999

只需键入一些单词就可以返回一个新单词。这些将是字数统计程序的输入。如果要查看大于 1 的计数,请在 5 秒内反复键入相同的单词(如果无法快速输入,则将窗口大小从 5 秒增加☺)。

数据源

源是您的程序从中读取输入的位置。您可以使用将附加源附加到程序 StreamExecutionEnvironment.addSource(sourceFunction) 。Flink 附带了许多预先实现的源函数,但您可以通过实现 SourceFunction 非并行源,或通过实现 ParallelSourceFunction 接口或扩展 RichParallelSourceFunction 并行源来编写自己的自定义源。

有几个预定义的流源可从以下位置访问 StreamExecutionEnvironment

基于文件的:

  • readTextFile(path) - TextInputFormat 逐行读取文本文件,即符合规范的文件,并将它们作为字符串返回。
  • readFile(fileInputFormat, path) - 按指定的文件输入格式指定读取(一次)文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个内部调用的方法。它 path 根据给定的内容读取文件 fileInputFormat 。根据提供的内容 watchType ,此源可以定期监视(每 interval ms)新数据( FileProcessingMode.PROCESS_CONTINUOUSLY )的路径,或者处理当前在路径中的数据并退出( FileProcessingMode.PROCESS_ONCE )。使用该 pathFilter ,用户可以进一步排除正在处理的文件。

    实现:

    在引擎盖下,Flink 将文件读取过程分为两个子任务,即 目录监控 和 数据读取 。这些子任务中的每一个都由单独的实体实现。监视由单个 非并行 (并行性= 1)任务实现,而读取由并行运行的多个任务执行。后者的并行性等于工作并行性。单个监视任务的作用是扫描目录(定期或仅一次,具体取决于 watchType ),找到要处理的文件,将它们 分成分割 ,并将这些拆分分配给下游读卡器。读者是那些将阅读实际数据的人。每个分割仅由一个读取器读取,而读取器可以逐个读取多个分割。

    重要笔记:

    1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY ,则在修改文件时,将完全重新处理其内容。这可以打破“完全一次”的语义,因为在文件末尾追加数据将导致其 所有 内容被重新处理。
    2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE ,则源扫描路径 一次 并退出,而不等待读者完成读取文件内容。当然读者将继续阅读,直到读取所有文件内容。在该点之后关闭源将导致不再有检查点。这可能会导致节点发生故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

插座为基础的:

  • socketTextStream - 从套接字读取。数据元可以用分隔符分隔。

基于集合:

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有数据元必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。
  • fromElements(T ...) - 从给定的对象序列创建数据流。所有对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 并行地从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。
  • generateSequence(from, to) - 并行生成给定间隔中的数字序列。

自定义:

  • addSource - 附加新的源函数。例如,要从 Apache Kafka 中读取,您可以使用 addSource(new FlinkKafkaConsumer08&lt;&gt;(...)) 。请参阅 连接器 以获取更多详

源是您的程序从中读取输入的位置。您可以使用将附加源附加到程序 StreamExecutionEnvironment.addSource(sourceFunction) 。Flink 附带了许多预先实现的源函数,但您可以通过实现 SourceFunction 非并行源,或通过实现 ParallelSourceFunction 接口或扩展 RichParallelSourceFunction 并行源来编写自己的自定义源。

有几个预定义的流源可从以下位置访问 StreamExecutionEnvironment

基于文件的:

  • readTextFile(path) - TextInputFormat 逐行读取文本文件,即符合规范的文件,并将它们作为字符串返回。
  • readFile(fileInputFormat, path) - 按指定的文件输入格式指定读取(一次)文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter) - 这是前两个内部调用的方法。它 path 根据给定的内容读取文件 fileInputFormat 。根据提供的内容 watchType ,此源可以定期监视(每 interval ms)新数据( FileProcessingMode.PROCESS_CONTINUOUSLY )的路径,或者处理当前在路径中的数据并退出( FileProcessingMode.PROCESS_ONCE )。使用该 pathFilter ,用户可以进一步排除正在处理的文件。

    实现:

    在引擎盖下,Flink 将文件读取过程分为两个子任务,即 目录监控 和 数据读取 。这些子任务中的每一个都由单独的实体实现。监视由单个 非并行 (并行性= 1)任务实现,而读取由并行运行的多个任务执行。后者的并行性等于工作并行性。单个监视任务的作用是扫描目录(定期或仅一次,具体取决于 watchType ),找到要处理的文件,将它们 分成分割 ,并将这些拆分分配给下游读卡器。读者是那些将阅读实际数据的人。每个分割仅由一个读取器读取,而读取器可以逐个读取多个分割。

    重要笔记:

    1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY ,则在修改文件时,将完全重新处理其内容。这可以打破“完全一次”的语义,因为在文件末尾追加数据将导致其 所有 内容被重新处理。
    2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE ,则源扫描路径 一次 并退出,而不等待读者完成读取文件内容。当然读者将继续阅读,直到读取所有文件内容。在该点之后关闭源将导致不再有检查点。这可能会导致节点发生故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

插座为基础的:

  • socketTextStream - 从套接字读取。元素可以用分隔符分隔。

基于集合:

  • fromCollection(Seq) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator) - 从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
  • fromElements(elements: _*) - 从给定的对象序列创建数据流。所有对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
  • generateSequence(from, to) - 并行生成给定间隔中的数字序列。

自定义:

  • addSource - 附加新的源函数。例如,要从 Apache Kafka 中读取,您可以使用 addSource(new FlinkKafkaConsumer08&lt;&gt;(...)) 。请参阅 连接器 以获取更多详

DataStream 转换

有关可用流转换的概述,请参阅 算子

数据接收

数据接收器使用 DataStream 并将它们转发到文件,套接字,外部系统或打印它们。Flink 带有各种内置输出格式,这些格式封装在 DataStreams 上的 算子操作后面:

  • writeAsText() / TextOutputFormat - 按字符串顺序写入数据元。通过调用每个数据元的 toString() 方法获得字符串。
  • writeAsCsv(...) / CsvOutputFormat - 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个数据元的 toString() 值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的 打印 调用。如果并行度大于 1,则输出也将与生成输出的任务的标识符一起添加。
  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • writeToSocket - 根据 a 将数据元写入套接字 SerializationSchema
  • addSink - 调用自定义接收器函数。Flink 捆绑了其他系统(如 Apache Kafka)的连接器,这些系统实现为接收器函数。

数据接收器使用 DataStream 并将它们转发到文件,套接字,外部系统或打印它们。Flink 带有各种内置输出格式,这些格式封装在 DataStreams 上的 算子操作后面:

  • writeAsText() / TextOutputFormat - 按字符串顺序写入元素。通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(...) / CsvOutputFormat - 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的 打印 调用。如果并行度大于 1,则输出也将与生成输出的任务的标识符一起添加。
  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • writeToSocket - 根据 a 将元素写入套接字 SerializationSchema
  • addSink - 调用自定义接收器函数。Flink 捆绑了其他系统(如 Apache Kafka)的连接器,这些系统实现为接收器函数。

请注意, write*() 方法 DataStream 主要用于调试目的。他们没有参与 Flink 的检查点,这意味着这些函数通常具有至少一次的语义。刷新到目标系统的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的数据元都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

要将流可靠,准确地一次传送到文件系统,请使用 flink-connector-filesystem 。此外,通过该 .addSink(...) 方法的自定义实现可以参与 Flink 的精确一次语义检查点。

迭代

迭代流程序实现步进函数并将其嵌入到 IterativeStream 。由于 DataStream 程序可能永远不会完成,因此没有最大迭代次数。相反,您需要指定流的哪个部分反馈到迭代,哪个部分使用 split 转换或转发到下游 filter 。在这里,我们展示了使用过滤器的示例。首先,我们定义一个 IterativeStream

IterativeStream<Integer> iteration = input.iterate();

然后,我们使用一系列转换指定将在循环内执行的逻辑(这里是一个简单的 map 转换)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

要关闭迭代并定义迭代尾部,请调用 closeWith(feedbackStream) 方法 IterativeStream 。赋予 closeWith 函数的 DataStream 将反馈给迭代头。常见的模式是使用过滤器来分离反馈的流的部分和向前传播的流的部分。这些滤波器可以例如定义“终止”逻辑,其中允许元件向下游传播而不是反馈。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

例如,这里是从一系列整数中连续减去 1 直到它们达到零的程序:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
  return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
  return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
  return (value <= 0);
  }
});

迭代流程序实现步进函数并将其嵌入到 IterativeStream 。由于 DataStream 程序可能永远不会完成,因此没有最大迭代次数。相反,您需要指定流的哪个部分反馈到迭代,哪个部分使用 split 转换或转发到下游 filter 。这里,我们展示了一个示例迭代,其中正文(重复的计算部分)是一个简单的映射转换,反馈的元素由使用过滤器向下游转发的元素区分。

val iteratedStream = someDataStream.iterate(
  iteration => {
  val iterationBody = iteration.map(/* this is executed many times */)
  (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})

例如,这里是从一系列整数中连续减去 1 直到它们达到零的程序:

val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
  val minusOne = iteration.map( v => v - 1)
  val stillGreaterThanZero = minusOne.filter (_ > 0)
  val lessThanZero = minusOne.filter(_ <= 0)
  (stillGreaterThanZero, lessThanZero)
  }
)

执行参数

StreamExecutionEnvironment 包含 ExecutionConfig 允许为运行时设置工作的具体配置值。

有关大多数参数的说明,请参阅 执行配置 。这些参数特别适用于 DataStream API:

  • setAutoWatermarkInterval(long milliseconds) :设置自动水印发射的间隔。您可以使用获取当前值 long getAutoWatermarkInterval()

容错

State&Checkpointing 描述了如何启用和配置 Flink 的检查点机制。

控制延迟

默认情况下,数据元不会逐个传输到网络上(这会导致不必要的网络流量),但会被缓冲。可以在 Flink 配置文件中设置缓冲区的大小(实际在计算机之间传输)。虽然此方法适用于优化吞吐量,但当传入流速度不够快时,可能会导致延迟问题。要控制吞吐量和延迟,您可以 env.setBufferTimeout(timeoutMillis) 在运行环境(或单个 算子)上使用以设置缓冲区填充的最长等待时间。在此之后,即使缓冲区未满,也会自动发送缓冲区。此超时的默认值为 100 毫秒。

用法:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

为了最大化吞吐量,设置 setBufferTimeout(-1) 哪个将删除超时和缓冲区只有在它们已满时才会被刷新。要最小化延迟,请将超时设置为接近 0 的值(例如 5 或 10 ms)。应避免缓冲区超时为 0,因为它可能导致严重的性能下降。

调试

在分布式集群中运行流式程序之前,最好确保实现的算法按预期工作。因此,实施数据分析程序通常是检查结果,调试和改进的增量过程。

Flink 通过支持 IDE 内的本地调试,测试数据的注入和结果数据的收集,提供了显着简化数据分析程序开发过程的函数。本节提供了一些如何简化 Flink 程序开发的提示。

本地运行环境

A LocalStreamEnvironment 在创建它的同一 JVM 进程中启动 Flink 系统。如果从 IDE 启动 LocalEnvironment,则可以在代码中设置断点并轻松调试程序。

创建 LocalEnvironment 并使用如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */)
// build your program 
env.execute()

收集数据源

Flink 提供了特殊的数据源,这些数据源由 Java 集合支持,以方便测试。一旦程序经过测试,源和接收器可以很容易地被读取/写入外部系统的源和接收器替换。

集合数据源可以使用如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意: 目前,集合数据源要求实现数据类型和迭代器 Serializable 。此外,收集数据源不能并行执行(并行度= 1)。

迭代器数据接收器

Flink 还提供了一个接收器,用于收集 DataStream 结果以进行测试和调试。它可以使用如下:

import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter

val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala

注意: flink-streaming-contrib 模块已从 Flink 1.5.0 中删除。它的课程已被移入 flink-streaming-javaflink-streaming-scala

下一步

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。