返回介绍

流处理概念

发布于 2025-05-02 18:19:15 字数 20774 浏览 0 评论 0 收藏

Flink 的 Table API 和 SQL 支持 是用于批处理和流处理的统一 API。这意味着 Table API 和 SQL 查询具有相同的语义,无论它们的输入是有界批量输入还是无界流输入。因为关系代数和 SQL 最初是为批处理而设计的,所以关于无界流输入的关系查询不像有界批输入上的关系查询那样容易理解。

在此页面上,我们将解释 Flink 关于流数据的关系 API 的概念,实际限制和特定于流的配置参数。

数据流上的关系查询

SQL 和关系代数的设计并未考虑流数据。因此,关系代数(和 SQL)与流处理之间的概念差距很小。

关系代数/ SQL流处理
关系(或表)是有界(多)元组的集合。流是无限的元组序列。
对批处理数据执行的查询(例如,关系数据库中的表)可以访问完整的输入数据。流式查询在启动时无法访问所有数据,必须“等待”流式传输数据。
批处理查询在生成固定大小的结果后终止。流式查询会根据收到的记录不断更新其结果,并且永远不会完成。

尽管存在这些差异,但使用关系查询和 SQL 处理流并非不可能。高级关系数据库系统提供称为 物化视图 的函数。物化视图定义为 SQL 查询,就像常规虚拟视图一样。与虚拟视图相比,物化视图缓存查询的结果,使得在访问视图时不需要评估查询。缓存的一个常见挑战是阻止缓存提供过时的结果。实例化视图在修改其定义查询的基表时会过时。 Eager View Maintenance 是一种更新 实体 化视图并在其基表更新后立即更新 实体化视图 的技术。

如果我们考虑以下内容,急切的视图维护和流上的 SQL 查询之间的联系就变得很明显:

  • 数据库表是一个结果 流 的 INSERTUPDATEDELETE DML 语句,通常被称为 更新日志流 。
  • 物化视图定义为 SQL 查询。为了更新视图,查询将持续处理视图基本关系的更改日志流。
  • 物化视图是流式 SQL 查询的结果。

考虑到这些要点,我们将在下一节介绍 Flink 的 动态表 概念。

动态表和连续查询

动态表 是 Flink 的 Table API 和 SQL 支持流数据的核心概念。与表示批处理数据的静态表相比,动态表随时间而变化。可以像静态批处理表一样查询它们。查询动态表会产生 连续查询 。连续查询永远不会终止并生成动态表作为结果。查询不断更新其(动态)结果表以反映其输入(动态)表的更改。实质上,对动态表的连续查询与物化视图的定义查询非常相似。

值得注意的是,连续查询的结果始终在语义上等同于在输入表的 SNAPSHOT 上以批处理模式执行的相同查询的结果。

下图显示了流,动态表和连续查询的关系:

![动态表格](img/stream-query-stream.png)

  1. 流转换为动态表。
  2. 在动态表上评估连续查询,生成新的动态表。
  3. 生成的动态表将转换回流。

注意: 动态表首先是一个逻辑概念。在查询执行期间,动态表不一定(完全)实现。

在下文中,我们将使用具有以下模式的单击事件流来解释动态表和连续查询的概念:

[ 
  user:  VARCHAR,   // the name of the user
  cTime: TIMESTAMP, // the time when the URL was accessed
  url:   VARCHAR  // the URL that was accessed by the user
]

在流上定义表

为了使用关系查询处理流,必须将其转换为 Table 。从概念上讲,流的每个记录都被解释为 INSERT 对结果表的修改。基本上,我们正在从一个 INSERT -only changelog 流构建一个表。

下图显示了 click 事件流(左侧)如何转换为表(右侧)。随着更多点击流记录的插入,生成的表不断增长。

![追加模式](img/append-mode.png)

注意: 在流上定义的表在内部未实现。

连续查询

在动态表上计算连续查询,并生成新的动态表作为结果。与批处理查询相反,连续查询永远不会根据其输入表上的更新终止并更新其结果表。在任何时间点,连续查询的结果在语义上等同于在输入表的 SNAPSHOT 上以批处理模式执行的相同查询的结果。

在下文中,我们 clicks 将在对单击事件流定义的表上显示两个示例查询。

第一个查询是一个简单的 GROUP-BY COUNT 聚合查询。这组 clicks 对表 user 字段和计数访问的网址的数量。下图显示了在 clicks 使用其他行更新表时,如何评估查询。

![连续非窗口查询](img/table-streaming/query-groupBy-cnt.png)

