返回介绍

Flink DataSet API 编程指南

发布于 2025-05-02 18:19:14 字数 65574 浏览 0 评论 0 收藏

Flink 中的 DataSet 程序是实现数据集转换的常规程序(例如,Filter,映射,连接,分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink 程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地 JVM 中执行,也可以在许多计算机的集群上执行。

有关 Flink API 基本概念 的介绍,请参阅 基本概念

为了创建您自己的 Flink DataSet 程序,我们鼓励您从 Flink 程序解剖 开始, 逐步添加您自己的 转换 。其余部分充当其他 算子操作和高级函数的参考。

示例程序

以下程序是 WordCount 的完整工作示例。您可以复制并粘贴代码以在本地运行它。您只需要在项目中包含正确的 Flink 库(请参见 使用 Flink 链接 )并指定导入。那你就准备好了!

public class WordCountExample {
  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?");

    DataSet<Tuple2<String, Integer>> wordCounts = text
      .flatMap(new LineSplitter())
      .groupBy(0)
      .sum(1);

    wordCounts.print();
  }

  public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
      for (String word : line.split(" ")) {
        out.collect(new Tuple2<String, Integer>(word, 1));
      }
    }
  }
}
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val text = env.fromElements(
    "Who's there?",
    "I think I hear them. Stand, ho! Who's there?")

  val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    .map { (_, 1) }
    .groupBy(0)
    .sum(1)

  counts.print()
  }
}

数据集转换

数据转换将一个或多个 DataSet 转换为新的 DataSet。程序可以将多个转换组合到复杂的程序集中。

本节简要概述了可用的转换。该 转换文档 与示例全部转换的完整描述。


转换: Map

描述:采用一个数据元并生成一个数据元。

data.map(new MapFunction&lt;String, Integer&gt;() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

转换: FlatMap

描述:采用一个数据元并生成零个,一个或多个数据元。

data.flatMap(new FlatMapFunction&lt;String, String&gt;() {
  public void flatMap(String value, Collector&lt;String&gt; out) {
  for (String s : value.split(" ")) {
    out.collect(s);
  }
  }
});

转换: MapPartition

描述:在单个函数调用中转换并行分区。该函数将分区作为 Iterable 流来获取,并且可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。

data.mapPartition(new MapPartitionFunction&lt;String, Long&gt;() {
  public void mapPartition(Iterable&lt;String&gt; values, Collector&lt;Long&gt; out) {
  long c = 0;
  for (String s : values) {
    c++;
  }
  out.collect(c);
  }
});

转换: Filter

描述:计算每个数据元的布尔函数,并保存函数返回 true 的数据元。 重要信息: 系统假定该函数不会修改应用谓词的数据元。违反此假设可能会导致错误的结果。

data.filter(new FilterFunction&lt;Integer&gt;() {
  public boolean filter(Integer value) { return value &gt; 1000; }
});

转换: Reduce

描述:通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce 可以应用于完整数据集或分组数据集。

data.reduce(new ReduceFunction&lt;Integer&gt; {
  public Integer reduce(Integer a, Integer b) { return a + b; }
});

如果将 reduce 应用于分组数据集,则可以通过提供 CombineHint to 来指定运行时执行 reduce 的组合阶段的方式 setCombineHint 。在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如 1/10)。


转换: ReduceGroup

描述:将一组数据元组合成一个或多个数据元。ReduceGroup 可以应用于完整数据集或分组数据集。

data.reduceGroup(new GroupReduceFunction&lt;Integer, Integer&gt; {
  public void reduce(Iterable&lt;Integer&gt; values, Collector&lt;Integer&gt; out) {
  int prefixSum = 0;
  for (Integer i : values) {
    prefixSum += i;
    out.collect(prefixSum);
  }
  }
});

转换: Aggregate

描述:将一组值聚合为单个值。聚合函数可以被认为是内置的 reduce 函数。聚合可以应用于完整数据集或分组数据集。

Dataset&lt;Tuple3&lt;Integer, String, Double&gt;&gt; input = // [...]
DataSet&lt;Tuple3&lt;Integer, String, Double&gt;&gt; output = input.aggregate(SUM, 0).and(MIN, 2);

您还可以使用简写语法进行最小,最大和总和聚合。

 Dataset&lt;Tuple3&lt;Integer, String, Double&gt;&gt; input = // [...]
DataSet&lt;Tuple3&lt;Integer, String, Double&gt;&gt; output = input.sum(0).andMin(2);

转换: Distinct

描述:返回数据集的不同数据元。它相对于数据元的所有字段或字段子集从输入 DataSet 中删除重复条目。

data.distinct();

使用 reduce 函数实现 Distinct。您可以通过提供 CombineHint to 来指定运行时执行 reduce 的组合阶段的方式 setCombineHint 。在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如 1/10)。


转换: Join

描述:通过创建在其键上相等的所有数据元对来连接两个数据集。可选地使用 JoinFunction 将数据元对转换为单个数据元,或使用 FlatJoinFunction 将数据元对转换为任意多个(包括无)数据元。请参阅 键部分 以了解如何定义连接键。

result = input1.join(input2)
         .where(0)     // key of the first input (tuple field 0)
         .equalTo(1);  // key of the second input (tuple field 1)

您可以通过 Join Hints 指定运行时执行连接的方式。提示描述了通过分区或广播进行连接,以及它是使用基于排序还是基于散列的算法。有关可能的提示和示例的列表,请参阅“ 转换指南” 。 如果未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。

// This executes a join by broadcasting the first data set
// using a hash table for the broadcast data
result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
         .where(0).equalTo(1);

请注意,连接转换仅适用于等连接。其他连接类型需要使用 OuterJoin 或 CoGroup 表示。


转换: OuterJoin

描述:在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有数据元对。此外,如果在另一侧没有找到匹配的 Keys,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配数据元对(或一个数据元和 null 另一个输入的值)被赋予 JoinFunction 以将数据元对转换为单个数据元,或者转换为 FlatJoinFunction 以将数据元对转换为任意多个(包括无)数据元。请参阅 键部分 以了解如何定义连接键。

input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
    .where(0)        // key of the first input (tuple field 0)
    .equalTo(1)      // key of the second input (tuple field 1)
    .with(new JoinFunction&lt;String, String, String&gt;() {
      public String join(String v1, String v2) {
       // NOTE:
       // - v2 might be null for leftOuterJoin
       // - v1 might be null for rightOuterJoin
       // - v1 OR v2 might be null for fullOuterJoin
      }
    });

转换: CoGroup

描述:reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅 keys 部分 以了解如何定义 coGroup 键。

