
DataStream Transformations

Published 2025-05-02 18:19:13

Transformation: Map DataStream → DataStream

Description: Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer map(Integer value) throws Exception {
    return 2 * value;
  }
});

Transformation: FlatMap DataStream → DataStream

Description: Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public void flatMap(String value, Collector<String> out)
    throws Exception {
    for(String word: value.split(" ")){
      out.collect(word);
    }
  }
});

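Transformation: Filter DataStream → DataStream

Description: Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values: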

dataStream.filter(new FilterFunction<Integer>() {
  @Override
  public boolean filter(Integer value) throws Exception {
    return value != 0;
  }
});

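Transformation: KeyBy DataStream → KeyedStream

Description: Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

dataStream.keyBy("someKey"); // Key by field "someKey"
dataStream.keyBy(0); // Key by the first element of a Tuple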

Attention: A type cannot be a key if:

  1. it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
  2. it is an array of any type.
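
The first restriction follows from hash partitioning: two objects that represent the same key must produce the same hash. The following plain-Java sketch illustrates this without using Flink. The `partitionFor` helper and both key classes are hypothetical, and the modulo assignment is a simplified stand-in, since Flink's real key-to-partition mapping is more involved.

```java
import java.util.Objects;

public class KeyHashSketch {
    // Hypothetical key type that overrides equals() and hashCode() by value.
    public static final class GoodKey {
        final String id;
        public GoodKey(String id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof GoodKey && ((GoodKey) o).id.equals(id);
        }
        @Override public int hashCode() { return Objects.hash(id); }
    }

    // Hypothetical key type that relies on the identity-based Object.hashCode().
    public static final class BadKey {
        final String id;
        public BadKey(String id) { this.id = id; }
    }

    // Simplified stand-in for hash partitioning: hash modulo partition count.
    public static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Two distinct objects carrying the same logical key.
        int p1 = partitionFor(new GoodKey("user-1"), 4);
        int p2 = partitionFor(new GoodKey("user-1"), 4);
        System.out.println("GoodKey partitions equal: " + (p1 == p2));

        // With identity hashes, equal-looking keys may land in different partitions.
        int q1 = partitionFor(new BadKey("user-1"), 4);
        int q2 = partitionFor(new BadKey("user-1"), 4);
        System.out.println("BadKey partitions: " + q1 + " and " + q2);
    }
}
```

With `GoodKey`, both objects always map to the same partition. With `BadKey`, the result depends on object identity and can differ between objects, runs, and machines.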

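Transformation: Reduce KeyedStream → DataStream

Description: A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums: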

keyedStream.reduce(new ReduceFunction<Integer>() {
  @Override
  public Integer reduce(Integer value1, Integer value2)
  throws Exception {
    return value1 + value2;
  }
});
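
To make the "rolling" behaviour concrete, here is a minimal plain-Java sketch that does not use Flink. It assumes the input is a list of {key, value} pairs and keeps the last reduced value per key in a map. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RollingReduceSketch {
    // Applies a rolling reduce per key: every input element emits the
    // updated reduced value for its key. Input pairs are {key, value}.
    public static List<Integer> rollingReduce(List<int[]> keyedValues,
                                              BinaryOperator<Integer> reducer) {
        Map<Integer, Integer> lastReduced = new HashMap<>(); // per-key state
        List<Integer> emitted = new ArrayList<>();
        for (int[] kv : keyedValues) {
            // The first value of a key is stored as-is; later values are
            // combined as reducer(lastReducedValue, currentValue).
            Integer updated = lastReduced.merge(kv[0], kv[1], reducer);
            emitted.add(updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<int[]> input = new ArrayList<>();
        for (int v = 1; v <= 5; v++) {
            input.add(new int[] {0, v}); // all values share key 0
        }
        System.out.println(rollingReduce(input, Integer::sum)); // [1, 3, 6, 10, 15]
    }
}
```

Each input produces one output, so the emitted sequence is the stream of partial sums for that key rather than a single final total.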

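Transformation: Fold KeyedStream → DataStream

Description: A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...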

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
      return current + "-" + value;
    }
  });

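Transformation: Aggregations KeyedStream → DataStream

Description: Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).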

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
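
The difference between min and minBy can be illustrated with a small plain-Java sketch that does not use Flink. It computes both results over the elements seen so far for one key. The `Reading` type and the method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class MinVsMinBySketch {
    // Hypothetical element type: a sensor name plus a temperature reading.
    public static final class Reading {
        final String sensor;
        final int temperature;
        public Reading(String sensor, int temperature) {
            this.sensor = sensor;
            this.temperature = temperature;
        }
    }

    // min-style result: only the minimum value of the field.
    public static int minTemperature(List<Reading> readings) {
        int min = Integer.MAX_VALUE;
        for (Reading r : readings) {
            min = Math.min(min, r.temperature);
        }
        return min;
    }

    // minBy-style result: the whole element that holds the minimum value.
    public static Reading minByTemperature(List<Reading> readings) {
        Reading best = null;
        for (Reading r : readings) {
            if (best == null || r.temperature < best.temperature) {
                best = r;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
                new Reading("a", 7), new Reading("b", 3), new Reading("c", 5));
        System.out.println(minTemperature(readings));          // 3
        System.out.println(minByTemperature(readings).sensor); // b
    }
}
```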

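Transformation: Window KeyedStream → WindowedStream

Description: Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.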

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
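
As a rough illustration of how a 5-second tumbling window groups events, the following plain-Java sketch assigns timestamps to windows by their start time. It does not use Flink and assumes non-negative millisecond timestamps and no window offset. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    // Start of the tumbling window that contains the timestamp
    // (assumes non-negative timestamps and no window offset).
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Groups event timestamps by the start of the window they fall into.
    public static Map<Long, List<Long>> assign(List<Long> timestamps,
                                               long windowSizeMillis) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, windowSizeMillis),
                    start -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(1000L, 4999L, 5000L, 12000L);
        // prints {0=[1000, 4999], 5000=[5000], 10000=[12000]}
        System.out.println(assign(events, 5000L));
    }
}
```

Every event belongs to exactly one window, and windows do not overlap, which is what distinguishes tumbling windows from sliding ones.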

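Transformation: WindowAll DataStream → AllWindowedStream

Description: Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.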

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

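Transformation: Window Apply WindowedStream → DataStream AllWindowedStream → DataStream

Description: Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.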

windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
  @Override
  public void apply(Tuple tuple,
      Window window,
      Iterable<Tuple2<String, Integer>> values,
      Collector<Integer> out) throws Exception {
    int sum = 0;
    // Sum the second field of every element in the window.
    for (Tuple2<String, Integer> t : values) {
      sum += t.f1;
    }
    out.collect(sum);
  }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
  @Override
  public void apply(Window window,
      Iterable<Tuple2<String, Integer>> values,
      Collector<Integer> out) throws Exception {
    int sum = 0;
    // Sum the second field of every element in the window.
    for (Tuple2<String, Integer> t : values) {
      sum += t.f1;
    }
    out.collect(sum);
  }
});

Transformation: Window Reduce WindowedStream → DataStream

Description: Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
  public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
    return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
  }
});

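Transformation: Window Fold WindowedStream → DataStream

Description: Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":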

windowedStream.fold("start", new FoldFunction<Integer, String>() {
  public String fold(String current, Integer value) {
    return current + "-" + value;
  }
});

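Transformation: Aggregations on windows WindowedStream → DataStream

Description: Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).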

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");

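Transformation: Union DataStream* → DataStream

Description: Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will get each element twice in the resulting stream.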

dataStream.union(otherStream1, otherStream2, ...);

Transformation: Window Join DataStream,DataStream → DataStream

Description: Joins two data streams on a given key and a common window.

dataStream.join(otherStream)
  .where(<key selector>).equalTo(<key selector>)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply (new JoinFunction () {...});

Transformation: Interval Join KeyedStream,KeyedStream → DataStream

Description: Joins two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
  .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
  .upperBoundExclusive(true) // optional
  .lowerBoundExclusive(true) // optional
  .process(new IntervalJoinFunction() {...});
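
The interval condition itself can be checked with a few lines of plain Java. This sketch does not use Flink. It only evaluates the inequality from the description, with two boolean flags standing in for the optional exclusive bounds. The class and method names are hypothetical.

```java
public class IntervalJoinBoundsSketch {
    // Returns true if rightTs lies in [leftTs + lowerBound, leftTs + upperBound],
    // with either end made exclusive when the matching flag is set.
    public static boolean withinInterval(long leftTs, long rightTs,
                                         long lowerBound, long upperBound,
                                         boolean lowerExclusive,
                                         boolean upperExclusive) {
        long low = leftTs + lowerBound;
        long high = leftTs + upperBound;
        boolean aboveLower = lowerExclusive ? rightTs > low : rightTs >= low;
        boolean belowUpper = upperExclusive ? rightTs < high : rightTs <= high;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Inclusive bounds: 12 is within [10 - 2, 10 + 2].
        System.out.println(withinInterval(10, 12, -2, 2, false, false)); // true
        // Exclusive bounds: 12 is not within (8, 12).
        System.out.println(withinInterval(10, 12, -2, 2, true, true));   // false
    }
}
```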

Transformation: Window CoGroup DataStream,DataStream → DataStream

Description: Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream)
  .where(0).equalTo(1)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply (new CoGroupFunction () {...});

Transformation: Connect DataStream,DataStream → ConnectedStreams

Description: "Connects" two data streams retaining their types. Connect allows for shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

Transformation: CoMap, CoFlatMap ConnectedStreams → DataStream

Description: Similar to map and flatMap on a connected data stream.

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
  @Override
  public Boolean map1(Integer value) {
    return true;
  }

  @Override
  public Boolean map2(String value) {
    return false;
  }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

  @Override
  public void flatMap1(Integer value, Collector<String> out) {
    out.collect(value.toString());
  }

  @Override
  public void flatMap2(String value, Collector<String> out) {
    for (String word : value.split(" ")) {
      out.collect(word);
    }
  }
});

Transformation: Split DataStream → SplitStream

Description: Splits the stream into two or more streams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
  @Override
  public Iterable<String> select(Integer value) {
    List<String> output = new ArrayList<String>();
    if (value % 2 == 0) {
      output.add("even");
    }
    else {
      output.add("odd");
    }
    return output;
  }
});

Transformation: Select SplitStream → DataStream

Description: Selects one or more streams from a split stream.

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

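Transformation: Iterate DataStream → IterativeStream → DataStream

Description: Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
// Elements greater than 0 are sent back through the feedback channel.
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return value > 0;
  }
});
iteration.closeWith(feedback);
// The remaining elements are forwarded downstream.
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return value <= 0;
  }
});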
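
The feedback loop can be simulated on a finite input with a queue. This plain-Java sketch does not use Flink and assumes the iteration body eventually drives every element to a value of 0 or less, since otherwise the loop would not terminate. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.UnaryOperator;

public class FeedbackLoopSketch {
    // Runs the iteration body on every element. Results greater than 0 are
    // fed back into the loop; all other results are forwarded downstream.
    public static List<Long> iterate(List<Long> initial, UnaryOperator<Long> body) {
        Deque<Long> pending = new ArrayDeque<>(initial);
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            Long result = body.apply(pending.poll());
            if (result > 0) {
                pending.add(result); // feedback channel
            } else {
                output.add(result);  // forwarded downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // The body subtracts 1, so every element eventually leaves the loop.
        System.out.println(iterate(Arrays.asList(3L, 1L, 0L), v -> v - 1)); // [0, -1, 0]
    }
}
```

In the example, 3 passes through the body three times before it is emitted, while 1 and 0 leave after a single pass.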

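Transformation: Extract Timestamps DataStream → DataStream

Description: Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.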

stream.assignTimestamps (new TimeStampExtractor() {...});

The same transformations in the Scala API:

Transformation: Map DataStream → DataStream

Description: Takes one element and produces one element. A map function that doubles the values of the input stream:

dataStream.map { x => x * 2 }

Transformation: FlatMap DataStream → DataStream

Description: Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

dataStream.flatMap { str => str.split(" ") }

Transformation: Filter DataStream → DataStream

Description: Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter { _ != 0 }

Transformation: KeyBy DataStream → KeyedStream

Description: Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Transformation: Reduce KeyedStream → DataStream

Description: A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce { _ + _ }

Transformation: Fold KeyedStream → DataStream

Description: A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

val result: DataStream[String] =
  keyedStream.fold("start")((str, i) => { str + "-" + i })

Transformation: Aggregations KeyedStream → DataStream

Description: Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

Transformation: Window KeyedStream → WindowedStream

Description: Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

Transformation: WindowAll DataStream → AllWindowedStream

Description: Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

Transformation: Window Apply WindowedStream → DataStream AllWindowedStream → DataStream

Description: Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }

Transformation: Window Reduce WindowedStream → DataStream

Description: Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce { _ + _ }

Transformation: Window Fold WindowedStream → DataStream

Description: Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

val result: DataStream[String] =
  windowedStream.fold("start", (str, i) => { str + "-" + i })

Transformation: Aggregations on windows WindowedStream → DataStream

Description: Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")

Transformation: Union DataStream* → DataStream

Description: Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...)

Transformation: Window Join DataStream,DataStream → DataStream

Description: Joins two data streams on a given key and a common window.

dataStream.join(otherStream)
  .where(<key selector>).equalTo(<key selector>)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply { ... }

Transformation: Window CoGroup DataStream,DataStream → DataStream

Description: Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream)
  .where(0).equalTo(1)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply {}

Transformation: Connect DataStream,DataStream → ConnectedStreams

Description: "Connects" two data streams retaining their types, allowing for shared state between the two streams.

val someStream: DataStream[Int] = ...
val otherStream: DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)

Transformation: CoMap, CoFlatMap ConnectedStreams → DataStream

Description: Similar to map and flatMap on a connected data stream.

connectedStreams.map(
  (_ : Int) => true,
  (_ : String) => false
)
// flatMap functions must return a collection of results per input element
connectedStreams.flatMap(
  (num : Int) => List(num.toString),
  (str : String) => str.split(" ").toList
)

Transformation: Split DataStream → SplitStream

Description: Splits the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Transformation: Select SplitStream → DataStream

Description: Selects one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

Transformation: Iterate DataStream → IterativeStream → DataStream

Description: Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}

Transformation: Extract Timestamps DataStream → DataStream

Description: Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

stream.assignTimestamps { timestampExtractor }

Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:

val data: DataStream[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

is not supported by the API out-of-the-box. To use this feature, you should use a Scala API extension.

The following transformations are available on data streams of Tuples:

Transformation: Project DataStream → DataStream

Description: Selects a subset of fields from the tuples.

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
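
What project(2,0) does to a single tuple can be sketched in plain Java, with an Object array standing in for a tuple. This does not use Flink, and the class and method names are hypothetical.

```java
import java.util.Arrays;

public class ProjectSketch {
    // Returns a new tuple holding the fields at the given indexes, in that order.
    public static Object[] project(Object[] tuple, int... fieldIndexes) {
        Object[] projected = new Object[fieldIndexes.length];
        for (int i = 0; i < fieldIndexes.length; i++) {
            projected[i] = tuple[fieldIndexes[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        // An (Integer, Double, String) tuple projected to (String, Integer).
        Object[] in = {1, 2.5, "a"};
        System.out.println(Arrays.toString(project(in, 2, 0))); // [a, 1]
    }
}
```

The order of the indexes determines the order of the fields in the result, which is why the output type in the example above is Tuple2<String, Integer>.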