查询启动时, clicks 表(左侧)为空。当第一行插入表中时,查询开始计算结果 clicks 表。 [Mary, ./home] Insert 第一行后,结果表(右侧,顶部)由一行组成 [Mary, 1] 。当第二行 [Bob, ./cart] Insert clicks 表中时,查询将更新结果表并插入新行 [Bob, 1] 。第三行 [Mary, ./prod?id=1] 产生已更新的已计算结果行的 [Mary, 1] 更新 [Mary, 2] 。最后, [Liz, 1] 当第四行附加到 clicks 表时,查询将第三行插入到结果表中。

第二个查询类似于第一个查询,但 clicks 除了 user 属性之外还在 每小时滚动窗口 上对表进行分组,然后计算 URL 的数量(基于时间的计算,例如窗口基于特殊 时间属性 ,这将在下面讨论) )。同样,该图显示了不同时间点的输入和输出,以显示动态表的变化性质。

![连续组窗口查询](img/table-streaming/query-groupBy-window-cnt.png)

和以前一样,输入表 clicks 显示在左侧。查询每小时连续计算结果并更新结果表。点击表包含四行,时间戳( cTime )位于 12:00:00 和之间 12:59:59 。查询从此输入计算两个结果行(每个一行 user )并将它们附加到结果表。对于 13:00:00 和之间的下一个窗口 13:59:59 ,该 clicks 表包含三行,这导致另外两行被追加到结果表中。结果表已更新, clicks 随着时间的推移会附加更多行。

更新并附加查询

尽管两个示例查询看起来非常相似(都计算了分组计数聚合),但它们在一个重要方面有所不同:

  • 第一个查询更新以前发出的结果,即定义结果表的更改日志流包含 INSERTUPDATE 更改。
  • 第二个查询仅附加到结果表,即结果表的更改日志流仅包含 INSERT 更改。

查询是生成仅附加表还是更新表有一些含义:

  • 产生更新更改的查询通常必须保持更多状态(请参阅下一节)。
  • 将仅附加表转换为流与更新表的转换不同(请参阅 表到流转换 部分)。

查询限制

可以将许多(但不是全部)语义上有效的查询评估为对流的连续查询。有些查询的计算成本太高,或者是由于需要维护的状态大小,或者计算更新太昂贵。

  • 状态大小: 连续查询在无界流上进行评估,通常应该运行数周或数月。因此,连续查询处理的数据总量可能非常大。必须更新先前发出的结果的查询需要维护所有发出的行,以便能够更新它们。例如,第一个示例查询需要存储每个用户的 URL 计数,以便能够增加计数,并在输入表收到新行时发送新结果。如果仅跟踪注册用户,则要维护的计数可能不会太高。但是,如果未注册的用户分配了唯一的用户名,则要维护的计数数将随着时间的推移而增长,并最终可能导致查询失败。
SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
  • 计算更新: 即使只添加或更新了单个输入记录,某些查询也需要重新计算和更新大部分发出的结果行。显然,这样的查询不适合作为连续查询执行。一个示例是以下查询,该查询 RANK 基于最后一次点击的时间计算每个用户 a 。一旦 clicks 表格收到新行, lastAction 就会更新用户,并且必须计算新的排名。但是,由于两行不能具有相同的等级,因此所有排名较低的行也需要更新。
SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
  SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

所述 QueryConfig 节讨论的参数来控制的连续查询的执行。一些参数可用于交换维持状态的大小以获得结果准确性。

表到流转换

动态表可以通过不断修改 INSERTUPDATE 以及 DELETE 变化就像一个普通的数据库表。它可能是一个包含单行的表,它不断更新,只有一个插入表,no / not UPDATEDELETE 修改,或者介于两者之间。

将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink 的 Table API 和 SQL 支持三种编码动态表更改的方法:

  • 仅附加流: 只能通过 INSERT 更改修改的动态表可以通过发出插入的行转换为流。
  • 撤消 流: 撤消流是具有两种类型的消息的流, 添加消息 和 撤消消息 。通过将 INSERT 更改编码为添加消息,将 DELETE 更改编码为收回消息,将 UPDATE 更改编码为更新(先前)行的收回消息和更新(新)行的添加消息,将动态表转换为收回流。下图显示了动态表到回收流的转换。

![动态表格](img/table-streaming/undo-redo-mode.png)

  • Upsert 流: upsert 流是一种包含两种消息, upsert 消息 和 删除消息的流 。转换为 upsert 流的动态表需要(可能是复合的)唯一键。具有唯一键的动态表通过编码转换为动态表, INSERTUPDATE 更改为 upsert 消息并 DELETE 更改为删除消息。流消耗 算子需要知道唯一键属性才能正确应用消息。与收回流的主要区别在于, UPDATE 使用单个消息对更改进行编码,因此更有效。下图显示了动态表到 upsert 流的转换。