data1.coGroup(data2)
   .where(0)
   .equalTo(1)
   .with(new CoGroupFunction&lt;String, String, String&gt;() {
     public void coGroup(Iterable&lt;String&gt; in1, Iterable&lt;String&gt; in2, Collector&lt;String&gt; out) {
       out.collect(...);
     }
    });

转换: Cross

描述:构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用 CrossFunction 将数据元对转换为单个数据元

DataSet&lt;Integer&gt; data1 = // [...]
DataSet&lt;String&gt; data2 = // [...]
DataSet&lt;Tuple2&lt;Integer, String&gt;&gt; result = data1.cross(data2);

注:交叉是一个潜在的 非常 计算密集型 算子操作它甚至可以挑战大的计算集群!建议使用 crossWithTiny() 和 crossWithHuge() 来提示系统的 DataSet 大小。


转换: Union

描述:生成两个数据集的并集。

DataSet&lt;String&gt; data1 = // [...]
DataSet&lt;String&gt; data2 = // [...]
DataSet&lt;String&gt; result = data1.union(data2);

转换: Rebalance

描述:均匀地 Rebalance 数据集的并行分区以消除数据偏差。只有类似 Map 的转换可能会遵循 Rebalance 转换。

DataSet&lt;String&gt; in = // [...]
DataSet&lt;String&gt; result = in.rebalance()
               .map(new Mapper());

转换: Hash-Partition

描述:散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。

DataSet&lt;Tuple2&lt;String,Integer&gt;&gt; in = // [...]
DataSet&lt;Integer&gt; result = in.partitionByHash(0)
              .mapPartition(new PartitionMapper());

转换: Range-Partition

描述:Range-Partition 给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。

DataSet&lt;Tuple2&lt;String,Integer&gt;&gt; in = // [...]
DataSet&lt;Integer&gt; result = in.partitionByRange(0)
              .mapPartition(new PartitionMapper());

转换: Custom Partitioning

描述:手动指定数据分区。 注意 :此方法仅适用于单个字段键。

DataSet&lt;Tuple2&lt;String,Integer&gt;&gt; in = // [...]
DataSet&lt;Integer&gt; result = in.partitionCustom(Partitioner&lt;K&gt; partitioner, key)

转换: Sort Partition

描述:本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接 sortPartition()调用来完成对多个字段的排序。

DataSet&lt;Tuple2&lt;String,Integer&gt;&gt; in = // [...]
DataSet&lt;Integer&gt; result = in.sortPartition(1, Order.ASCENDING)
              .mapPartition(new PartitionMapper());

转换: First-n

描述:返回数据集的前 n 个(任意)数据元。First-n 可以应用于常规数据集,分组数据集或分组排序数据集。分组键可以指定为键选择器函数或字段位置键。

DataSet&lt;Tuple2&lt;String,Integer&gt;&gt; in = // [...]
// regular data set
DataSet&lt;Tuple2&lt;String,Integer&gt;&gt; result1 = in.first(3);
// grouped data set
DataSet&lt;Tuple2&lt;String,Integer&gt;&gt; result2 = in.groupBy(0)
                      .first(3);
// grouped-sorted data set
DataSet&lt;Tuple2&lt;String,Integer&gt;&gt; result3 = in.groupBy(0)
                      .sortGroup(1, Order.ASCENDING)
                      .first(3);

以下转换可用于元组的数据集:


转换: project

描述:从元组中选择字段的子集

DataSet&lt;Tuple3&lt;Integer, Double, String&gt;&gt; in = // [...]
DataSet&lt;Tuple2&lt;String, Integer&gt;&gt; out = in.project(2,0);

转换: MinBy / MaxBy

描述:从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。用于比较的字段必须是有效的关键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。

DataSet&lt;Tuple3&lt;Integer, Double, String&gt;&gt; in = // [...]
// a DataSet with a single tuple with minimum values for the Integer and String fields.
DataSet&lt;Tuple3&lt;Integer, Double, String&gt;&gt; out = in.minBy(0, 2);
// a DataSet with one tuple for each group with the minimum value for the Double field.
DataSet&lt;Tuple3&lt;Integer, Double, String&gt;&gt; out2 = in.groupBy(2)
                          .minBy(1);

转换: Map

描述:采用一个元素并生成一个元素。

data.map { x =&gt; x.toInt }

转换: FlatMap

描述:采用一个元素并生成零个,一个或多个元素。

data.flatMap { str =&gt; str.split(" ") }

转换: MapPartition

描述:在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,并可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和先前的 算子操作。

data.mapPartition { in =&gt; in map { (_, 1) } }

转换: Filter

描述:计算每个元素的布尔函数,并保存函数返回 true 的元素。 重要信息: 系统假定该函数不会修改应用谓词的元素。违反此假设可能会导致错误的结果。

data.filter { _ &gt; 1000 }

转换: Reduce

描述:通过将两个元素重复组合成一个元素,将一组元素组合成一个元素。Reduce 可以应用于完整数据集或分组数据集。

data.reduce { _ + _ }

转换: ReduceGroup

描述:将一组元素组合成一个或多个元素。ReduceGroup 可以应用于完整数据集或分组数据集。

data.reduceGroup { elements =&gt; elements.sum }

转换: Aggregate

描述:将一组值聚合为单个值。聚合函数可以被认为是内置的 reduce 函数。聚合可以应用于完整数据集或分组数据集。

val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)

您还可以使用简写语法进行最小,最大和总和聚合。

val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)

转换: Distinct

描述:返回数据集的不同元素。它相对于元素的所有字段或字段子集从输入 DataSet 中删除重复条目。

 data.distinct()

转换: Join

描述:通过创建在其键上相等的所有元素对来连接两个数据集。可选地使用 JoinFunction 将元素对转换为单个元素,或使用 FlatJoinFunction 将元素对转换为任意多个(包括无)元素。请参阅 键部分 以了解如何定义连接键。

// In this case tuple fields are used as keys. "0" is the join field on the first tuple
// "1" is the join field on the second tuple. val result = input1.join(input2).where(0).equalTo(1)

您可以通过 Join Hints 指定运行时执行连接的方式。提示描述了通过分区或广播进行连接,以及它是使用基于排序还是基于散列的算法。有关可能的提示和示例的列表,请参阅“ 转换指南” 。 如果未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。

// This executes a join by broadcasting the first data set
// using a hash table for the broadcast data val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
           .where(0).equalTo(1)

请注意,连接转换仅适用于等连接。其他连接类型需要使用 OuterJoin 或 CoGroup 表示。


转换: OuterJoin

描述:在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有元素对。此外,如果在另一侧没有找到匹配的密钥,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配元素对(或一个元素和另一个输入的 null 值)被赋予 JoinFunction 以将元素对转换为单个元素,或者给予 FlatJoinFunction 以将元素对转换为任意多个(包括无)元素。请参阅 键部分 以了解如何定义连接键。

