返回介绍

Scala REPL

发布于 2025-05-02 18:19:19 字数 4441 浏览 0 评论 0 收藏 0

Flink 附带了一个集成的交互式 Scala Shell。它可以在本地设置和群集设置中使用。

要将 shell 与集成的 Flink 集群一起使用,只需执行:

bin/start-scala-shell.sh local

在二进制 Flink 目录的根目录中。要在群集上运行 Shell,请参阅下面的“设置”部分。

用法

shell 支持 Batch 和 Streaming。启动后会自动预先绑定两个不同的 ExecutionEnvironments。使用“benv”和“senv”分别访问 Batch 和 Streaming 环境。

DataSet API

以下示例将在 Scala shell 中执行 wordcount 程序:

Scala-Flink> val text = benv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
  .flatMap { _.toLowerCase.split("\\W+") }
  .map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()

print()命令将自动将指定的任务发送到 JobManager 执行,并在终端中显示计算结果。

可以将结果写入文件。但是,在这种情况下,您需要调用 execute ,以运行您的程序:

Scala-Flink> benv.execute("MyProgram")

DataStream API

与上面的批处理程序类似,我们可以通过 DataStream API 执行流程序:

Scala-Flink> val textStreaming = senv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
  .flatMap { _.toLowerCase.split("\\W+") }
  .map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")

请注意,在 Streaming 情况下,打印 算子操作不会直接触发执行。

Flink Shell 附带命令历史记录和自动完成函数。

添加外部依赖项

可以将外部类路径添加到 Scala-shell。当调用 execute 时,它们将与 shell 程序一起自动发送到 JobManager。

使用参数 -a <path/to/jar.jar>--addclasspath <path/to/jar.jar> 加载其他类。

bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>

建立

要了解 Scala Shell 提供的选项,请使用

bin/start-scala-shell.sh --help

本地

要将 shell 与集成的 Flink 集群一起使用,只需执行:

bin/start-scala-shell.sh local

远程

要将其与正在运行的集群一起使用,请使用关键字启动 scala shell, remote 并为 JobManager 提供以下主机和端口:

bin/start-scala-shell.sh remote <hostname> <portnumber>

YARNScala Shell 集群

shell 可以将 Flink 集群部署到 YARN,YARN 专门由 shell 使用。YARN 容器的数量可以通过参数控制 -n &lt;arg&gt; 。shell 在 YARN 上部署新的 Flink 集群并连接集群。您还可以为 YARN 群集指定选项,例如 JobManager 的内存,YARN 应用程序的名称等。

例如,要使用两个 TaskManagers 为 Scala Shell 启动 Yarn 集群,请使用以下命令:

 bin/start-scala-shell.sh yarn -n 2

对于所有其他选项,请参阅底部的完整参考。

YARN Session

如果您之前使用 Flink Yarn 会话部署了 Flink 集群,则 Scala shell 可以使用以下命令与其连接:

 bin/start-scala-shell.sh yarn

完整参考

Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...

Command: local [options]
Starts Flink scala shell with a local Flink cluster
  -a <path/to/jar> | --addclasspath <path/to/jar>
    Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster
  <host>
    Remote host name as string
  <port>
    Remote port as integer

  -a <path/to/jar> | --addclasspath <path/to/jar>
    Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
  -n arg | --container arg
    Number of YARN container to allocate (= Number of TaskManagers)
  -jm arg | --jobManagerMemory arg
    Memory for JobManager container with optional unit (default: MB)
  -nm <value> | --name <value>
    Set a custom name for the application on YARN
  -qu <arg> | --queue <arg>
    Specifies YARN queue
  -s <arg> | --slots <arg>
    Number of slots per TaskManager
  -tm <arg> | --taskManagerMemory <arg>
    Memory per TaskManager container with optional unit (default: MB)
  -a <path/to/jar> | --addclasspath <path/to/jar>
    Specifies additional jars to be used in Flink
  --configDir <value>
    The configuration directory.
  -h | --help
    Prints this usage text

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。