![动态表格](img/table-streaming/redo-mode.png)

DataStreamCommon Concepts 页面上讨论了将动态表转换为 a 的 API 。请注意,将动态表格转换为 a 时,仅支持附加和撤消流 DataStreamTableSources 和 TableSinks 页面 TableSink 讨论了向外部系统发出动态表的接口。

时间属性

Flink 能够根据不同的 时间 概念处理流数据。

  • 处理时间 是指执行相应 算子操作的机器的系统时间(也称为“挂钟时间”)。
  • 事件时间 是指基于附加到每一行的时间戳来处理流数据。时间戳可以在事件发生时进行编码。
  • 摄取时间 是事件进入 Flink 的时间; 在内部,它与事件时间类似。

有关 Flink 中时间处理的更多信息,请参阅有关 事件时间和水印的简介

表程序要求为流环境指定相应的时间特性:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default 
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Table APISQL 中 基于时间的 算子操作(如窗口)需要有关时间概念及其来源的信息。因此,表可以提供 逻辑时间属性, 用于指示时间和访问表程序中的相应时间戳。

时间属性可以是每个表模式的一部分。它们是在从 a 创建表时定义的, DataStream 或在使用时预定义的 TableSource 。一旦在开头定义了时间属性,它就可以作为字段引用,并可以用于基于时间的 算子操作。

只要时间属性未被修改并且只是从查询的一部分转发到另一部分,它仍然是有效的时间属性。时间属性的行为类似于常规时间戳,可以访问以进行计算。如果在计算中使用时间属性,则它将具体化并成为常规时间戳。常规时间戳不与 Flink 的时间和水印系统配合,因此不能再用于基于时间的 算子操作。

处理时间

处理时间允许表程序根据本地机器的时间产生结果。这是最简单的时间概念,但不提供决定论。它既不需要时间戳提取也不需要水印生成。

有两种方法可以定义处理时间属性。

在 DataStream 到表转换期间

处理时间属性是 .proctime 在架构定义期间使用属性定义的。time 属性必须仅通过附加逻辑字段扩展物理模式。因此,它只能在模式定义的末尾定义。

DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
val stream: DataStream[(String, String)] = ...

// declare an additional logical field as a processing time attribute val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

使用 TableSource

处理时间属性由 TableSource 实现 DefinedProctimeAttribute 接口的 a 定义。逻辑时间属性附加到由返回类型定义的物理模式 TableSource

// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

  @Override
  public TypeInformation<Row> getReturnType() {
    String[] names = new String[] {"Username" , "Data"};
    TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // create stream 
    DataStream<Row> stream = ...;
    return stream;
  }

  @Override
  public String getProctimeAttribute() {
    // field with this name will be appended as a third field 
    return "UserActionTime";
  }
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// define a table source with a processing attribute class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

  override def getReturnType = {
    val names = Array[String]("Username" , "Data")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    Types.ROW(names, types)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // create stream     val stream = ...
    stream
  }

  override def getProctimeAttribute = {
    // field with this name will be appended as a third field     "UserActionTime"
  }
}

// register table source tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

活动时间

事件时间允许表程序根据每个记录中包含的时间生成结果。即使在无序事件或延迟事件的情况下,这也允许一致的结果。当从持久存储中读取记录时,它还确保表程序的可重放结果。

此外,事件时间允许批处理和流式环境中的表程序的统一语法。流式传输环境中的时间属性可以是批处理环境中的记录的常规字段。

为了处理乱序事件并区分流处理中的准时和迟到事件,Flink 需要从事件中提取时间戳并在时间上做出某种进展(所谓的 水印 )。

可以在 DataStream-to-Table 转换期间或使用 TableSource 定义事件时间属性。

在 DataStream 到表转换期间

.rowtime 在架构定义期间使用属性定义事件时间属性。必须在转换后分配 时间戳和水印 DataStream

将 a 转换 DataStream 为 a 时,有两种定义时间属性的方法 Table 。根据指定的 .rowtime 字段名称是否存在于模式中 DataStream ,时间戳字段也是

  • 作为新架构附加到架构或
  • 替换现有字段。

在任何一种情况下,事件时间时间戳字段都将保存事件时间时间戳的值 DataStream

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");

// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// Option 1: 
// extract timestamp and assign watermarks based on knowledge of the stream val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// declare an additional logical field as an event time attribute val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)

// Option 2: 
// extract timestamp from first field, and assign watermarks based on knowledge of the stream val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)

// Usage: 
val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

使用 TableSource