val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
   (left, right) =&gt;
   val a = if (left == null) "none" else left._1
   (a, right)
  }

转换: CoGroup

描述:reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅 keys 部分 以了解如何定义 coGroup 键。

data1.coGroup(data2).where(0).equalTo(1)

转换: Cross

描述:构建两个输入的笛卡尔积(交叉乘积),创建所有元素对。可选择使用 CrossFunction 将元素对转换为单个元素

val data1: DataSet[Int] = // [...] val data2: DataSet[String] = // [...] val result: DataSet[(Int, String)] = data1.cross(data2)

注:交叉是一个潜在的 非常 计算密集型 算子操作它甚至可以挑战大的计算集群!建议使用 crossWithTiny() 和 crossWithHuge() 来提示系统的 DataSet 大小。


转换: Union

描述:生成两个数据集的并集。

data.union(data2)

转换: Rebalance

描述:均匀地 Rebalance 数据集的并行分区以消除数据偏差。只有类似 Map 的转换可能会遵循 Rebalance 转换。

val data1: DataSet[Int] = // [...] val result: DataSet[(Int, String)] = data1.rebalance().map(...)

转换: Hash-Partition

描述:散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。

val in: DataSet[(Int, String)] = // [...] val result = in.partitionByHash(0).mapPartition { ... }

转换: Range-Partition

描述:Range-Partition 给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。

val in: DataSet[(Int, String)] = // [...] val result = in.partitionByRange(0).mapPartition { ... }

转换: Custom Partitioning

描述:手动指定数据分区。 注意 :此方法仅适用于单个字段键。

val in: DataSet[(Int, String)] = // [...] val result = in
  .partitionCustom(partitioner: Partitioner[K], key)

转换: Sort Partition

描述:本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接 sortPartition()调用来完成对多个字段的排序。

val in: DataSet[(Int, String)] = // [...] val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }

转换: First-n

描述:返回数据集的前 n 个(任意)元素。First-n 可以应用于常规数据集,分组数据集或分组排序数据集。可以将分组键指定为键选择器函数,元组位置或案例类字段。

val in: DataSet[(Int, String)] = // [...]
// regular data set val result1 = in.first(3)
// grouped data set val result2 = in.groupBy(0).first(3)
// grouped-sorted data set val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

以下转换可用于元组的数据集:


转换: MinBy / MaxBy

描述:从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。用于比较的字段必须是有效的关键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。

val in: DataSet[(Int, Double, String)] = // [...]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2)
                       .minBy(1)

通过匿名模式匹配从元组,案例类和集合中提取,如下所示:

val data: DataSet[(Int, String, Double)] = // [...] data.map {
  case (id, name, temperature) => // [...] }

API 开箱即用不支持。要使用此函数,您应该使用 Scala API 扩展 。

并行 转换的可以定义为 setParallelism(int) 同时 name(String) 指定一个自定义名称的转变这对于调试很有帮助。这同样是可能的 数据源数据接收器

withParameters(Configuration) 传递配置对象,可以从 open() 用户函数内的方法访问。

数据源

数据源创建初始数据集,例如来自文件或 Java 集合。创建数据集的一般机制是在 InputFormat 后面抽象的 。Flink 附带了几种内置格式,可以从通用文件格式创建数据集。他们中的许多人在 ExecutionEnvironment 上都有快捷方法。

基于文件的:

  • readTextFile(path) / TextInputFormat - 按行读取文件并将其作为字符串返回。
  • readTextFileWithValue(path) / TextValueInputFormat - 按行读取文件并将它们作为 StringValues 返回。StringValues 是可变字符串。
  • readCsvFile(path) / CsvInputFormat - 解析逗号(或其他字符)分隔字段的文件。返回元组或 POJO 的 DataSet。支持基本 java 类型及其 Value 对应作为字段类型。
  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - 解析新行(或其他字符序列)分隔的原始数据类型(如 String 或)的文件 Integer
  • readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat - 解析新行(或其他字符序列)分隔的原始数据类型的文件,例如 StringInteger 使用给定的分隔符。
  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat - 创建一个 JobConf 并从类型为 SequenceFileInputFormat,Key class 和 Value 类的指定路径中读取文件,并将它们作为 Tuple2 <Key,Value>返回。

基于集合:

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据集。集合中的所有数据元必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。
  • fromElements(T ...) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 并行地从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。
  • generateSequence(from, to) - 并行生成给定间隔中的数字序列。

通用:

  • readFile(inputFormat, path) / FileInputFormat - 接受文件输入格式。
  • createInput(inputFormat) / InputFormat - 接受通用输入格式。

例子

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read text file from a HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
               .types(Integer.class, String.class, Double.class);

// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                 .includeFields("10010")  // take the first and the fourth field
               .types(String.class, Double.class);

// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
             .pojoType(Person.class, "name", "age", "zipcode");

// read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
  env.createInput(
    JDBCInputFormat.buildJDBCInputFormat()
           .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
           .setDBUrl("jdbc:derby:memory:persons")
           .setQuery("select name, age from persons")
           .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
           .finish()
  );

// Note: Flink's program compiler needs to infer the data types of the data items which are returned
// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
// manually provide the type information as shown in the examples above.

配置 CSV 分析

Flink 为 CSV 解析提供了许多配置选项:

  • types(Class ... types) 指定要解析的字段的类型。 必须配置已解析字段的类型。 在类型为 Boolean.class 的情况下,“True”(不区分大小写),“False”(不区分大小写),“1”和“0”被视为布尔值。
  • lineDelimiter(String del) 指定单个记录的分隔符。默认行分隔符是换行符 '\n'
  • fieldDelimiter(String del) 指定用于分隔记录字段的分隔符。默认字段分隔符是逗号字符 ','
  • includeFields(boolean ... flag)includeFields(String mask)includeFields(long bitMask) 定义从输入文件中读取哪些字段(以及要忽略的字段)。默认情况下,将解析前 n 个 字段(由 types() 调用中的类型数定义)。
  • parseQuotedStrings(char quoteChar) 启用带引号的字符串解析。如果字符串字段的第一个字符是引号字符(前导或拖尾空格 未被 修剪),则字符串将被解析为带引号的字符串。引用字符串中的字段分隔符将被忽略。如果带引号的字符串字段的最后一个字符不是引号字符,或者引号字符出现在某个不是引用字符串字段的开头或结尾的点上(除非引号字符使用''转义,否则引用字符串解析失败)。如果启用了带引号的字符串解析并且该字段的第一个字符 不是 引用字符串,则该字符串将被解析为不带引号的字符串。默认情况下,禁用带引号的字符串解析。
  • ignoreComments(String commentPrefix) 指定注释前缀。所有以指定注释前缀开头的行都不会被解析和忽略。默认情况下,不会忽略任何行。
  • ignoreInvalidLines() 启用宽松解析,即忽略无法正确解析的行。默认情况下,禁用宽松解析,无效行引发异常。
  • ignoreFirstLine() 配置 InputFormat 以忽略输入文件的第一行。默认情况下,不会忽略任何行。

