返回介绍

执行 Storm 拓扑

发布于 2025-05-02 18:19:17 字数 1797 浏览 0 评论 0 收藏

Flink 提供与 Storm 兼容的 API( org.apache.flink.storm.api ),它可以替代以下类:

  • StormSubmitter 取而代之 FlinkSubmitter
  • NimbusClientClient 替换为 FlinkClient
  • LocalCluster 取而代之 FlinkLocalCluster

为了向 Flink 提交 Storm 拓扑,只需使用 组装 拓扑的 Storm 客户端代码 中的 Flink 替换来替换使用过的 Storm 类。实际的运行时代码,即 Spouts 和 Bolts,可以不加 修改 地使用。如果拓扑在远程集群执行时,参数 nimbus.hostnimbus.thrift.port 被用作 jobmanger.rpc.addressjobmanger.rpc.port 分别。如果未指定参数,则取值 flink-conf.yaml

TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder

// actual topology assembling code and used Spouts/Bolts can be used as-is
builder.setSpout("source", new FileSpout(inputFilePath));
builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");

Config conf = new Config();
if(runLocal) { // submit to test cluster
  // replaces: LocalCluster cluster = new LocalCluster();
  FlinkLocalCluster cluster = new FlinkLocalCluster();
  cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
} else { // submit to remote cluster
  // optional
  // conf.put(Config.NIMBUS_HOST, "remoteHost");
  // conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
  // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
  FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
}

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。