- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
Scala REPL
Flink 附带了一个集成的交互式 Scala Shell。它可以在本地设置和群集设置中使用。
要将 shell 与集成的 Flink 集群一起使用,只需执行:
bin/start-scala-shell.sh local
在二进制 Flink 目录的根目录中。要在群集上运行 Shell,请参阅下面的“设置”部分。
用法
shell 支持 Batch 和 Streaming。启动后会自动预先绑定两个不同的 ExecutionEnvironments。使用“benv”和“senv”分别访问 Batch 和 Streaming 环境。
DataSet API
以下示例将在 Scala shell 中执行 wordcount 程序:
Scala-Flink> val text = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()
print()命令将自动将指定的任务发送到 JobManager 执行,并在终端中显示计算结果。
可以将结果写入文件。但是,在这种情况下,您需要调用 execute
,以运行您的程序:
Scala-Flink> benv.execute("MyProgram")
DataStream API
与上面的批处理程序类似,我们可以通过 DataStream API 执行流程序:
Scala-Flink> val textStreaming = senv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")
请注意,在 Streaming 情况下,打印 算子操作不会直接触发执行。
Flink Shell 附带命令历史记录和自动完成函数。
添加外部依赖项
可以将外部类路径添加到 Scala-shell。当调用 execute 时,它们将与 shell 程序一起自动发送到 JobManager。
使用参数 -a <path/to/jar.jar>
或 --addclasspath <path/to/jar.jar>
加载其他类。
bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
建立
要了解 Scala Shell 提供的选项,请使用
bin/start-scala-shell.sh --help
本地
要将 shell 与集成的 Flink 集群一起使用,只需执行:
bin/start-scala-shell.sh local
远程
要将其与正在运行的集群一起使用,请使用关键字启动 scala shell, remote
并为 JobManager 提供以下主机和端口:
bin/start-scala-shell.sh remote <hostname> <portnumber>
YARNScala Shell 集群
shell 可以将 Flink 集群部署到 YARN,YARN 专门由 shell 使用。YARN 容器的数量可以通过参数控制 -n <arg>
。shell 在 YARN 上部署新的 Flink 集群并连接集群。您还可以为 YARN 群集指定选项,例如 JobManager 的内存,YARN 应用程序的名称等。
例如,要使用两个 TaskManagers 为 Scala Shell 启动 Yarn 集群,请使用以下命令:
bin/start-scala-shell.sh yarn -n 2
对于所有其他选项,请参阅底部的完整参考。
YARN Session
如果您之前使用 Flink Yarn 会话部署了 Flink 集群,则 Scala shell 可以使用以下命令与其连接:
bin/start-scala-shell.sh yarn
完整参考
Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
Command: local [options]
Starts Flink scala shell with a local Flink cluster
-a <path/to/jar> | --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster
<host>
Remote host name as string
<port>
Remote port as integer
-a <path/to/jar> | --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
-n arg | --container arg
Number of YARN container to allocate (= Number of TaskManagers)
-jm arg | --jobManagerMemory arg
Memory for JobManager container with optional unit (default: MB)
-nm <value> | --name <value>
Set a custom name for the application on YARN
-qu <arg> | --queue <arg>
Specifies YARN queue
-s <arg> | --slots <arg>
Number of slots per TaskManager
-tm <arg> | --taskManagerMemory <arg>
Memory per TaskManager container with optional unit (default: MB)
-a <path/to/jar> | --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
--configDir <value>
The configuration directory.
-h | --help
Prints this usage text
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论