递归遍历输入路径目录

对于基于文件的输入,当输入路径是目录时,默认情况下不会枚举嵌套文件。相反,只读取基目录中的文件,而忽略嵌套文件。可以通过 recursive.file.enumeration 配置参数启用嵌套文件的递归枚举,如下例所示。

// enable recursive enumeration of nested input files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create a configuration object
Configuration parameters = new Configuration();

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);

// pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
        .withParameters(parameters);

数据源创建初始数据集,例如来自文件或 Java 集合。创建数据集的一般机制是在 InputFormat 后面抽象的 。Flink 附带了几种内置格式,可以从通用文件格式创建数据集。他们中的许多人在 ExecutionEnvironment 上都有快捷方法。

基于文件的:

  • readTextFile(path) / TextInputFormat - 按行读取文件并将其作为字符串返回。
  • readTextFileWithValue(path) / TextValueInputFormat - 按行读取文件并将它们作为 StringValues 返回。StringValues 是可变字符串。
  • readCsvFile(path) / CsvInputFormat - 解析逗号(或其他字符)分隔字段的文件。返回元组,案例类对象或 POJO 的 DataSet。支持基本 java 类型及其 Value 对应作为字段类型。
  • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - 解析新行(或其他字符序列)分隔的原始数据类型的文件,例如 StringInteger 使用给定的分隔符。
  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat - 创建一个 JobConf 并从类型为 SequenceFileInputFormat,Key class 和 Value 类的指定路径中读取文件,并将它们作为 Tuple2 <Key,Value>返回。

基于集合:

  • fromCollection(Seq) - 从 Seq 创建数据集。集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator) - 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
  • fromElements(elements: _*) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
  • generateSequence(from, to) - 并行生成给定间隔中的数字序列。

通用:

  • readFile(inputFormat, path) / FileInputFormat - 接受文件输入格式。
  • createInput(inputFormat) / InputFormat - 接受通用输入格式。

例子

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field 
// CSV input can also be used with Case Classes case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field 
// read a CSV file with three fields into a POJO (Person) with corresponding fields val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type SequenceFileInputFormat val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file")

配置 CSV 分析

Flink 为 CSV 解析提供了许多配置选项:

  • lineDelimiter: String 指定单个记录的分隔符。默认行分隔符是换行符 '\n'
  • fieldDelimiter: String 指定用于分隔记录字段的分隔符。默认字段分隔符是逗号字符 ','
  • includeFields: Array[Int] 定义要从输入文件中读取的字段(以及要忽略的字段)。默认情况下,将解析前 n 个 字段(由 types() 调用中的类型数定义)。
  • pojoFields: Array[String] 指定映射到 CSV 字段的 POJO 的字段。CSV 字段的解析器将根据 POJO 字段的类型和顺序自动初始化。
  • parseQuotedStrings: Character 启用带引号的字符串解析。如果字符串字段的第一个字符是引号字符(前导或拖尾空格 未被 修剪),则字符串将被解析为带引号的字符串。引用字符串中的字段分隔符将被忽略。如果带引号的字符串字段的最后一个字符不是引号字符,则引用字符串解析将失败。如果启用了带引号的字符串解析并且该字段的第一个字符 不是 引用字符串,则该字符串将被解析为不带引号的字符串。默认情况下,禁用带引号的字符串解析。
  • ignoreComments: String 指定注释前缀。所有以指定注释前缀开头的行都不会被解析和忽略。默认情况下,不会忽略任何行。
  • lenient: Boolean 启用宽松解析,即忽略无法正确解析的行。默认情况下,禁用宽松解析,无效行引发异常。
  • ignoreFirstLine: Boolean 配置 InputFormat 以忽略输入文件的第一行。默认情况下,不会忽略任何行。

递归遍历输入路径目录

对于基于文件的输入,当输入路径是目录时,默认情况下不会枚举嵌套文件。相反,只读取基目录中的文件,而忽略嵌套文件。可以通过 recursive.file.enumeration 配置参数启用嵌套文件的递归枚举,如下例所示。

// enable recursive enumeration of nested input files val env  = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object val parameters = new Configuration

// set the recursive enumeration parameter parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source env.readTextFile("file:///path/with.nested/files").withParameters(parameters)

读压缩文件

Flink 目前支持输入文件的透明解压缩,如果它们标有适当的文件扩展名。特别是,这意味着不需要进一步配置输入格式,并且任何 FileInputFormat 支持压缩,包括自定义输入格式。请注意,压缩文件可能无法并行读取,从而影响作业可伸缩性。

下表列出了当前支持的压缩方法。


压缩方法: DEFLATE

文件扩展名: .deflate

可并行:no / not


压缩方法: GZip

文件扩展名: .gz.gzip

可并行:no / not


压缩方法: Bzip2

文件扩展名: .bz2

可并行:no / not


压缩方法: XZ

文件扩展名: .xz

可并行:no / not

数据接收

数据接收器使用 DataSet 并用于存储或返回它们。使用 OutputFormat 描述数据接收器 算子操作 。Flink 带有各种内置输出格式,这些格式封装在 DataSet 上的 算子操作后面:

  • writeAsText() / TextOutputFormat - 按字符串顺序写入数据元。通过调用每个数据元的 toString() 方法获得字符串。
  • writeAsFormattedText() / TextOutputFormat - 按字符串顺序写数据元。通过为每个数据元调用用户定义的 format() 方法来获取字符串。
  • writeAsCsv(...) / CsvOutputFormat - 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() / print(String msg) / printToErr(String msg) - 在标准输出/标准错误流上打印每个数据元的 toString() 值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的 打印 调用。如果并行度大于 1,则输出也将与生成输出的任务的标识符一起添加。
  • write() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output() / OutputFormat - 大多数通用输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

可以将 DataSet 输入到多个 算子操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
  new TextFormatter<Tuple2<Integer, Integer>>() {
    public String format (Tuple2<Integer, Integer> value) {
      return value.f1 + " - " + value.f0;
    }
  });

使用自定义输出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
  // build and configure OutputFormat
  JDBCOutputFormat.buildJDBCOutputFormat()
          .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
          .setDBUrl("jdbc:derby:memory:persons")
          .setQuery("insert into persons (name, age, height) values (?,?,?)")
          .finish()
  );

