
Local Setup Tutorial


Get a Flink example program up and running in a few simple steps.

Setup: Download and Start Flink

Flink runs on Linux, Mac OS X, and Windows. The only requirement to run Flink is a working Java 8.x installation. Windows users, please take a look at the Flink on Windows guide, which describes how to run Flink on Windows for local setups.

You can check that Java is installed correctly by issuing the following command:

java -version

If you have Java 8, the output will look something like this:

java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
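
If java is not on your PATH, or several JDKs are installed side by side, note that Flink's startup scripts respect the JAVA_HOME environment variable (the env.java.home option in conf/flink-conf.yaml can serve the same purpose). A minimal sketch, assuming a hypothetical Java 8 installation path that you should adjust for your system:

$ export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64   # hypothetical path, adjust for your system
$ $JAVA_HOME/bin/java -version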

Download and Compile

Clone the source code from one of our repositories, e.g.:

$ git clone https://github.com/apache/flink.git
$ cd flink
$ mvn clean package -DskipTests # this will take up to 10 minutes
$ cd build-target         # this is where Flink is installed to

Start a Local Flink Cluster

$ ./bin/start-cluster.sh  # Start Flink

Check the Dispatcher's web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

[Screenshot: Dispatcher overview]
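
If you prefer the command line to the browser, the same information is exposed by Flink's monitoring REST API on the same port. As an optional check (not part of the original steps), you can query the cluster overview and look for a TaskManager count of 1 in the JSON response:

$ curl http://localhost:8081/overview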

You can also verify that the system is running by checking the log files in the log directory:

$ tail log/flink-*-standalonesession-*.log
INFO ... - Rest endpoint listening at localhost:8081
INFO ... - http://localhost:8081 was granted leadership ...
INFO ... - Web frontend listening at http://localhost:8081.
INFO ... - Starting RPC endpoint for StandaloneResourceManager at akka://flink/user/resourcemanager .
INFO ... - Starting RPC endpoint for StandaloneDispatcher at akka://flink/user/dispatcher .
INFO ... - ResourceManager akka.tcp://flink@localhost:6123/user/resourcemanager was granted leadership ...
INFO ... - Starting the SlotManager.
INFO ... - Dispatcher akka.tcp://flink@localhost:6123/user/dispatcher was granted leadership ...
INFO ... - Recovering all persisted jobs.
INFO ... - Registering TaskManager ... under ... at the SlotManager.
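
Another quick way to verify the local cluster is to list the running JVMs with jps from the JDK. The exact main class names can vary between Flink versions, but for a standalone session cluster like this one you should see one JobManager-side entrypoint and one TaskManager process, roughly like (PIDs will differ):

$ jps
21458 StandaloneSessionClusterEntrypoint
21912 TaskManagerRunner
22067 Jps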

Read the Code

You can find the complete source code for this SocketWindowWordCount example on GitHub, in Scala and in Java.

Scala:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  def main(args: Array[String]) : Unit = {

    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
Java:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

    // the port to connect to
    final int port;
    try {
      final ParameterTool params = ParameterTool.fromArgs(args);
      port = params.getInt("port");
    } catch (Exception e) {
      System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
      return;
    }

    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data by connecting to the socket
    DataStream<String> text = env.socketTextStream("localhost", port, "\n");

    // parse the data, group it, window it, and aggregate the counts
    DataStream<WordWithCount> windowCounts = text
      .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
          for (String word : value.split("\\s")) {
            out.collect(new WordWithCount(word, 1L));
          }
        }
      })
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
          return new WordWithCount(a.word, a.count + b.count);
        }
      });

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1);

    env.execute("Socket Window WordCount");
  }

  // Data type for words with count
  public static class WordWithCount {

    public String word;
    public long count;

    public WordWithCount() {}

    public WordWithCount(String word, long count) {
      this.word = word;
      this.count = count;
    }

    @Override
    public String toString() {
      return word + " : " + count;
    }
  }
}

Run the Example

Now, we are going to run this Flink application. It will read text from a socket and, once every 5 seconds, print the number of occurrences of each distinct word seen during the previous 5 seconds, i.e. a tumbling window of processing time, as long as words keep coming in.

  • First of all, we use netcat to start a local server via:
$ nc -l 9000
  • Submit the Flink program:
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Starting execution of program

The program connects to the socket and waits for input. You can check the web interface to validate that the job is running as expected:

[Screenshots: Dispatcher overview; Dispatcher running jobs]
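
Alternatively, the Flink command line client can list the jobs known to the cluster. As an optional check, the following should report the Socket Window WordCount job as RUNNING:

$ ./bin/flink list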

  • Words are counted in time windows of 5 seconds (processing time, tumbling windows) and are printed to stdout. Monitor the TaskManager's output file and write some text in nc (input is sent to Flink line by line after hitting <Return>):
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye

The .out file will print the counts at the end of each time window as long as words keep coming in, e.g.:

$ tail -f log/flink-*-taskexecutor-*.out
lorem : 1
bye : 1
ipsum : 4
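
If you only want to stop the streaming job while keeping the cluster running, you can cancel it with the command line client instead; take the job ID from the web interface or from ./bin/flink list (the <jobID> below is a placeholder):

$ ./bin/flink cancel <jobID>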

To stop Flink when you are done, type:

$ ./bin/stop-cluster.sh

Next Steps

Check out some more examples to get a better feel for Flink's programming APIs. When you are done with that, go ahead and read the streaming guide.
