- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
SQL 客户端测试版
Flink 的 Table&SQL API 可以处理用 SQL 语言编写的查询,但是这些查询需要嵌入在用 Java 或 Scala 编写的表程序中。而且,这些程序需要在提交到集群之前使用构建工具打包。这或多或少地限制了 Flink 对 Java / Scala 程序员的使用。
在 SQL 客户端 旨在提供编写,调试,并提交表格程序到 Flink 集群的一个简单的方法没有的 Java 或 Scala 代码一行。在 SQL 客户端 CLI 允许检索和命令行中运行分布式应用可视化实时结果。
注意 SQL 客户端处于早期开发阶段。即使应用程序还没有生产就绪,它可以是一个非常有用的工具,用于原型设计和 Flink SQL。在未来,社区计划通过提供基于 REST 的 SQL 客户端网关 来扩展其函数。
入门
本节介绍如何从命令行设置和运行第一个 Flink SQL 程序。
SQL 客户端捆绑在常规 Flink 分发中,因此可以开箱即用。它只需要一个正在运行的 Flink 集群,其中可以执行表程序。有关设置 Flink 群集的详细信息,请参阅 群集和部署 部分。如果您只想尝试 SQL 客户端,还可以使用以下命令启动具有一个 worker 的本地群集:
./bin/start-cluster.sh
启动 SQL 客户端 CLI
SQL 客户端脚本也位于 Flink 的二进制目录中。 将来 ,用户可以通过启动嵌入式独立进程或连接到远程 SQL 客户端网关来启动 SQL Client CLI。目前仅 embedded
支持该模式。您可以通过调用以下命令启动 CLI:
./bin/sql-client.sh embedded
默认情况下,SQL 客户端将从位于的环境文件中读取其配置 ./conf/sql-client-defaults.yaml
。有关环境文件结构的更多信息,请参阅 配置部分 。
运行 SQL 查询
启动 CLI 后,您可以使用该 HELP
命令列出所有可用的 SQL 语句。要验证您的设置和群集连接,您可以输入第一个 SQL 查询并 Enter
按键执行它:
SELECT 'Hello World'
此查询不需要表源,并生成单行结果。CLI 将从群集中检索结果并将其可视化。您可以通过按键关闭结果视图 Q
。
CLI 支持 两种 维护和可视化结果的 模式 。
该 表模式 物化在内存中的结果和可视化他们的常客,分页表表示。可以通过在 CLI 中执行以下命令来启用它:
SET execution.result-mode=table
的 更改日志模式 不实现结果和可视化,其由所产生的结果数据流 的连续查询 由插入的( +
)和撤消( -
)。
SET execution.result-mode=changelog
您可以使用以下查询来查看两种结果模式:
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name
此查询执行有界字数计数示例。
在 更改日志模式下 ,可视化更改日志应类似于:
+ Bob, 1
+ Alice, 1
+ Greg, 1
- Bob, 1
+ Bob, 2
在 表模式下 ,可视化结果表不断更新,直到表程序结束为:
Bob, 2
Alice, 1
Greg, 1
在 SQL 查询的原型设计期间,两种结果模式都很有用。在这两种模式下,结果都存储在 SQL 客户端的 Java 堆内存中。为了使 CLI 界面保持响应,更改日志模式仅显示最新的 1000 个更改。表模式允许导航更大的结果,这些结果仅受可用主存储器和配置的 最大行数 ( max-table-result-rows
)的限制。
注意只能使用 table
结果模式检索在批处理环境中执行的查询。
定义查询后,可以将其作为长时间运行的分离 Flink 作业提交给集群。为此,需要使用 INSERT INTO 语句 指定存储结果的目标系统。在 配置部分 解释了如何申报表源读取数据,如何申报表汇写入数据,以及如何配置其它表程序性能。
配置
可以使用以下可选 CLI 命令启动 SQL Client。它们将在随后的章节中详细讨论。
./bin/sql-client.sh embedded --help
Mode "embedded" submits Flink jobs from the local machine.
Syntax: embedded [OPTIONS]
"embedded" mode options:
-d,--defaults <environment file> The environment properties with which
every new session is initialized.
Properties might be overwritten by
session properties.
-e,--environment <environment file> The environment properties to be
imported into the session. It might
overwrite default environment
properties.
-h,--help Show the help message with
descriptions of all options.
-j,--jar <JAR file> A JAR file to be imported into the
session. The file might contain
user-defined classes needed for the
execution of statements such as
functions, table sources, or sinks.
Can be used multiple times.
-l,--library <JAR directory> A JAR file directory with which every
new session is initialized. The files
might contain user-defined classes
needed for the execution of
statements such as functions, table
sources, or sinks. Can be used
multiple times.
-s,--session <session identifier> The identifier for a session.
'default' is the default identifier.
环境文件
SQL 查询需要一个执行它的配置环境。所谓的 环境文件 定义了可用的表源和接收器,外部目录,用户定义的函数以及执行和部署所需的其他属性。
每个环境文件都是常规的 YAML 文件 。下面给出了这种文件的一个例子。
# Define table sources and sinks here.
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
tables:
- name: MyTableSource
type: source
update-mode: append
connector:
type: filesystem
path: "/path/to/something.csv"
format:
type: csv
fields:
- name: MyField1
type: INT
- name: MyField2
type: VARCHAR
line-delimiter: "\n"
comment-prefix: "#"
schema:
- name: MyField1
type: INT
- name: MyField2
type: VARCHAR
# Define table views here.
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
views:
- name: MyCustomView
query: "SELECT MyField2 FROM MyTableSource"
# Define user-defined functions here.
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
functions:
- name: myUDF
from: class
class: foo.bar.AggregateUDF
constructor:
- 7.6
- false
# Execution properties allow for changing the behavior of a table program.
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
execution:
type: streaming # required: execution mode either 'batch' or 'streaming'
result-mode: table # required: either 'table' or 'changelog'
max-table-result-rows: 1000000 # optional: maximum number of maintained rows in
# 'table' mode (1000000 by default, smaller 1 means unlimited)
time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default)
parallelism: 1 # optional: Flink's parallelism (1 by default)
periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
min-idle-state-retention: 0 # optional: table program's minimum idle state time
max-idle-state-retention: 0 # optional: table program's maximum idle state time
restart-strategy: # optional: restart strategy
type: fallback # "fallback" to global restart strategy by default
# Deployment properties allow for describing the cluster to which table programs are submitted to.
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
deployment:
response-timeout: 5000
这个配置:
- 定义一个具有
MyTableSource
从 CSV 文件读取的表源的环境, - 定义一个
MyCustomView
使用 SQL 查询声明虚拟表的视图, - 定义了一个用户定义的函数
myUDF
,可以使用类名和两个构造函数参数进行实例化, - 为在此流式传输环境中执行的查询指定 1 的并行度,
- 指定事件时间特征,和
- 在
table
结果模式下运行查询。
根据用例,可以将配置拆分为多个文件。因此,可以为一般用途( 默认 使用 环境文件 --defaults
)以及每个会话(使用 会话环境文件 --environment
)创建 环境文件 。每个 CLI 会话都使用默认属性进行初始化,后跟会话属性。例如,默认环境文件可以指定在每个会话中可用于查询的所有表源,而会话环境文件仅声明特定的状态保存时间和并行性。启动 CLI 应用程序时,可以传递默认和会话环境文件。如果未指定默认环境文件,则 SQL 客户端将搜索 ./conf/sql-client-defaults.yaml
在 Flink 的配置目录中。
注意在 CLI 会话中设置的属性(例如,使用该 SET
命令)具有最高优先级:
CLI commands > session environment file > defaults environment file
重启策略
重新启动策略控制在发生故障时如何重新启动 Flink 作业。与 Flink 集群的 全局重新启动策略 类似,可以在环境文件中声明更细粒度的重新启动配置。
支持以下策略:
execution:
# falls back to the global strategy defined in flink-conf.yaml
restart-strategy:
type: fallback
# job fails directly and no restart is attempted
restart-strategy:
type: none
# attempts a given number of times to restart the job
restart-strategy:
type: fixed-delay
attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE)
delay: 10000 # delay in ms between retries (default: 10 s)
# attempts as long as the maximum number of failures per time interval is not exceeded
restart-strategy:
type: failure-rate
max-failures-per-interval: 1 # retries in interval until failing (default: 1)
failure-rate-interval: 60000 # measuring interval in ms for failure rate
delay: 10000 # delay in ms between retries (default: 10 s)
依赖
SQL 客户端不需要使用 Maven 或 SBT 设置 Java 项目。相反,您可以将依赖项作为提交到集群的常规 JAR 文件传递。您可以单独指定每个 JAR 文件(使用 --jar
),也可以定义整个库目录(使用 --library
)。对于连接外部系统(如 Apache Kafka)和相应数据格式(如 JSON)的连接器,Flink 提供了 即用型 JAR 包 。这些 JAR 文件 sql-jar
以 Maven 中央存储库的每个版本为后缀并可以下载。
可以在与 外部系统页面 的 连接上 找到提供的 SQL JAR 的完整列表以及有关如何使用它们的文档。
以下示例显示了一个环境文件,该文件定义从 Apache Kafka 读取 JSON 数据的表源。
tables:
- name: TaxiRides
type: source
update-mode: append
connector:
property-version: 1
type: kafka
version: 0.11
topic: TaxiRides
startup-mode: earliest-offset
properties:
- key: zookeeper.connect
value: localhost:2181
- key: bootstrap.servers
value: localhost:9092
- key: group.id
value: testGroup
format:
property-version: 1
type: json
schema: "ROW(rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP)"
schema:
- name: rideId
type: LONG
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: rowTime
type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "rideTime"
watermarks:
type: "periodic-bounded"
delay: "60000"
- name: procTime
type: TIMESTAMP
proctime: true
生成的 TaxiRide
表模式包含 JSON 模式的大多数字段。此外,它还添加了 rowtime 属性 rowTime
和 processing-time 属性 procTime
。
双方 connector
并 format
允许定义属性的版本(这是目前版本 1
),为未来的向后兼容性。
用户定义的函数
SQL 客户端允许用户创建要在 SQL 查询中使用的自定义用户定义函数。目前,这些函数仅限于在 Java / Scala 类中以编程方式定义。
为了提供用户定义的函数,您需要首先实现和编译扩展的函数类 ScalarFunction
, AggregateFunction
或者 TableFunction
(参见 用户定义的函数 )。然后可以将一个或多个函数打包到 SQL 客户端的依赖项 JAR 中。
在调用之前,必须在环境文件中声明所有函数。对于列表中的每个项目 functions
,必须指定
- a
name
注册函数的, - 使用函数的来源
from
(限于class
现在), - 该
class
指示函数的完全合格的类名和一个可选列表constructor
的实例参数。
functions:
- name: ... # required: name of the function
from: class # required: source of the function (can only be "class" for now)
class: ... # required: fully qualified class name of the function
constructor: # optimal: constructor parameters of the function class
- ... # optimal: a literal parameter with implicit type
- class: ... # optimal: full class name of the parameter
constructor: # optimal: constructor parameters of the parameter's class
- type: ... # optimal: type of the literal parameter
value: ... # optimal: value of the literal parameter
确保指定参数的顺序和类型严格匹配函数类的一个构造函数。
构造函数参数
根据用户定义的函数,可能需要在 SQL 语句中使用它之前参数化实现。
如前面的示例所示,在声明用户定义的函数时,可以使用以下三种方法之一使用构造函数参数来配置类:
具有隐式类型的文字值: SQL 客户端将根据文字值本身自动派生类型。目前,仅值 BOOLEAN
, INT
, DOUBLE
和 VARCHAR
在这里的支持。如果自动派生不能按预期工作(例如,您需要 VARCHAR false
),请改用显式类型。
- true # -> BOOLEAN (case sensitive)
- 42 # -> INT
- 1234.222 # -> DOUBLE
- foo # -> VARCHAR
具有显式类型的文字值: 使用类型安全性 type
和 value
属性显式声明参数。
- type: DECIMAL
value: 11111111111111111
下表说明了受支持的 Java 参数类型和相应的 SQL 类型字符串。
Java 类型 | SQL 类型 |
---|---|
java.math.BigDecimal | DECIMAL |
java.lang.Boolean | BOOLEAN |
java.lang.Byte | TINYINT |
java.lang.Double | DOUBLE |
java.lang.Float | REAL , FLOAT |
java.lang.Integer | INTEGER , INT |
java.lang.Long | BIGINT |
java.lang.Short | SMALLINT |
java.lang.String | VARCHAR |
更多类型(例如, TIMESTAMP
或 ARRAY
),原始类型,并且 null
尚不支持。
(嵌套)类实例: 除文字值外,您还可以通过指定 class
和 constructor
属性为构造函数参数创建(嵌套)类实例。可以递归地执行此过程,直到所有构造函数参数都使用文字值表示。
- class: foo.bar.paramClass
constructor:
- StarryName
- class: java.lang.Integer
constructor:
- class: java.lang.String
constructor:
- type: VARCHAR
value: 3
分离的 SQL 查询
为了定义端到端 SQL 管道,SQL INSERT INTO
语句可用于向 Flink 集群提交长时间运行的分离查询。这些查询将结果生成到外部系统而不是 SQL 客户端。这允许处理更高的并行性和更大量的数据。CLI 本身在提交后对分离的查询没有任何控制权。
INSERT INTO MyTableSink SELECT * FROM MyTableSource
MyTableSink
必须在环境文件中声明表接收器。有关支持的外部系统及其配置的详细信息,请参阅 连接页面 。Apache Kafka 表接收器的示例如下所示。
tables:
- name: MyTableSink
type: sink
update-mode: append
connector:
property-version: 1
type: kafka
version: 0.11
topic: OutputTopic
properties:
- key: zookeeper.connect
value: localhost:2181
- key: bootstrap.servers
value: localhost:9092
- key: group.id
value: testGroup
format:
property-version: 1
type: json
derive-schema: true
schema:
- name: rideId
type: LONG
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: rideTime
type: TIMESTAMP
SQL 客户端确保将语句成功提交到群集。提交查询后,CLI 将显示有关 Flink 作业的信息。
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 6f922fe5cba87406ff23ae4a7bb79044
Web interface: http://localhost:8081
注意 SQL Client 在提交后不会跟踪正在运行的 Flink 作业的状态。提交后可以关闭 CLI 进程,而不会影响分离的查询。Flink 的 重启策略 负责容错。可以使用 Flink 的 Web 界面,命令行或 REST API 取消查询。
SQL 视图
视图允许从 SQL 查询定义虚拟表。视图定义将立即进行解析和验证。但是,在提交常规 INSERT INTO
或 SELECT
语句期间访问视图时会发生实际执行。
视图可以在 环境文件中 定义,也可以在 CLI 会话中定义。
以下示例显示如何在文件中定义多个视图:
views:
- name: MyRestrictedView
query: "SELECT MyField2 FROM MyTableSource"
- name: MyComplexView
query: >
SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
FROM MyTableSource
WHERE MyField2 > 200
与表源和接收器类似,会话环境文件中定义的视图具有最高优先级。
也可以使用以下 CREATE VIEW
语句在 CLI 会话中创建视图:
CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
也可以使用以下 DROP VIEW
语句再次删除在 CLI 会话中创建的视图:
DROP VIEW MyNewView
注意视图的定义仅限于上面提到的语法。在将来的版本中,将支持为表名中的视图或转义空格定义模式。
局限与未来
当前的 SQL 客户端实施处于非常早期的开发阶段,并且可能会在未来作为更大的 Flink 改进提案 24( FLIP-24 )的一部分进行更改。随意加入讨论,并打开有关您认为有用的错误和函数的问题。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论