本地排序输出

可以使用 元组字段位置字段表达式 以指定顺序在指定字段上对数据接收器的输出进行本地排序。这适用于每种输出格式。

以下示例显示如何使用此函数:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

尚不支持全局排序的输出。

数据接收器使用 DataSet 并用于存储或返回它们。使用 OutputFormat 描述数据接收器 算子操作 。Flink 带有各种内置输出格式,这些格式封装在 DataSet 上的 算子操作后面:

  • writeAsText() / TextOutputFormat - 按字符串顺序写入元素。通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(...) / CsvOutputFormat - 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。
  • write() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output() / OutputFormat - 大多数通用输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

可以将 DataSet 输入到多个 算子操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

// text data val textData: DataSet[String] = // [...] 
// write DataSet to a file on the local file system textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c" val values: DataSet[(String, Int, Double)] = // [...] values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines values.writeAsText("file:///path/to/the/result/file")

// this writes values as strings using a user-defined formatting values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")

本地排序输出

可以使用 元组字段位置字段表达式 以指定顺序在指定字段上对数据接收器的输出进行本地排序。这适用于每种输出格式。

以下示例显示如何使用此函数:

val tData: DataSet[(Int, String, Double)] = // [...] val pData: DataSet[(BookPojo, Double)] = // [...] val sData: DataSet[String] = // [...] 
// sort output on String field in ascending order tData.sortPartition(1, Order.ASCENDING).print()

// sort output on Double field in descending and Int field in ascending order tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()

// sort output on the "author" field of nested BookPojo in descending order pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)

// sort output on the full tuple in ascending order tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)

// sort atomic type (String) output in descending order sData.sortPartition("_", Order.DESCENDING).writeAsText(...)

尚不支持全局排序的输出。

迭代 算子

迭代在 Flink 程序中实现循环。迭代 算子封装程序的一部分并重复执行,将一次迭代的结果(部分解)反馈到下一次迭代中。Flink 中有两种类型的迭代: BulkIterationDeltaIteration

本节提供有关如何使用这两个 算子的快速示例。查看“ 迭代简介” 页面以获取更详细的介绍。

批量迭代

要创建 BulkIteration,请调用 iterate(int) 迭代的 DataSet 方法。这将返回一个 IterativeDataSet ,可以使用常规 算子进行转换。迭代调用的单个参数指定最大迭代次数。

要指定迭代的结束,请调用 closeWith(DataSet) 方法 IterativeDataSet 以指定应将哪个转换反馈到下一次迭代。 closeWith(DataSet, DataSet) 如果此 DataSet 为空,您可以选择指定终止条件,该条件评估第二个 DataSet 并终止迭代。如果未指定终止条件,则迭代将在给定的最大数量迭代后终止。

以下示例迭代地估计数量 Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果此点位于单位圆内,我们会增加计数。然后估计 Pi 作为结果计数除以迭代次数乘以 4。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create initial IterativeDataSet
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);

DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer map(Integer i) throws Exception {
    double x = Math.random();
    double y = Math.random();

    return i + ((x * x + y * y < 1) ? 1 : 0);
  }
});

// Iteratively transform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);

count.map(new MapFunction<Integer, Double>() {
  @Override
  public Double map(Integer count) throws Exception {
    return count / (double) 10000 * 4;
  }
}).print();

env.execute("Iterative Pi Example");

您还可以查看 K-Means 示例 ,该 示例 使用 BulkIteration 来聚类一组未标记的点。

Delta 迭代

Delta 迭代利用了某些算法在每次迭代中不会更改解决方案的每个数据点的事实。

除了在每次迭代中反馈的部分解决方案(称为工作集)之外,delta 迭代还在迭代中维护状态(称为解决方案集),可以通过增量更新。迭代计算的结果是最后一次迭代之后的状态。有关 delta 迭代的基本原理的概述,请参阅 迭代简介 。

定义 DeltaIteration 类似于定义 BulkIteration。对于 delta 迭代,两个数据集构成每次迭代的输入(工作集和解决方案集),并且在每次迭代中生成两个数据集作为结果(新工作集,解决方案集 delta)。

创建 DeltaIteration 调用 iterateDelta(DataSet, int, int) (或 iterateDelta(DataSet, int, int[]) 分别)。在初始解决方案集上调用此方法。参数是初始增量集,最大迭代次数和关键位置。返回的 DeltaIteration 对象使您可以通过方法 iteration.getWorkset() 和方式访问表示工作集和解决方案集的 DataSet iteration.getSolutionSet()

下面是 delta 迭代语法的示例

// read the initial data sets
DataSet<Tuple2<Long, Double>> initialSolutionSet = // [...]

DataSet<Tuple2<Long, Double>> initialDeltaSet = // [...]

int maxIterations = 100;
int keyPosition = 0;

DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialSolutionSet
  .iterateDelta(initialDeltaSet, maxIterations, keyPosition);

DataSet<Tuple2<Long, Double>> candidateUpdates = iteration.getWorkset()
  .groupBy(1)
  .reduceGroup(new ComputeCandidateChanges());

DataSet<Tuple2<Long, Double>> deltas = candidateUpdates
  .join(iteration.getSolutionSet())
  .where(0)
  .equalTo(0)
  .with(new CompareChangesToCurrent());

DataSet<Tuple2<Long, Double>> nextWorkset = deltas
  .filter(new FilterByThreshold());

iteration.closeWith(deltas, nextWorkset)
  .writeAsCsv(outputPath);

批量迭代

要创建 BulkIteration,请调用 iterate(int) 迭代的 DataSet 方法,并指定步进函数。step 函数获取当前迭代的输入 DataSet,并且必须返回一个新的 DataSet。迭代调用的参数是停止之后的最大迭代次数。

还有一个 iterateWithTermination(int) 函数接受一个返回两个 DataSet 的步骤函数:迭代步骤的结果和终止条件。一旦终止标准 DataSet 为空,就停止迭代。

以下示例迭代地估计数量 Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果此点位于单位圆内,我们会增加计数。然后估计 Pi 作为结果计数除以迭代次数乘以 4。

val env = ExecutionEnvironment.getExecutionEnvironment()

// Create initial DataSet val initial = env.fromElements(0)

val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
  val result = iterationInput.map { i =>
  val x = Math.random()
  val y = Math.random()
  i + (if (x * x + y * y < 1) 1 else 0)
  }
  result
}

val result = count map { c => c / 10000.0 * 4 }

result.print()

env.execute("Iterative Pi Example")

您还可以查看 K-Means 示例 ,该 示例 使用 BulkIteration 来聚类一组未标记的点。

Delta 迭代

