- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
SQL
SQL 查询是使用 sqlQuery()
方法指定的 TableEnvironment
。该方法返回 SQL 查询的结果为 Table
。A Table
可以在 后续的 SQL 和 Table API 查询中使用 ,可以 转换为 DataSet 或 DataStream ,也可以 写入 TableSink )。SQL 和 Table API 查询可以无缝混合,并进行整体优化并转换为单个程序。
要访问 SQL 查询中的表,必须 在 TableEnvironment 中注册 它。可以从 TableSource , Table , DataStream 或 DataSet 注册表 。或者,用户还可以 在 TableEnvironment 中注册外部目录 以指定数据源的位置。
为方便起见, Table.toString()
自动在其中以唯一名称注册表 TableEnvironment
并返回名称。因此, Table
对象可以直接内联到 SQL 查询中(通过字符串连接),如下面的示例所示。
注意: Flink 的 SQL 支持尚未完成。包含不受支持的 SQL 函数的查询会导致 a TableException
。以下部分列出了批处理和流表上 SQL 的受支持函数。
指定查询
以下示例显示如何在已注册和内联表中指定 SQL 查询。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// SQL query with an inlined (unregistered) table
Table table = tableEnv.toTable(ds, "user, product, amount");
Table result = tableEnv.sqlQuery(
"SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
// SQL query with a registered table
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
// SQL update with a registered table
// create and register a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
String[] fieldNames = {"product", "amount"};
TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// read a DataStream from an external source val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// SQL query with an inlined (unregistered) table val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sqlQuery(
s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
// SQL query with a registered table
// register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table val result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
// SQL update with a registered table
// create and register a TableSink TableSink csvSink = new CsvTableSink("/path/to/file", ...)
val fieldNames: Array[String] = Array("product", "amount")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
// run a SQL update query on the Table and emit the result to the TableSink tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
支持的语法
Flink 使用 Apache Calcite 解析 SQL ,它支持标准的 ANSI SQL。Flink 不支持 DDL 语句。
以下 BNF 语法描述了批处理和流式查询中支持的 SQL 函数的超集。“ 算子” 部分显示支持的函数的示例,并指示仅批处理或流式查询支持哪些函数。
insert:
INSERT INTO tableReference
query
query:
values
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| UNNEST '(' expression ')'
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
Flink SQL 对类似于 Java 标识符(表,属性,函数名)使用词法策略:
- 无论是否引用标识符,都会保存标识符的大小写。
- 之后,标识符区分大小写。
- 与 Java 不同,反向标记允许标识符包含非字母数字字符(例如
"SELECT a AS
my fieldFROM t"
)。
算子
Scan,Projection 和过滤
操作: Scan/Select/As 批量 流
描述:
SELECT * FROM Orders
SELECT a, c AS d FROM Orders
操作: Where / Filter Batch Streaming
描述:
SELECT * FROM Orders WHERE b = 'red'
SELECT * FROM Orders WHERE a % 2 = 0
操作: User-defined Scalar Functions (Scalar UDF) 批量 流
描述:UDF 必须在 TableEnvironment 中注册。有关如何指定和注册标量 UDF 的详细信息,请参阅 UDF 文档 。
SELECT PRETTY_PRINT(user) FROM Orders
聚合
操作: GroupBy 聚合 批处理 流 结果更新
注意: 流表上的 GroupBy 会生成更新结果。有关详细信息,请参阅 Streaming Concepts 页面。
SELECT a, SUM(b) as d
FROM Orders
GROUP BY a
操作: GroupBy 窗口聚合 批量 流
描述:使用组窗口计算每个组的单个结果行。有关详细信息,请参阅 GroupWindows 部分。
SELECT user, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
操作: Over Window 聚合 流
描述: 注意: 必须在同一窗口中定义所有聚合,即相同的分区,排序和范围。目前,仅支持具有 PRREDING(UNBOUNDED 和有界)到 CURRENT ROW 范围的窗口。尚不支持使用 FOLLOWING 的范围。必须在单个 时间属性 上指定 ORDER BY
SELECT COUNT(amount) OVER (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
操作: Distinct 批量 流 结果更新
SELECT DISTINCT users FROM Orders
注意: 对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同字段的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。
操作: 分组集,汇总,多维数据集 批量
描述:
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((user), (product))
操作: Having 批量 流
描述:
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
操作: 用户定义的聚合函数(UDAGG) 批量 流
描述:UDAGG 必须在 TableEnvironment 中注册。有关如何指定和注册 UDAGG 的详细信息,请参阅 UDF 文档 。
SELECT MyAggregate(amount)
FROM Orders
GROUP BY users
Join
操作: 内部 Equi-join 批量 流
描述:目前,仅支持等连接,即具有至少一个带有等式谓词的连接条件的连接。不支持任意交叉或 theta 连接。 注意: 连接顺序未优化。表按照 FROM 子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,这些表不受支持并且会导致查询失败。
SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id
注意: 对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。
操作: 外部 Equi-join 批量 流 结果更新
描述:目前,仅支持等连接,即具有至少一个带有等式谓词的连接条件的连接。不支持任意交叉或 theta 连接。 注意: 连接顺序未优化。表按照 FROM 子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,这些表不受支持并且会导致查询失败。
SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
注意: 对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。
操作: Time-windowed Join 批量 流
描述: 注意: 时间窗口连接是可以以流方式处理的常规连接的子集。时间窗口连接需要至少一个等连接谓词和一个限制双方时间的连接条件。这样的条件可以由两个适当的范围谓词( <, <=, >=, >
), BETWEEN
谓词或单个等式谓词来定义,其比较两个输入表的相同类型的 时间属性 (即,处理时间或事件时间)。例如,以下谓词是有效的窗口连接条件:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
如果订单在收到订单后四小时发货,上面的示例将关联所有订单及其相应的货件。
操作: 将数组扩展为关系 Batch Streaming
描述:尚未支持 UnANDing WITH ORDINALITY。
SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
操作: 关联用户定义的表函数(UDTF) 批量 流
描述:UDTF 必须在 TableEnvironment 中注册。有关如何指定和注册 UDTF 的详细信息,请参阅 UDF 文档 。内部联接
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
左外连接
SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
注意: 目前, TRUE
对于横向表,只支持左外连接的谓词作为谓词。
设置 算子操作
操作: Union 批次
描述:
SELECT *
FROM (
(SELECT user FROM Orders WHERE a % 2 = 0)
UNION
(SELECT user FROM Orders WHERE b = 0)
)
操作: UnionAll Batch Streaming
描述:
SELECT *
FROM (
(SELECT user FROM Orders WHERE a % 2 = 0)
UNION ALL
(SELECT user FROM Orders WHERE b = 0)
)
操作: Intersect/ Except 批量
SELECT *
FROM (
(SELECT user FROM Orders WHERE a % 2 = 0)
INTERSECT
(SELECT user FROM Orders WHERE b = 0)
)
SELECT *
FROM (
(SELECT user FROM Orders WHERE a % 2 = 0)
EXCEPT
(SELECT user FROM Orders WHERE b = 0)
)
操作: IN 批量 流中
描述:如果表达式存在于给定的表子查询中,则返回 true。子查询表必须包含一列。此列必须与表达式具有相同的数据类型。
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
注意: 对于流式查询, 算子操作将在连接和组 算子操作中重写。计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。
操作: Exists 批量 流
描述:如果子查询至少返回一行,则返回 true。仅在可以在连接和组 算子操作中重写 算子操作时才支持。
SELECT user, amount
FROM Orders
WHERE product EXISTS (
SELECT product FROM NewProducts
)
注意: 对于流式查询, 算子操作将在连接和组 算子操作中重写。计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。
OrderBy&Limit
操作: Order By 批量 流
描述: 注意: 流式查询的结果必须主要按升序 时间属性 排序。支持其他排序属性。
SELECT *
FROM Orders
ORDER BY orderTime
操作: Limit 批次
描述:
SELECT *
FROM Orders
LIMIT 3
Insert
操作: Insert 批量 流处理
描述:输出表必须在 TableEnvironment 中 注册 (请参阅 注册 TableSink )。此外,已注册表的模式必须与查询的模式匹配。
INSERT INTO OutputTable
SELECT users, tag
FROM Orders
GroupWindows
组窗口在 GROUP BY
SQL 查询的子句中定义。就像具有常规 GROUP BY
子句的查询一样,带有 GROUP BY
包含组窗口函数的子句的查询会为每个组计算单个结果行。批处理和流表上的 SQL 支持以下组窗口函数。
组窗口函数 | 描述 |
---|---|
TUMBLE(time_attr, interval) | 定义翻滚时间窗口。翻滚时间窗口将行分配给具有固定持续时间( interval )的非重叠连续窗口。例如,5 分钟的翻滚窗口以 5 分钟为间隔对行进行分组。可以在事件时间(流+批处理)或处理时间(流)上定义翻滚窗口。 |
HOP(time_attr, interval, interval) | 定义跳跃时间窗口(在 Table API 中称为滑动窗口)。跳跃时间窗口具有固定的持续时间(第二 interval 参数)并且按指定的跳跃间隔(第一 interval 参数)跳跃。如果跳跃间隔小于窗口大小,则跳跃窗口重叠。因此,可以将行分配给多个窗口。例如,15 分钟大小和 5 分钟跳跃间隔的跳跃窗口将每行分配给 3 个不同的 15 分钟大小的窗口,这些窗口以 5 分钟的间隔进行评估。可以在事件时间(流+批处理)或处理时间(流)上定义跳跃窗口。 |
SESSION(time_attr, interval) | 定义会话时间窗口。会话时间窗口没有固定的持续时间,但它们的界限由 interval 不活动时间定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。例如,如果在 30 分钟不活动后观察到一行,则会开始一个 30 分钟间隙的会话窗口(否则该行将被添加到现有窗口中),如果在 30 分钟内未添加任何行,则会关闭。会话窗口可以在事件时间(流+批处理)或处理时间(流)上工作。 |
时间属性
对于流表的 SQL 查询, time_attr
组窗口函数的参数必须引用指定行的处理时间或事件时间的有效时间属性。请参阅 时间属性文档 以了解如何定义时间属性。
对于批处理表上的 SQL, time_attr
组窗口函数的参数必须是类型的属性 TIMESTAMP
。
选择组窗口开始和结束时间戳
可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性:
辅助函数 | 描述 |
---|---|
TUMBLE_START(time_attr, interval) | 返回相应的翻滚,跳跃或会话窗口的包含下限的时间戳。 |
HOP_START(time_attr, interval, interval) | |
SESSION_START(time_attr, interval) | |
TUMBLE_END(time_attr, interval) | 返回相应的翻滚,跳跃或会话窗口的 独占 上限的时间戳。 注意: 独占上限时间戳 不能 在后续基于时间的 算子操作中用作 行时属性 ,例如 时间窗口连接 和 组窗口或窗口聚合 。 |
HOP_END(time_attr, interval, interval) | |
SESSION_END(time_attr, interval) | |
TUMBLE_ROWTIME(time_attr, interval) | 返回相应的翻滚,跳跃或会话窗口的 包含 上限的时间戳。结果属性是 rowtime 属性 ,可用于后续基于时间的 算子操作,例如 时间窗口连接 和 组窗口或窗口聚合 。 |
HOP_ROWTIME(time_attr, interval, interval) | |
SESSION_ROWTIME(time_attr, interval) | |
TUMBLE_PROCTIME(time_attr, interval) | 返回 proctime 属性 ,该 属性 可用于后续基于时间的 算子操作,例如 时间窗口连接 和 组窗口或窗口聚合 。 |
HOP_PROCTIME(time_attr, interval, interval) | |
SESSION_PROCTIME(time_attr, interval) |
注意: 必须使用与 GROUP BY
子句中的组窗口函数完全相同的参数调用辅助函数。
以下示例显示如何在流表上指定具有组窗口的 SQL 查询。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");
// compute SUM(amount) per day (in event-time)
Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
" TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
" SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");
// compute SUM(amount) per day (in processing-time)
Table result2 = tableEnv.sqlQuery(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");
// compute every hour the SUM(amount) of the last 24 hours in event-time
Table result3 = tableEnv.sqlQuery(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
Table result4 = tableEnv.sqlQuery(
"SELECT user, " +
" SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
" SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd, " +
" SUM(amount) " +
"FROM Orders " +
"GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// read a DataStream from an external source val ds: DataStream[(Long, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)
// compute SUM(amount) per day (in event-time) val result1 = tableEnv.sqlQuery(
"""
|SELECT
| user,
| TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
| SUM(amount)
| FROM Orders
| GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
""".stripMargin)
// compute SUM(amount) per day (in processing-time) val result2 = tableEnv.sqlQuery(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")
// compute every hour the SUM(amount) of the last 24 hours in event-time val result3 = tableEnv.sqlQuery(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time) val result4 = tableEnv.sqlQuery(
"""
|SELECT
| user,
| SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
| SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
| SUM(amount)
| FROM Orders
| GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
""".stripMargin)
数据类型
SQL 运行时构建于 Flink 的 DataSet 和 DataStream API 之上。在内部,它还使用 Flink TypeInformation
来定义数据类型。完全支持的类型列在 org.apache.flink.table.api.Types
。下表总结了 SQL 类型, Table API 类型和生成的 Java 类之间的关系。
Table API | SQL | Java 类型 |
---|---|---|
Types.STRING | VARCHAR | java.lang.String |
Types.BOOLEAN | BOOLEAN | java.lang.Boolean |
Types.BYTE | TINYINT | java.lang.Byte |
Types.SHORT | SMALLINT | java.lang.Short |
Types.INT | INTEGER, INT | java.lang.Integer |
Types.LONG | BIGINT | java.lang.Long |
Types.FLOAT | REAL, FLOAT | java.lang.Float |
Types.DOUBLE | DOUBLE | java.lang.Double |
Types.DECIMAL | DECIMAL | java.math.BigDecimal |
Types.SQL_DATE | DATE | java.sql.Date |
Types.SQL_TIME | TIME | java.sql.Time |
Types.SQL_TIMESTAMP | TIMESTAMP(3) | java.sql.Timestamp |
Types.INTERVAL_MONTHS | INTERVAL YEAR TO MONTH | java.lang.Integer |
Types.INTERVAL_MILLIS | INTERVAL DAY TO SECOND(3) | java.lang.Long |
Types.PRIMITIVE_ARRAY | ARRAY | 例如 int[] |
Types.OBJECT_ARRAY | ARRAY | 例如 java.lang.Byte[] |
Types.MAP | MAP | java.util.HashMap |
Types.MULTISET | MULTISET | 例如, java.util.HashMap<String, Integer> 对于多重集合 String |
Types.ROW | ROW | org.apache.flink.types.Row |
通用类型和(嵌套)复合类型(例如,POJO,元组,Row,Scala 案例类)也可以是行的字段。
可以使用 值访问函数 访问具有任意嵌套的复合类型的字段。
通用类型被视为黑盒子,可以由 用户定义的函数 传递或处理。
保存关键字
虽然并非每个 SQL 函数都已实现,但某些字符串组合已被保存为关键字以供将来使用。如果要将以下字符串之一用作字段名称,请确保使用反引号将其包围(例如 value
, count
)。
A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论