
Concepts & Common API


The Table API and SQL are integrated in a joint API. The central concept of this API is a Table which serves as the input and output of queries. This document shows the common structure of programs with Table API and SQL queries, how to register a Table, how to query a Table, and how to emit a Table.

Structure of Table API and SQL Programs

All Table API and SQL programs for batch and streaming follow the same pattern. The following code example shows the common structure of Table API and SQL programs.

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register a Table
tableEnv.registerTable("table1", ...)      // or
tableEnv.registerTableSource("table2", ...);   // or
tableEnv.registerExternalCatalog("extCat", ...);

// create a Table from a  Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// emit a  Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);

// execute
env.execute();
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...)           // or
tableEnv.registerTableSource("table2", ...)     // or
tableEnv.registerExternalCatalog("extCat", ...)

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)

// execute
env.execute()

Note: Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the Integration with DataStream and DataSet API section to learn how DataStreams and DataSets can be converted into Tables and vice versa.

Create a TableEnvironment

The TableEnvironment is a central concept of the Table API and SQL integration. It is responsible for:

  • registering a Table in the internal catalog
  • registering an external catalog
  • executing SQL queries
  • registering a user-defined (scalar, table, or aggregation) function
  • converting a DataStream or DataSet into a Table
  • holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment

A Table is always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.

A TableEnvironment is created by calling the static TableEnvironment.getTableEnvironment() method with a StreamExecutionEnvironment or an ExecutionEnvironment and an optional TableConfig. The TableConfig can be used to configure the TableEnvironment or to customize the query optimization and translation process (see Query Optimization). A sketch of passing an explicit TableConfig follows the code below.

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
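
As a minimal sketch of the optional TableConfig parameter (a TableConfig constructed with its default settings is assumed here, together with the getTableEnvironment(env, config) overload of the API described on this page):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableConfig with default settings and adjust it as needed
TableConfig config = new TableConfig();

// create a TableEnvironment that uses the custom config
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config);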

Register Tables in the Catalog

A TableEnvironment maintains a catalog of tables which are registered by name. There are two types of tables, input tables and output tables. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.

An input table can be registered from various sources:

  • an existing Table object, usually the result of a Table API or SQL query.
  • a TableSource, which accesses external data, such as a file, database, or messaging system.
  • a DataStream or DataSet from a DataStream or DataSet program. Registering a DataStream or DataSet is discussed in the Integration with DataStream and DataSet API section.

An output table can be registered using a TableSink.

Register a Table

A Table is registered in a TableEnvironment as follows:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table is the result of a simple projection query 
Table projTable = tableEnv.scan("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable);
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)

// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)

Note: A registered Table is treated similarly to a VIEW as known from relational database systems, i.e., the query that defines the Table is not optimized but will be inlined when another query references the registered Table. If multiple queries reference the same registered Table, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered Table will not be shared.

Register a TableSource

A TableSource provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache [Parquet, Avro, ORC], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).

Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the Table Sources and Sinks page for a list of supported TableSources and instructions for how to build a custom TableSource.

A TableSource is registered in a TableEnvironment as follows:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create a TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

Register a TableSink

A registered TableSink can be used to emit the result of a Table API or SQL query to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache [Parquet, Avro, ORC], ...).

Flink aims to provide TableSinks for common data formats and storage systems. Please see the documentation about Table Sources and Sinks for details about available sinks and instructions for how to implement a custom TableSink.

A TableSink is registered in a TableEnvironment as follows:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);

// define the field names and types
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

Register an External Catalog

An external catalog can provide information about external databases and tables such as their name, schema, statistics, and information for how to access data stored in an external database, table, or file.

An external catalog can be created by implementing the ExternalCatalog interface and is registered in a TableEnvironment as follows:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create an external catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

Once registered in a TableEnvironment, all tables defined in an ExternalCatalog can be accessed from Table API or SQL queries by specifying their full path, such as catalog.database.table.

Currently, Flink provides an InMemoryExternalCatalog for demo and testing purposes. However, the ExternalCatalog interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
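
For example, assuming the catalog registered above contains a database "db1" with a table "orders" (both names are hypothetical), the table could be referenced as follows:

// Table API: scan() accepts the path elements of a catalog table
Table orders = tableEnv.scan("InMemCatalog", "db1", "orders");

// SQL: reference the table by its full path
Table result = tableEnv.sqlQuery("SELECT * FROM InMemCatalog.db1.orders");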

Query a Table

Table API

The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as Strings but are composed step-by-step in the host language.

The API is based on the Table class, which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new Table object, which represents the result of applying the relational operation on the input Table. Some relational operations are composed of multiple method calls, such as table.groupBy(...).select(), where groupBy(...) specifies a grouping of table and select(...) the projection on the grouping of table.

The Table API document describes all Table API operations that are supported on streaming and batch tables.

The following example shows a simple Table API aggregation query:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table

// scan registered Orders table
val orders = tableEnv.scan("Orders")
// compute revenue for all customers from France
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// emit or convert Table
// execute query

Note: The Scala Table API uses Scala Symbols, which start with a single tick ( ' ) to reference the attributes of a Table . The Table API uses Scala implicits. Make sure to import org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ in order to use Scala implicit conversions.

SQL

Flink's SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as regular Strings.

The SQL document describes Flink's SQL support for streaming and batch tables.

The following example shows how to specify a query and return the result as a Table:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
  "SELECT cID, cName, SUM(revenue) AS revSum " +
  "FROM Orders " +
  "WHERE cCountry = 'FRANCE' " +
  "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table

// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// emit or convert Table
// execute query

The following example shows how to specify an update query that inserts its result into a registered table.

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
  "INSERT INTO RevenueFrance " +
  "SELECT cID, cName, SUM(revenue) AS revSum " +
  "FROM Orders " +
  "WHERE cCountry = 'FRANCE' " +
  "GROUP BY cID, cName"
  );

// execute query
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// execute query

Mixing Table API and SQL

Table API and SQL queries can be easily mixed because both return Table objects:

  • A Table API query can be defined on the Table object returned by a SQL query.
  • A SQL query can be defined on the result of a Table API query by registering the resulting Table in the TableEnvironment and referencing it in the FROM clause of the SQL query. (A short sketch of both directions follows this list.)
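
A minimal sketch of both directions, reusing the registered "Orders" table from the previous examples:

// SQL on top of Table API: register the result of a Table API query and
// reference it in the FROM clause of a SQL query
Table frenchOrders = tableEnv.scan("Orders").filter("cCountry === 'FRANCE'");
tableEnv.registerTable("FrenchOrders", frenchOrders);
Table revenue = tableEnv.sqlQuery(
  "SELECT cID, SUM(revenue) AS revSum FROM FrenchOrders GROUP BY cID");

// Table API on top of SQL: keep composing on the returned Table object
Table bigCustomers = revenue.filter("revSum > 1000");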

Emit a Table

A Table is emitted by writing it to a TableSink. A TableSink is a generic interface to support a wide variety of file formats (e.g., CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), and messaging systems (e.g., Apache Kafka, RabbitMQ).

A batch Table can only be written to a BatchTableSink, while a streaming Table requires either an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.

Please see the documentation about Table Sources and Sinks for details about available sinks and instructions for how to implement a custom TableSink.

There are two ways to emit a table:

  1. The Table.writeToSink(TableSink sink) method emits the table using the provided TableSink and automatically configures the sink with the schema of the table to emit.
  2. The Table.insertInto(String sinkTable) method looks up a TableSink that was registered under the provided name with a specific schema in the TableEnvironment's catalog. The schema of the table to emit is validated against the schema of the registered TableSink.

The following examples show how to emit a Table:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// compute a result Table using  Table API operators and/or SQL queries
Table result = ...

// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// METHOD 1:
//   Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink);

// METHOD 2:
//   Register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
//   Emit the result Table to the registered TableSink via the insertInto() method
result.insertInto("CsvSinkTable");

// execute the program
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// METHOD 1:
//   Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink)

// METHOD 2:
//   Register the TableSink with a specific schema
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
//   Emit the result Table to the registered TableSink via the insertInto() method
result.insertInto("CsvSinkTable")

// execute the program

Translate and Execute a Query

Table API and SQL queries are translated into DataStream or DataSet programs depending on whether their input is a streaming or batch input. A query is internally represented as a logical query plan and is translated in two phases:

  1. optimization of the logical plan,
  2. translation into a DataStream or DataSet program.

A Table API or SQL query is translated when:

  • a Table is emitted to a TableSink, i.e., when Table.writeToSink() or Table.insertInto() is called.
  • a SQL update query is specified, i.e., when TableEnvironment.sqlUpdate() is called.
  • a Table is converted into a DataStream or DataSet (see Integration with DataStream and DataSet API).

Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when StreamExecutionEnvironment.execute() or ExecutionEnvironment.execute() is called, as in the short sketch below.
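
As a compact illustration (a sketch that reuses the CsvTableSink and the registered "Orders" table from earlier examples):

// defining the query alone does not translate or execute anything yet
Table result = tableEnv.scan("Orders").select("cID, cName");

// the query is translated into a DataStream program here ...
result.writeToSink(new CsvTableSink("/path/to/file", "|"));

// ... and the translated program runs when execute() is called
env.execute();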

Integration with DataStream and DataSet API

Table API and SQL queries can be easily integrated with and embedded into DataStream and DataSet programs. For instance, it is possible to query an external table (for example from an RDBMS), do some preprocessing, such as filtering, projecting, aggregating, or joining with metadata, and then further process the data with the DataStream or DataSet API (and any of the libraries built on top of these APIs, such as CEP or Gelly). Conversely, a Table API or SQL query can also be applied on the result of a DataStream or DataSet program.

This interaction can be achieved by converting a DataStream or DataSet into a Table and vice versa. In this section, we describe how these conversions are done.

Implicit Conversion for Scala

The Scala Table API features implicit conversions for the DataSet, DataStream, and Table classes. These conversions are enabled by importing the package org.apache.flink.table.api.scala._ in addition to org.apache.flink.api.scala._ for the Scala DataStream API.

Register a DataStream or DataSet as Table

A DataStream or DataSet can be registered in a TableEnvironment as a Table. The schema of the resulting table depends on the data type of the registered DataStream or DataSet. Please check the section about mapping of data types to table schema for details.

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields '_1, '_2
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields 'myLong, 'myString
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

Note: The name of a DataStream Table must not match the ^_DataStreamTable_[0-9]+ pattern, and the name of a DataSet Table must not match the ^_DataSetTable_[0-9]+ pattern. These patterns are reserved for internal use only.

Convert a DataStream or DataSet into a Table

Instead of registering a DataStream or DataSet in a TableEnvironment, it can also be directly converted into a Table. This is convenient if you want to use the Table in a Table API query.

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

Convert a Table into a DataStream or DataSet

A Table can be converted into a DataStream or DataSet. In this way, custom DataStream or DataSet programs can be run on the result of a Table API or SQL query.

When converting a Table into a DataStream or DataSet, you need to specify the data type of the resulting DataStream or DataSet, i.e., the data type into which the rows of the Table are to be converted. Often the most convenient conversion type is Row. The following list gives an overview of the features of the different options:

  • Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
  • POJO: fields are mapped by name (POJO fields must be named as Table fields), arbitrary number of fields, support for null values, type-safe access. (A short sketch of this option follows the list.)
  • Case Class: fields are mapped by position, no support for null values, type-safe access.
  • Tuple: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
  • Atomic Type: Table must have a single field, no support for null values, type-safe access.
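
The examples below cover the Row and Tuple targets; as a minimal sketch of the POJO option, a Table with fields "name" and "age" could be converted like this (Person is a hypothetical POJO with fields "name" and "age"):

// convert the Table into an append DataStream of Person; fields are matched by name
DataStream<Person> dsPerson = tableEnv.toAppendStream(table, Person.class);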

Convert a Table into a DataStream

A Table that is the result of a streaming query will be updated dynamically, i.e., it is changing as new records arrive on the query's input streams. Hence, the DataStream into which such a dynamic query is converted needs to encode the updates of the table.

There are two modes to convert a Table into a DataStream:

  1. Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
  2. Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
// get StreamTableEnvironment. 
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer> 
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
  tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)].
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

Note: A detailed discussion about dynamic tables and their properties is given in the Streaming Queries document.

Convert a Table into a DataSet

A Table is converted into a DataSet as follows:

// get BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
  tableEnv.toDataSet(table, tupleType);
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

Mapping of Data Types to Table Schema

Flink's DataStream and DataSet APIs support very diverse types. Composite types such as Tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink's Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.

The mapping of a data type to a table schema can happen in two ways: based on the field positions or based on the field names.

Position-based Mapping

Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types with a defined field order as well as atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, the fields of a POJO must be mapped based on the field names (see the next section).

When defining a position-based mapping, the specified names must not exist in the input data type, otherwise the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used (or f0 for atomic types).

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)

Name-based Mapping

Name-based mapping can be used for any data type, including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can possibly be renamed using an alias (as). Fields can be reordered and projected out.

If no field names are specified, the default field names and field order of the composite type are used (or f0 for atomic types).

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");

// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)

Atomic Types

Flink treats primitives (Integer, Double, String) and generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream or DataSet of an atomic type is converted into a Table with a single attribute. The type of the attribute is inferred from the atomic type, and the name of the attribute can be specified.

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Long> stream = ...

// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[Long] = ...

// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuples (Scala and Java) and Case Classes (Scala only)

Flink supports Scala's built-in tuples and provides its own tuple classes for Java. DataStreams and DataSets of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (f0, f1, ... for Flink Tuples and _1, _2, ... for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with aliases (as).

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// convert DataStream into Table with default field names "f0", "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");

// convert DataStream into Table with reordered fields "f1", "f0" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1, f0");

// convert DataStream into Table with projected field "f1" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1");

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names 'myLong, 'myString (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with reordered fields '_2, '_1 (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with projected field '_2 (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with reordered and aliased fields 'myString, 'myLong (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

// convert DataStream into Table with reordered and aliased fields 'myAge, 'myName (name-based)
val table: Table = tableEnv.fromDataStream(streamCC, 'age as 'myAge, 'name as 'myName)

POJOs (Java and Scala)

Flink supports POJOs as composite types. The rules for what determines a POJO are documented here.

When converting a POJO DataStream or DataSet into a Table without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by position. Fields can be renamed using an alias (with the as keyword), reordered, and projected.

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Row

The Row data type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a Row DataStream or DataSet into a Table. The Row type supports mapping of fields by position and by name. Fields can be renamed by providing names for all fields (mapping based on position) or selected individually for projection/ordering/renaming (mapping based on name).

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...

// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Query Optimization

Apache Flink leverages Apache Calcite to optimize and translate queries. The optimization currently performed includes projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. Flink does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of tables in the FROM clause and/or order of join predicates in the WHERE clause).

The set of optimization rules which are applied in the different phases can be adjusted by providing a CalciteConfig object. It is created via a builder by calling CalciteConfig.createBuilder() and is provided to the TableEnvironment by calling tableEnv.getConfig.setCalciteConfig(calciteConfig), as sketched below.
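
A minimal sketch in Java (the builder is used with its defaults here; its methods for replacing or extending individual rule sets are omitted, and the Java getter syntax tableEnv.getConfig() is assumed):

// create a CalciteConfig via its builder; call the builder's rule-set
// methods before build() to customize the optimization phases
CalciteConfig calciteConfig = CalciteConfig.createBuilder().build();

// provide the CalciteConfig to the TableEnvironment
tableEnv.getConfig().setCalciteConfig(calciteConfig);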

Explaining a Table

The Table API provides a mechanism to explain the logical and optimized query plans to compute a Table. This is done through the TableEnvironment.explain(table) method. It returns a String describing three plans:

  1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
  2. the optimized logical query plan, and
  3. the physical execution plan.

The following code shows an example and the corresponding output:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, 'F%')])
    LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])

== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
    DataStreamScan(table=[[_DataStreamTable_0]])
  DataStreamScan(table=[[_DataStreamTable_1]])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Data Source
  content : collect elements with CollectionInputFormat

  Stage 3 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE

  Stage 4 : Operator
    content : where: (LIKE(word, 'F%')), select: (count, word)
    ship_strategy : FORWARD

    Stage 5 : Operator
      content : from: (count, word)
      ship_strategy : REBALANCE