Delta 迭代利用了某些算法在每次迭代中不会更改解决方案的每个数据点的事实。

除了在每次迭代中反馈的部分解决方案(称为工作集)之外,delta 迭代还在迭代中维护状态(称为解决方案集),可以通过增量更新。迭代计算的结果是最后一次迭代之后的状态。有关 delta 迭代的基本原理的概述,请参阅 迭代简介 。

定义 DeltaIteration 类似于定义 BulkIteration。对于 delta 迭代,两个数据集构成每次迭代的输入(工作集和解决方案集),并且在每次迭代中生成两个数据集作为结果(新工作集,解决方案集 delta)。

要创建 DeltaIteration,请 iterateDelta(initialWorkset, maxIterations, key) 在初始解决方案集上调用。step 函数有两个参数:(solutionSet,workset),并且必须返回两个值:(solutionSetDelta,newWorkset)。

下面是 delta 迭代语法的示例

// read the initial data sets val initialSolutionSet: DataSet[(Long, Double)] = // [...] 
val initialWorkset: DataSet[(Long, Double)] = // [...] 
val maxIterations = 100
val keyPosition = 0

val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
  val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
  val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

  val nextWorkset = deltas.filter(new FilterByThreshold())

  (deltas, nextWorkset)
}

result.writeAsCsv(outputPath)

env.execute()

在函数中 算子操作数据对象

Flink 的运行时以 Java 对象的形式与用户函数交换数据。函数从运行时接收输入对象作为方法参数,并返回输出对象作为结果。由于这些对象是由用户函数和运行时代码访问的,因此理解并遵循有关用户代码如何访问(即读取和修改)这些对象的规则非常重要。

用户函数从 Flink 的运行时接收对象,作为常规方法参数(如 a MapFunction )或通过 Iterable 参数(如 a GroupReduceFunction )。我们将运行时传递给用户函数的 对象 称为 输入对象 。用户函数可以将对象作为方法返回值(如 a MapFunction )或通过 a Collector (如 a FlatMapFunction )发送到 Flink 运行时。我们将用户函数发出的 对象 称为 输出对象 。

Flink 的 DataSet API 具有两种模式,这些模式在 Flink 的运行时创建或重用输入对象方面有所不同。此行为会影响用户函数如何与输入和输出对象进行交互的保证和约束。以下部分定义了这些规则,并给出了编写安全用户函数代码的编码指南。

禁用对象重用(DEFAULT)

默认情况下,Flink 在禁用对象重用模式下运行。此模式可确保函数始终在函数调用中接收新的输入对象。禁用对象重用模式可提供更好的保证,并且使用起来更安全。但是,它带来了一定的处理开销,可能会导致更高的 Java 垃圾回收活动。下表说明了用户函数如何在禁用对象重用模式下访问输入和输出对象。


操作: 读取输入对象

保证和限制:在方法调用中,保证输入对象的值不会改变。这包括由 Iterable 提供的对象。例如,收集由 List 或 Map 中的 Iterable 提供的输入对象是安全的。请注意,在保存方法调用后,可以修改对象。在函数调用中记住对象是 不安全的


操作: 修改输入对象

保证和限制:您可以修改输入对象。


操作: 发射输入对象

保证和限制:您可以发出输入对象。输入对象的值在发出后可能已更改。在输出对象后,读取它是 不安全的


操作: 读取输出对象

保证和限制:提供给收集器或作为方法结果返回的对象可能已更改其值。读取输出对象是 不安全的


操作: 修改输出对象

保证和限制:您可以在发射对象后对其进行修改并再次发射。

禁用对象重用(默认)模式的编码指南:

  • 不记得并跨方法调用读取输入对象。
  • 发射后不要读取对象。

对象重用已启用

在对象重用启用模式下,Flink 的运行时最小化对象实例化的数量。这可以提高性能并可以 ReduceJava 垃圾收集压力。通过调用激活对象重用启用模式 ExecutionConfig.enableObjectReuse() 。下表说明了用户函数如何在对象重用启用模式下访问输入和输出对象。


操作: 读取作为常规方法参数接收的输入对象

保证和限制:在常规方法参数中接收的输入对象不会在函数调用中修改。在离开方法调用后,可以修改对象。在函数调用中记住对象是 不安全的


操作: 读取从 Iterable 参数接收的输入对象

保证和限制:从 Iterable 接收的输入对象仅在调用 next()方法之前有效。Iterable 或 Iterator 可以多次为同一个对象实例提供服务。记住从 Iterable 接收的输入对象是 不安全 的,例如,将它们放在 List 或 Map 中。


操作: 修改输入对象

保证和限制:除了 MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction 和 InputFormat.next(重用)的输入对象外,您 不能 修改输入对象。


操作: 发射输入对象

保证和限制:除了 MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction 和 InputFormat.next(重用)的输入对象外, 您 不能 发出输入对象。


操作: 读取输出对象

保证和限制:提供给收集器或作为方法结果返回的对象可能已更改其值。读取输出对象是 不安全的


操作: 修改输出对象

保证和限制:您可以修改输出对象并再次发出。

启用对象重用的编码指南:

  • 不记得从中收到的输入对象 Iterable
  • 不记得并跨方法调用读取输入对象。
  • 不要修改或发出输入对象,除了输入对象 MapFunctionFlatMapFunctionMapPartitionFunctionGroupReduceFunctionGroupCombineFunctionCoGroupFunction ,和 InputFormat.next(reuse)
  • 要 Reduce 对象实例化,您始终可以发出重复修改但从不读取的专用输出对象。

调试

在对分布式集群中的大型数据集运行数据分析程序之前,最好确保实现的算法按预期工作。因此,实施数据分析程序通常是检查结果,调试和改进的增量过程。

Flink 提供了一些很好的函数,通过支持 IDE 内的本地调试,测试数据的注入和结果数据的收集,显着简化了数据分析程序的开发过程。本节提供了一些如何简化 Flink 程序开发的提示。

本地运行环境

A LocalEnvironment 在创建它的同一 JVM 进程中启动 Flink 系统。如果从 IDE 启动 LocalEnvironment,则可以在代码中设置断点并轻松调试程序。

创建 LocalEnvironment 并使用如下:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

DataSet<String> lines = env.readTextFile(pathToTextFile);
// build your program

env.execute();
val env = ExecutionEnvironment.createLocalEnvironment()

val lines = env.readTextFile(pathToTextFile)
// build your program 
env.execute()

收集数据源和接收器

通过创建输入文件和读取输出文件来完成分析程序的输入并检查其输出是很麻烦的。Flink 具有特殊的数据源和接收器,由 Java 集合支持以简化测试。一旦程序经过测试,源和接收器可以很容易地被读取/写入外部数据存储(如 HDFS)的源和接收器替换。

