返回介绍

在 Flink 流程序中嵌入 Storm 算子

发布于 2025-05-02 18:19:17 字数 6932 浏览 0 评论 0 收藏 0

作为替代方案,Spouts 和 Bolts 可以嵌入到常规流处理节目中。Storm 兼容层为每个提供了一个打包类,即 SpoutWrapperBoltWrapperorg.apache.flink.storm.wrappers )。

每默认情况下,打包转换风暴输出元组 Flink 的 元组 类型(即, Tuple0Tuple25 根据风暴元组的字段数)。对于单场输出元组,也可以转换为字段的数据类型(例如, String 代替 Tuple1<String> )。

由于 Flink 无法推断 Storm 算子的输出字段类型,因此需要手动指定输出类型。为了获得正确的 TypeInformation 对象, TypeExtractor 可以使用 Flink 。

嵌入 Spouts

要将 Spout 用作 Flink 源,请使用 StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation) 。Spout 对象被传递给它的构造函数 SpoutWrapper<OUT> ,作为第一个参数 addSource(...) 。泛型类型声明 OUT 指定源输出流的类型。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// stream has `raw` type (single field output streams only)
DataStream<String> rawInput = env.addSource(
  new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
  TypeExtractor.getForClass(String.class)); // output type

// process data stream
[...]

如果 Spout 发出有限数量的元组, SpoutWrapper 可以通过 numberOfInvocations 在其构造函数中设置参数来配置为自动终止。这允许 Flink 程序在处理完所有数据后自动关闭。默认情况下,程序将一直运行,直到手动 取消

嵌入螺栓

要使用 Bolt 作为 Flink 算子,请使用 DataStream.transform(String, TypeInformation, OneInputStreamOperator) 。Bolt 对象被传递给它的构造函数 BoltWrapper&lt;IN,OUT&gt; ,作为最后一个参数 transform(...) 。泛型类型声明 IN 并分别 OUT 指定 算子的输入和输出流的类型。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile(localFilePath);

DataStream<Tuple2<String, Integer>> counts = text.transform(
  "tokenizer", // operator name
  TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
  new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator

// do further processing
[...]

嵌入式螺栓的命名属性访问

螺栓可以通过名称访问输入元组字段(另外通过索引访问)。要在嵌入式螺栓中使用此函数,您需要具有 a

  1. POJO 类型输入流或
  2. 元组 类型输入流并指定输入模式(即名称到索引映射)

对于 POJO 输入类型,Flink 通过反射访问字段。对于这种情况,Flink 期望相应的公共成员变量或公共 getter 方法。例如,如果 Bolt 通过名称 sentence (例如 String s = input.getStringByField("sentence"); )访问字段,则输入 POJO 类必须具有成员变量 public String sentence; 或方法 public String getSentence() { ... }; (注意驼峰式命名)。

对于 Tuple 输入类型,需要使用 Storm 的 Fields 类指定输入模式。对于这种情况,构造函数 BoltWrapper 需要另外一个参数: new BoltWrapper&lt;Tuple1&lt;String&gt;, ...&gt;(..., new Fields("sentence")) 。输入类型是 Tuple1&lt;String&gt;Fields("sentence") 指定 input.getStringByField("sentence") 相当于 input.getString(0)

有关 示例 ,请参阅 BoltTokenizerWordCountPojoBoltTokenizerWordCountWithNames

配置喷口和螺栓

在 Storm 中,Spouts 和 Bolts 可以配置一个全局分布的 Map 对象,该对象被赋予 submitTopology(...) 方法 LocalClusterStormSubmitter 。这 Map 是由拓扑旁边的用户提供的,并作为参数转发给呼叫 Spout.open(...)Bolt.prepare(...) 。如果在 Flink 中使用 FlinkTopologyBuilder 等执行整个拓扑,则不需要特别注意 - 它与常规 Storm 一样。

对于嵌入式使用,必须使用 Flink 的配置机制。可以在 StreamExecutionEnvironment via 中设置全局配置 .getConfig().setGlobalJobParameters(...) 。Flink 的常规 Configuration 课程可用于配置 Spouts 和 Bolts。但是, Configuration 不像 Storm 那样支持任意 Keys 数据类型(只 String 允许 Keys)。因此,Flink 还提供 StormConfig 了可以像 raw 一样使用的类, Map 以提供与 Storm 的完全兼容性。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StormConfig config = new StormConfig();
// set config values
[...]

// set global Storm configuration
env.getConfig().setGlobalJobParameters(config);

// assemble program with embedded Spouts and/or Bolts
[...]

多输出流

Flink 还可以处理 Spout 和 Bolts 的多个输出流的声明。如果在 Flink 中使用 FlinkTopologyBuilder 等执行整个拓扑,则不需要特别注意 - 它与常规 Storm 一样。

对于嵌入式使用,输出流将是数据类型 SplitStreamType&lt;T&gt; ,必须使用 DataStream.split(...) 和拆分 SplitStream.select(...) 。Flink 提供预定义输出选择 StormStreamSelector&lt;T&gt;.split(...) 已经。此外, SplitStreamTuple&lt;T&gt; 可以使用除去打包类型 SplitStreamMapper&lt;T&gt;

[...]

// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
DataStream<SplitStreamType<SomeType>> multiStream = ...

SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());

// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);

// do further processing on s1 and s2
[...]

有关完整示例,请参阅 SpoutSplitExample.java

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。