事件时间属性由 TableSource 实现 DefinedRowtimeAttribute 接口的 a 定义。该 getRowtimeAttribute() 方法返回一个现有字段的名称,该字段包含表的事件时间属性,并且是类型 LONGTIMESTAMP

此外,方法 DataStream 返回的 getDataStream() 必须分配与定义的时间属性对齐的水印。请注意,忽略( DataStream 由 a 分配的 TimestampAssigner )时间戳。只有 TableSource 's rowtime 属性的值是相关的。

// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {

  @Override
  public TypeInformation<Row> getReturnType() {
    String[] names = new String[] {"Username", "Data", "UserActionTime"};
    TypeInformation[] types =
      new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // create stream 
    // ...
    // assign watermarks based on the "UserActionTime" attribute
    DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
    return stream;
  }

  @Override
  public String getRowtimeAttribute() {
    // Mark the "UserActionTime" attribute as event-time attribute.
    return "UserActionTime";
  }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// define a table source with a rowtime attribute class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {

  override def getReturnType = {
    val names = Array[String]("Username" , "Data", "UserActionTime")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
    Types.ROW(names, types)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // create stream     // ...     // assign watermarks based on the "UserActionTime" attribute     val stream = inputStream.assignTimestampsAndWatermarks(...)
    stream
  }

  override def getRowtimeAttribute = {
    // Mark the "UserActionTime" attribute as event-time attribute.     "UserActionTime"
  }
}

// register the table source tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

查询配置

Table API 和 SQL 查询具有相同的语义,无论它们的输入是有界批量输入还是无界流输入。在许多情况下,对流输入的连续查询能够计算与离线计算结果相同的准确结果。然而,这在一般情况下是不可能的,因为连续查询必须限制它们维护的状态的大小,以避免耗尽存储并且能够在很长一段时间内处理无界流数据。因此,连续查询可能只能提供近似结果,具体取决于输入数据和查询本身的特征。

Flink 的 Table API 和 SQL 接口提供参数来调整连续查询的准确性和资源消耗。参数通过 QueryConfig 对象指定。在 QueryConfig 可以从获得 TableEnvironment 和被传递回一个时 Table 被转换,即,当它被 变换成数据流经由 TableSink 发射

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// obtain query configuration from TableEnvironment val qConfig: StreamQueryConfig = tableEnv.queryConfig
// set query parameters qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

// define query val result: Table = ???

// create TableSink val sink: TableSink[Row] = ???

// emit result Table via a TableSink result.writeToSink(sink, qConfig)

// convert result Table into a DataStream[Row] val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)

在下文中,我们将描述它们的参数 QueryConfig 以及它们如何影响查询的准确性和资源消耗。

空闲状态保存时间

许多查询在一个或多个键属性上聚合或连接记录。当在流上执行此类查询时,连续查询需要收集记录或维护每个键的部分结果。如果输入流的关键域正在发展,即活动键值随时间变化,则随着观察到越来越多的不同键,连续查询累积越来越多的状态。但是,经常在一段时间后 Keys 变为非活动状态,并且它们的相应状态变得陈旧且无用。

例如,以下查询计算每个会话的单击次数。

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

sessionId 属性用作分组键,连续查询维护 sessionId 其观察的每个键的计数。该 sessionId 属性随着时间的推移而发展,并且 sessionId 值仅在会话结束之前有效,即,在有限的时间段内。但是,连续查询无法知道此属性, sessionId 并期望每个 sessionId 值都可以在任何时间点发生。它维护每个观察 sessionId 值的计数。因此,随着 sessionId 观察到越来越多的值,查询的总状态大小不断增长。

在 空闲状态保持时间 参数定义多久一个键的状态得以保持,而它被删除之前被更新。对于前面的示例查询, sessionId 只要在配置的时间段内没有更新,就会删除 a 的计数。

通过删除键的状态,连续查询完全忘记它之前已经看过这个键。如果处理了具有其状态已被删除的 Keys 的记录,则该记录将被视为具有相应 Keys 的第一个记录。对于上面的例子,这意味着 a 的计数 sessionId 将再次开始 0

配置空闲状态保存时间有两个参数:

  • 最小空闲状态保持时间 定义多久它被删除之前非活动键的状态至少保持。
  • 在 最大空闲状态保持时间 定义多久它被删除前的非激活键的状态最多保持。

参数指定如下:

StreamQueryConfig qConfig = ...

// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
val qConfig: StreamQueryConfig = ???

// set idle state retention time: min = 12 hours, max = 24 hours qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

清理状态需要额外副本记录,而且记录使得 minTimemaxTime 不是那么浪费时间和空间。 minTime 和之间的差异 maxTime 必须至少为 5 分钟。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。