集合数据源可以使用如下:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

// Create a DataSet from a list of elements
DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataSet from any Java collection
List<Tuple2<String, Integer>> data = ...
DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataSet from an Iterator
Iterator<Long> longIt = ...
DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);

集合数据接收器指定如下:

DataSet<Tuple2<String, Integer>> myResult = ...

List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>();
myResult.output(new LocalCollectionOutputFormat(outData));

注意: 目前,集合数据接收器仅限于本地执行,作为调试工具。

val env = ExecutionEnvironment.createLocalEnvironment()

// Create a DataSet from a list of elements val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataSet from any Collection val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataSet from an Iterator val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意: 目前,集合数据源要求实现数据类型和迭代器 Serializable 。此外,收集数据源不能并行执行(并行度= 1)。

语义注释

语义注释可用于提供有关函数行为的 Flink 提示。它们告诉系统函数读取和评估函数输入的哪些字段以及未修改的字段从其输入转发到其输出。语义注释是加速执行的有力手段,因为它们允许系统推断在多个 算子操作中重用排序顺序或分区。使用语义注释最终可以使程序免于不必要的数据混洗或不必要的排序,并显着提高程序的性能。

注意: 语义注释的使用是可选的。但是,提供语义注释时保守是绝对至关重要的!不正确的语义注释会导致 Flink 对您的程序做出错误的假设,并最终可能导致错误的结果。如果算子的行为不明确可预测,则不应提供注释。请仔细阅读文档。

目前支持以下语义注释。

转发字段注释

转发字段信息声明输入字段,这些输入字段未被修改,由函数转发到相同位置或输出中的另一个位置。优化程序使用此信息来推断函数是否保存了数据属性(如排序或分区)。对于输入元件,诸如一组 算子操作的函数 GroupReduceGroupCombineCoGroup ,和 MapPartition ,被定义为转发字段的所有字段必须始终共同从相同的输入元件转发。由分组函数发出的每个数据元的转发字段可以源自函数输入组的不同数据元。

使用 字段表达式 指定字段转发信息。转发到输出中相同位置的字段可以按其位置指定。指定的位置必须对输入和输出数据类型有效,并且具有相同的类型。例如,String "f2" 声明 Java 输入元组的第三个字段始终等于输出元组中的第三个字段。

通过将输入中的源字段和输出中的目标字段指定为字段表达式来声明未修改的字段转发到输出中的另一个位置。String "f0-&gt;f2" 表示 Java 输入元组的第一个字段未更改,复制到 Java 输出元组的第三个字段。通配符表达式 * 可用于指代整个输入或输出类型,即 "f0-&gt;*" 表示函数的输出始终等于其 Java 输入元组的第一个字段。

可以在单个 String 中声明多个转发字段,方法是将它们用分号分隔为 "f0; f2-&gt;f1; f3-&gt;f2" 单独的字符串 "f0", "f2-&gt;f1", "f3-&gt;f2" 。指定转发字段时,不要求声明所有转发字段,但所有声明必须正确。

可以通过在函数类定义上附加 Java 注释或在调用 DataSet 上的函数后将它们作为 算子参数传递来声明转发的字段信息,如下所示。

函数类注释
  • @ForwardedFields 用于单输入函数,例如 Map 和 Reduce。
  • @ForwardedFieldsFirst 用于第一次输入具有两个输入的函数,例如 Join 和 CoGroup。
  • @ForwardedFieldsSecond 用于具有两个输入的函数的第二个输入,例如 Join 和 CoGroup。
算子参数
  • data.map(myMapFnc).withForwardedFields() 用于单输入函数,例如 Map 和 Reduce。
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsFirst() 对于具有两个输入(例如 Join 和 CoGroup)的函数的第一个输入。
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsSecond() 对于具有两个输入的函数的第二个输入,例如 Join 和 CoGroup。

请注意,无法覆盖由 算子参数指定为类注释的字段转发信息。

以下示例显示如何使用函数类注释声明转发的字段信息:

@ForwardedFields("f0->f2")
public class MyMap implements
        MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
  @Override
  public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
  return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
  }
}
@ForwardedFields("_1->_3")
class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
   def map(value: (Int, Int)): (String, Int, Int) = {
  return ("foo", value._2 / 2, value._1)
  }
}

非转发字段

非转发字段信息声明所有未保存在函数输出中相同位置的字段。所有其他字段的值都被视为保存在输出中的相同位置。因此,非转发字段信息与转发字段信息相反。对于分组方式算子,如非转场信息 GroupReduceGroupCombineCoGroup ,和 MapPartition 必须满足相同的要求转发的字段信息。

重要信息 :非转发字段信息的规范是可选的。但如果使用, 全部! 必须指定非转发字段,因为所有其他字段都被视为在适当位置转发。将转发字段声明为非转发是安全的。

非转发字段被指定为 字段表达式 列表。该列表可以作为单个字符串给出,字段表达式用分号分隔,也可以作为多个字符串。例如两者 "f1; f3""f1", "f3" 宣布一个 Java 元组的第二和第四场不保存到位等各个领域都在处保存。只能为具有相同输入和输出类型的函数指定非转发字段信息。

使用以下注释将未转发的字段信息指定为函数类注释:

  • @NonForwardedFields 用于单输入函数,例如 Map 和 Reduce。
  • @NonForwardedFieldsFirst 对于具有两个输入(例如 Join 和 CoGroup)的函数的第一个输入。
  • @NonForwardedFieldsSecond 对于具有两个输入的函数的第二个输入,例如 Join 和 CoGroup。

以下示例显示如何声明未转发的字段信息:

@NonForwardedFields("f1") // second field is not forwarded
public class MyMap implements
        MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
  return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
  }
}
@NonForwardedFields("_2") // second field is not forwarded class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
  def map(value: (Int, Int)): (Int, Int) = {
  return (value._1, value._2 / 2)
  }
}

阅读字段

读取字段信息声明由函数访问和评估的所有字段,即函数用于计算其结果的所有字段。例如,在指定读取字段信息时,必须将在条件语句中计算或用于计算的字段标记为已读。只有未经修改的字段转发到输出而不评估它们的值或根本不被访问的字段不被认为是被读取的。

重要信息 :读取字段信息的规范是可选的。但如果使用, 全部! 必须指定读取字段。将非读取字段声明为读取是安全的。

读取字段被指定为 字段表达式 列表。该列表可以作为单个字符串给出,字段表达式用分号分隔,也可以作为多个字符串。例如同时 "f1; f3""f1", "f3" 声明一个 Java 元组的第二和第四场被读出并通过函数进行评价。

使用以下注释将读取字段信息指定为函数类注释:

  • @ReadFields 用于单输入函数,例如 Map 和 Reduce。
  • @ReadFieldsFirst 对于具有两个输入(例如 Join 和 CoGroup)的函数的第一个输入。
  • @ReadFieldsSecond 对于具有两个输入的函数的第二个输入,例如 Join 和 CoGroup。

以下示例显示如何声明读取字段信息:

@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function.
public class MyMap implements
        MapFunction<Tuple4<Integer, Integer, Integer, Integer>,
              Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
  if(val.f0 == 42) {
    return new Tuple2<Integer, Integer>(val.f0, val.f1);
  } else {
    return new Tuple2<Integer, Integer>(val.f3+10, val.f1);
  }
  }
}
@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function. class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
   def map(value: (Int, Int, Int, Int)): (Int, Int) = {
  if (value._1 == 42) {
    return (value._1, value._2)
  } else {
    return (value._4 + 10, value._2)
  }
  }
}

广播变量

除了常规的 算子操作输入之外,广播变量还允许您为 算子操作的所有并行实例提供数据集。这对于辅助数据集或与数据相关的参数化非常有用。然后,算子可以将数据集作为集合访问。

  • 广播 :广播集通过名称注册 withBroadcastSet(DataSet, String) ,和
  • 访问 :可通过 getRuntimeContext().getBroadcastVariable(String) 目标算子访问。
  • Java
  • Scala
// 1\. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
  @Override
  public void open(Configuration parameters) throws Exception {
    // 3\. Access the broadcast DataSet as a Collection
    Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
  }

  @Override
  public String map(String value) throws Exception {
    ...
  }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2\. Broadcast the DataSet

broadcastSetName 注册和访问广播数据集时,请确保名称(在前面的示例中)匹配。有关完整的示例程序,请查看 K-Means 算法

// 1\. The DataSet to be broadcast val toBroadcast = env.fromElements(1, 2, 3)

val data = env.fromElements("a", "b")

data.map(new RichMapFunction[String, String]() {
  var broadcastSet: Traversable[String] = null

  override def open(config: Configuration): Unit = {
    // 3\. Access the broadcast DataSet as a Collection
    broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
  }

  def map(in: String): String = {
    ...
  }
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

Make sure that the names ( broadcastSetName in the previous example) match when registering and accessing broadcast data sets. For a complete example program, have a look at KMeans Algorithm .

注意 :由于广播变量的内容保存在每个节点的内存中,因此不应该变得太大。对于标量值之类的简单事物,您可以简单地将参数作为函数闭包的一部分,或者使用该 withParameters(...) 方法传递配置。

分布式缓存

Flink 提供了一个分布式缓存,类似于 Apache Hadoop,可以在本地访问用户函数的并行实例。此函数可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。

缓存的工作原理如下。程序在其作为缓存文件的特定名称下注册 本地或远程文件系统(如 HDFS 或 S3) 的文件或目录 ExecutionEnvironment 。执行程序时,Flink 会自动将文件或目录复制到所有工作程序的本地文件系统。用户函数可以查找指定名称下的文件或目录,并从 worker 的本地文件系统访问它。

分布式缓存使用如下:

注册中的文件或目录 ExecutionEnvironment

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

访问用户函数中的缓存文件或目录(此处为 a MapFunction )。该函数必须扩展 RichFunction 类,因为它需要访问 RuntimeContext

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

  @Override
  public void open(Configuration config) {

    // access cached file via RuntimeContext and DistributedCache
    File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
    // read the file (or navigate the directory)
    ...
  }

  @Override
  public Integer map(String value) throws Exception {
    // use content of cached file
    ...
  }
}

Register the file or directory in the ExecutionEnvironment .

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...) env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute ...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

Access the cached file in a user function (here a MapFunction ). The function must extend a RichFunction class because it needs access to the RuntimeContext .

// extend a RichFunction to have access to the RuntimeContext class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

  // access cached file via RuntimeContext and DistributedCache
  val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
  // read the file (or navigate the directory)
  ...
  }

  override def map(value: String): Int = {
  // use content of cached file
  ...
  }
}

将参数传递给函数

可以使用构造函数或 withParameters(Configuration) 方法将参数传递给函数。参数被序列化为函数对象的一部分并传送到所有并行任务实例。

还查看 有关如何将命令行参数传递给函数最佳实践指南

通过构造函数

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

toFilter.filter(new MyFilter(2));

private static class MyFilter implements FilterFunction<Integer> {

  private final int limit;

  public MyFilter(int limit) {
  this.limit = limit;
  }

  @Override
  public boolean filter(Integer value) throws Exception {
  return value > limit;
  }
}
val toFilter = env.fromElements(1, 2, 3)

toFilter.filter(new MyFilter(2))

class MyFilter(limit: Int) extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
  value > limit
  }
}

通过 withParameters(Configuration)

此方法将 Configuration 对象作为参数,将其传递给 rich 函数open() 方法。Configuration 对象是从 String 键到不同值类型的 Map。

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

Configuration config = new Configuration();
config.setInteger("limit", 2);

toFilter.filter(new RichFilterFunction<Integer>() {
  private int limit;

  @Override
  public void open(Configuration parameters) throws Exception {
    limit = parameters.getInteger("limit", 0);
  }

  @Override
  public boolean filter(Integer value) throws Exception {
    return value > limit;
  }
}).withParameters(config);
val toFilter = env.fromElements(1, 2, 3)

val c = new Configuration()
c.setInteger("limit", 2)

toFilter.filter(new RichFilterFunction[Int]() {
  var limit = 0

  override def open(config: Configuration): Unit = {
    limit = config.getInteger("limit", 0)
  }

  def filter(in: Int): Boolean = {
    in > limit
  }
}).withParameters(c)

全局通过 ExecutionConfig

Flink 还允许将自定义配置值传递到 ExecutionConfig 环境的接口。由于执行配置可在所有(丰富)用户函数中访问,因此自定义配置将在所有函数中全局可用。

设置自定义全局配置

Configuration conf = new Configuration();
conf.setString("mykey","myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);
val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)

请注意,您还可以将自定义 ExecutionConfig.GlobalJobParameters 类作为全局作业参数传递给执行配置。该接口允许实现该 Map&lt;String, String&gt; toMap() 方法,该方法将依次显示来自 Web 前端中的配置的值。

从全局配置中访问值

全局作业参数中的对象可在系统中的许多位置访问。实现 RichFunction 接口的所有用户函数都可以通过运行时上下文访问。

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

  private String mykey;
  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    Configuration globConf = (Configuration) globalParams;
    mykey = globConf.getString("mykey", null);
  }
  // ... more here ...

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。