返回介绍

用户定义的源和接收器

发布于 2025-05-02 18:19:16 字数 24093 浏览 0 评论 0 收藏 0

A TableSource 提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问。 TableSourceTableEnvironment 中注册后 ,可以通过 Table API 或 SQL 查询访问它。

A TableSink 将表 发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Parquet 或 ORC)。

A TableFactory 允许将与外部系统的连接声明与实际实现分开。表工厂从规范化的基于字符串的属性创建表源和接收器的已配置实例。可以使用 SQL 客户端 Descriptor 或通过 YAML 配置文件以编程方式生成属性。

有关如何 注册 TableSource 以及如何 通过 TableSink 发出表的 详细信息,请查看 常见概念和 API 页面。有关如何使用工厂的示例 , 请参阅 内置源,接收器和格式 页面。

定义 TableSource

A TableSource 是一个通用接口,它使 Table API 和 SQL 查询可以访问存储在外部系统中的数据。它提供了表的模式以及使用表的模式映射到行的记录。根据 TableSource 是在流式查询还是批量查询中使用,记录将生成为 DataSetDataStream

如果 TableSource 在流式查询中使用 a,则必须实现该 StreamTableSource 接口,如果在批处理查询中使用它,则必须实现该 BatchTableSource 接口。A TableSource 还可以实现两个接口,并用于流式和批量查询。

StreamTableSourceBatchTableSource 扩展 TableSource 定义以下方法的基接口:

TableSource<T> {

  public TableSchema getTableSchema();

  public TypeInformation<T> getReturnType();

  public String explainSource();
}
TableSource[T] {

  def getTableSchema: TableSchema

  def getReturnType: TypeInformation[T]

  def explainSource: String

}
  • getTableSchema() :返回表的架构,即表的字段的名称和类型。字段类型使用 Flink 定义 TypeInformation (请参阅 Table API 类型SQL 类型 )。
  • getReturnType() :返回 DataStreamStreamTableSource )或 DataSetBatchTableSource )的物理类型以及由此生成的记录 TableSource
  • explainSource() :返回描述 TableSource 。的 String 。此方法是可选的,仅用于显示目的。

所述 TableSource 界面分离从物理类型的返回的逻辑表模式 DataStreamDataSet 。因此,表 schema( getTableSchema() )的所有字段都必须映射到具有相应类型的物理返回类型( getReturnType() )的字段。默认情况下,此映射基于字段名称完成。例如, TableSource 定义具有两个字段的表模式的 a [name: String, size: Integer] 要求 TypeInformation 具有至少两个被调用的字段 namesize 类型 String 和的字段 Integer 。这可能是一个 PojoTypeInfoRowTypeInfo 有两个名为领域 name ,并 size 与匹配的类型。

但是,某些类型(如 Tuple 或 CaseClass 类型)确实支持自定义字段名称。如果 a TableSource 返回 a DataStreamDataSet 具有固定字段名称的类型,则它可以实现 DefinedFieldMapping 接口以将字段名称从表模式映射到物理返回类型的字段名称。

定义 BatchTableSource

BatchTableSource 接口扩展了 TableSource 接口,并定义一个额外的方法:

BatchTableSource<T> extends TableSource<T> {

  public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
}
BatchTableSource[T] extends TableSource[T] {

  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
  • getDataSet(execEnv) :返回 DataSet 带有表数据的 a 。类型 DataSet 必须与 TableSource.getReturnType() 方法定义的返回类型相同。在 DataSet 通过使用常规创建罐 数据源 的数据集的 API。通常,a BatchTableSource 通过打包 InputFormat批处理连接器来实现

定义 StreamTableSource

StreamTableSource 接口扩展了 TableSource 接口,并定义一个额外的方法:

StreamTableSource<T> extends TableSource<T> {

  public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
StreamTableSource[T] extends TableSource[T] {

  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
  • getDataStream(execEnv) :返回 DataStream 带有表数据的 a 。类型 DataStream 必须与 TableSource.getReturnType() 方法定义的返回类型相同。在 DataStream 通过使用常规罐创建 数据源 数据流的 API。通常,a StreamTableSource 通过打包 SourceFunction流连接器来实现

使用时间属性定义 TableSource

Table APISQL 查询的基于时间的 算子操作(例如窗口化聚合或连接)需要明确指定的 时间属性

A TableSource 将时间属性定义 Types.SQL_TIMESTAMP 为其表模式中的类型字段。与模式中的所有常规字段相比,时间属性不能与表源的返回类型中的物理字段匹配。相反,a TableSource 通过实现某个接口来定义时间属性。

定义处理时间属性

处理时间属性 通常用于流式查询。处理时间属性返回访问它的算子的当前挂钟时间。A TableSource 通过实现 DefinedProctimeAttribute 接口来定义处理时间属性。界面如下:

DefinedProctimeAttribute {

  public String getProctimeAttribute();
}
DefinedProctimeAttribute {

  def getProctimeAttribute: String
}
  • getProctimeAttribute() :返回处理时间属性的名称。必须 Types.SQL_TIMESTAMP 在表模式中定义指定的属性类型,并且可以在基于时间的 算子操作中使用该属性。一个 DefinedProctimeAttribute 表源可以通过返回没有定义的处理时间属性 null

注意两者 StreamTableSourceBatchTableSource 可以实现 DefinedProctimeAttribute 并定义的处理时间属性。在 BatchTableSource 处理时间字段的情况下,在表扫描期间使用当前时间戳初始化字段。

定义行时属性

Rowtime 属性 是类型的属性, TIMESTAMP 并在流和批处理查询中以统一的方式处理。

SQL_TIMESTAMP 可以通过指定将类型的表模式字段声明为 rowtime 属性

  • 该字段的名称,
  • a TimestampExtractor 计算属性的实际值(通常来自一个或多个其他字段),和
  • a WatermarkStrategy 指定如何为 rowtime 属性生成水印。

A TableSource 通过实现 DefinedRowtimeAttributes 接口来定义行时属性。界面如下:

DefinedRowtimeAttribute {

  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
DefinedRowtimeAttributes {

  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
  • getRowtimeAttributeDescriptors() :返回列表 RowtimeAttributeDescriptor 。A RowtimeAttributeDescriptor 描述了具有以下属性的 rowtime 属性:
    • attributeName :表架构中 rowtime 属性的名称。必须使用类型定义该字段 Types.SQL_TIMESTAMP
    • timestampExtractor :时间戳提取器从具有返回类型的记录中提取时间戳。例如,它可以将 Long 字段转换为时间戳或解析字符串编码的时间戳。Flink 附带了一组 TimestampExtractor 针对常见用例的内置实现。还可以提供自定义实现。
    • watermarkStrategy :水印策略定义如何为 rowtime 属性生成水印。Flink 附带了一组 WatermarkStrategy 针对常见用例的内置实现。还可以提供自定义实现。

注意虽然该 getRowtimeAttributeDescriptors() 方法返回描述符列表,但目前仅支持单个 rowtime 属性。我们计划在将来删除此限制,并支持具有多个 rowtime 属性的表。

注意两者, StreamTableSource 并且 BatchTableSource ,可以实现 DefinedRowtimeAttributes 并定义 rowtime 属性。在任何一种情况下,都使用提取行时字段 TimestampExtractor 。因此, TableSource 实现 StreamTableSourceBatchTableSource 定义行时属性的实现为流和批处理查询提供完全相同的数据。

提供时间戳提取器

Flink 提供 TimestampExtractor 了常见用例的实现。

TimestampExtractor 目前提供以下实现:

  • ExistingField(fieldName) :提取一个 rowtime 属性的从现有的值 LONGSQL_TIMESTAMP 或时间戳记格式化 STRING 字段。这种字符串的一个例子是'2018-05-28 12:34:56.000'。
  • StreamRecordTimestamp() :从时间戳中提取 rowtime 属性的值 DataStream StreamRecord 。请注意,这 TimestampExtractor 不适用于批处理表源。

TimestampExtractor 可以通过实现相应的接口来定义自定义。

提供水印策略

Flink 提供 WatermarkStrategy 了常见用例的实现。

WatermarkStrategy 目前提供以下实现:

  • AscendingTimestamps :用于提升时间戳的水印策略。带有无序时间戳的记录将被视为迟到。
  • BoundedOutOfOrderTimestamps(delay) :时间戳的水印策略,指定的延迟最多是无序的。
  • PreserveWatermarks() :一种策略,指示应从底层保存水印 DataStream

WatermarkStrategy 可以通过实现相应的接口来定义自定义。

使用 Projection Push-Down 定义 TableSource

A TableSource 通过实现 ProjectableTableSource 接口支持 Projection 下推。该接口定义了一个方法:

ProjectableTableSource<T> {

  public TableSource<T> projectFields(int[] fields);
}
ProjectableTableSource[T] {

  def projectFields(fields: Array[Int]): TableSource[T]
}
  • projectFields(fields) :返回一个 副本 的的 TableSource 与调整身体返回类型。该 fields 参数提供必须由提供的字段的索引 TableSource 。索引与 TypeInformation 物理返回类型有关, 而 与逻辑表模式 无关 。复制 TableSource 必须调整其返回类型和返回的 DataStreamDataSet 。在 TableSchema 复制的 TableSource 不能改变,即它必须跟原来一样 TableSource 。如果 TableSource 实现 DefinedFieldMapping 接口,则必须将字段映射调整为新的返回类型。

ProjectableTableSource 增加支持项目平场。如果 TableSource 使用嵌套模式定义表,则可以实现 NestedFieldsProjectableTableSource 将 Projection 扩展到嵌套字段。的 NestedFieldsProjectableTableSource 定义如下:

NestedFieldsProjectableTableSource<T> {

  public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
}
NestedFieldsProjectableTableSource[T] {

  def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
  • projectNestedField(fields, nestedFields) :返回一个 副本 的的 TableSource 与调整身体返回类型。可以删除或重新排序物理返回类型的字段,但不得更改其类型。该方法的合同与该方法基本相同 ProjectableTableSource.projectFields() 。此外,该 nestedFields 参数包含 fields 列表中每个字段索引,该列表指向查询访问的所有嵌套字段的路径。所有其他嵌套字段不需要在由 TableSource 。生成的记录中读取,解析和设置。 重要信息 不得更改 Projection 字段的类型,但可以将未使用的字段设置为空或默认值。

使用过滤器下推定义 TableSource

FilterableTableSource 界面增加了对过滤器下推的支持 TableSource 。一个 TableSource 扩展这个接口能够过滤记录,从而使返回 DataStream 或者 DataSet 返回较少的记录。

界面如下:

FilterableTableSource<T> {

  public TableSource<T> applyPredicate(List<Expression> predicates);

  public boolean isFilterPushedDown();
}
FilterableTableSource[T] {

  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]

  def isFilterPushedDown: Boolean
}
  • applyPredicate(predicates) :返回一个 副本 的 TableSource 添加了谓词。该 predicates 参数是一个可变的联合谓词列表,它被“提供”给 TableSource 。在 TableSource 受理用户从列表中删除它来评估一个谓语。列表中剩余的谓词将由后续过滤器 算子进行评估。
  • isFilterPushedDown() :如果 applyPredicate() 之前调用该方法,则返回 true 。因此, isFilterPushedDown() 必须对 TableSourceapplyPredicate() 调用返回的所有实例返回 true 。

定义 TableSink

A TableSink 指定如何向 Table 外部系统或位置发射。该接口是通用的,因此它可以支持不同的存储位置和格式。批处理表和流表有不同的表接收器。

一般界面如下所示:

TableSink<T> {

  public TypeInformation<T> getOutputType();

  public String[] getFieldNames();

  public TypeInformation[] getFieldTypes();

  public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
}
TableSink[T] {

  def getOutputType: TypeInformation<T>

  def getFieldNames: Array[String]

  def getFieldTypes: Array[TypeInformation]

  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
}

TableSink#configure 调用该方法以传递表的模式(字段名称和类型)以发送到 TableSink 。该方法必须返回 TableSink 的新实例,该实例配置为发出提供的表模式。

BatchTableSink

定义外部 TableSink 以发出批处理表。

界面如下:

BatchTableSink<T> extends TableSink<T> {

  public void emitDataSet(DataSet<T> dataSet);
}
BatchTableSink[T] extends TableSink[T] {

  def emitDataSet(dataSet: DataSet[T]): Unit
}

AppendStreamTableSink

定义外部 TableSink 以发出仅具有插入更改的流表。

界面如下:

AppendStreamTableSink<T> extends TableSink<T> {

  public void emitDataStream(DataStream<T> dataStream);
}
AppendStreamTableSink[T] extends TableSink[T] {

  def emitDataStream(dataStream: DataStream<T>): Unit
}

如果还通过更新或删除更改来修改表, TableException 则将抛出 a。

RetractStreamTableSink

定义外部 TableSink 以发出包含插入,更新和删除更改的流表。

界面如下:

RetractStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {

  public TypeInformation<T> getRecordType();

  public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

  def getRecordType: TypeInformation[T]

  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}

该表将被转换为累积和撤销消息流,这些消息被编码为 Java Tuple2 。第一个字段是一个布尔标志,用于指示消息类型( true 表示插入, false 表示删除)。第二个字段保存所请求类型的记录 T

UpsertStreamTableSink

定义外部 TableSink 以发出包含插入,更新和删除更改的流表。

界面如下:

UpsertStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {

  public void setKeyFields(String[] keys);

  public void setIsAppendOnly(boolean isAppendOnly);

  public TypeInformation<T> getRecordType();

  public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

  def setKeyFields(keys: Array[String]): Unit

  def setIsAppendOnly(isAppendOnly: Boolean): Unit

  def getRecordType: TypeInformation[T]

  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}

该表必须具有唯一的键字段(原子或复合)或仅附加。如果表没有唯一键并且不是仅附加, TableException 则将抛出 a。表的唯一键由该 UpsertStreamTableSink#setKeyFields() 方法配置。

该表将被转换为 upsert 和 delete 消息流,这些消息被编码为 Java Tuple2 。第一个字段是一个布尔标志,用于指示消息类型。第二个字段保存所请求类型的记录 T

具有 true 布尔字段的消息是已配置 Keys 的 upsert 消息。带有错误标志的消息是已配置 Keys 的删除消息。如果表是仅附加的,则所有消息都将具有 true 标志,并且必须解释为插入。

定义 TableFactory

A TableFactory 允许从基于字符串的属性创建不同的表相关实例。调用所有可用工厂以匹配给定的属性集和相应的工厂类。

工厂利用 Java 服务提供者接口(SPI) 进行发现。这意味着每个依赖项和 JAR 文件都应该 org.apache.flink.table.factories.TableFactoryMETA_INF/services 资源目录中包含一个文件,该文件列出了它提供的所有可用表工厂。

每个表工厂都需要实现以下接口:

package org.apache.flink.table.factories;

interface TableFactory {

  Map<String, String> requiredContext();

  List<String> supportedProperties();
}
package org.apache.flink.table.factories

trait TableFactory {

  def requiredContext(): util.Map[String, String]

  def supportedProperties(): util.List[String]
}
  • requiredContext() :指定已为此工厂实现的上下文。如果满足指定的属性和值集,框架保证仅匹配此工厂。典型属性可能是 connector.typeformat.typeupdate-mode 。诸如 connector.property-version 和的属性键 format.property-version 保存用于将来的向后兼容性情况。
  • supportedProperties :此工厂可以处理的属性键列表。此方法将用于验证。如果传递了该工厂无法处理的属性,则会抛出异常。该列表不得包含上下文指定的键。

为了创建特定实例,工厂类可以实现以下提供的一个或多个接口 org.apache.flink.table.factories

  • BatchTableSourceFactory :创建批处理表源。
  • BatchTableSinkFactory :创建批处理表接收器。
  • StreamTableSoureFactory :创建流表源。
  • StreamTableSinkFactory :创建流表接收器。
  • DeserializationSchemaFactory :创建反序列化架构格式。
  • SerializationSchemaFactory :创建序列化架构格式。

工厂的发现发生在多个阶段:

  • 发现所有可用的工厂。
  • 按工厂类别过滤(例如 StreamTableSourceFactory )。
  • 通过匹配上下文过滤。
  • 按支持的属性过滤。
  • 验证一个工厂是否匹配,否则抛出一个 AmbiguousTableFactoryExceptionNoMatchingTableFactoryException

以下示例显示如何为参数化提供带有附加 connector.debug 属性标志的自定义流式源。

import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MySystemTableSourceFactory extends StreamTableSourceFactory<Row> {

  @Override
  public Map<String, String> requiredContext() {
  Map<String, String> context = new HashMap<>();
  context.put("update-mode", "append");
  context.put("connector.type", "my-system");
  return context;
  }

  @Override
  public List<String> supportedProperties() {
  List<String> list = new ArrayList<>();
  list.add("connector.debug");
  return list;
  }

  @Override
  public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
  boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));

  # additional validation of the passed properties can also happen here

  return new MySystemAppendTableSource(isDebug);
  }
}
import java.util
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {

  override def requiredContext(): util.Map[String, String] = {
  val context = new util.HashMap[String, String]()
  context.put("update-mode", "append")
  context.put("connector.type", "my-system")
  context
  }

  override def supportedProperties(): util.List[String] = {
  val properties = new util.ArrayList[String]()
  properties.add("connector.debug")
  properties
  }

  override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
  val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))

  # additional validation of the passed properties can also happen here

  new MySystemAppendTableSource(isDebug)
  }
}

在 SQL 客户端中使用 TableFactory

在 SQL 客户端环境文件中,先前显示的工厂可以声明为:

tables:
 - name: MySystemTable
   type: source
   update-mode: append
   connector:
   type: my-system
   debug: true

YAML 文件被转换为扁平字符串属性,并使用描述与外部系统的连接的属性调用表工厂:

update-mode=append
connector.type=my-system
connector.debug=true

注意属性,例如 tables.#.name 或是 tables.#.type SQL 客户端细节,不会传递给任何工厂。该 type 属性决定,取决于运行环境,无论是 BatchTableSourceFactory / StreamTableSourceFactory (对 source ),一个 BatchTableSinkFactory / StreamTableSinkFactory (对于 sink ),或两者(对 both )需要发现。

在 Table&SQL API 中使用 TableFactory

对于具有解释性 Scaladoc / Javadoc 的类型安全的编程方法,Table&SQL API 提供 org.apache.flink.table.descriptors 了转换为基于字符串的属性的描述符。请参阅源,接收器和格式的 内置描述符 作为参考。

MySystem 我们示例中的连接器可以扩展 ConnectorDescriptor ,如下所示:

import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;

/**
  * Connector to MySystem with debug mode.
  */
public class MySystemConnector extends ConnectorDescriptor {

  public final boolean isDebug;

  public MySystemConnector(boolean isDebug) {
  super("my-system", 1, false);
  this.isDebug = isDebug;
  }

  @Override
  public void addConnectorProperties(DescriptorProperties properties) {
  properties.putString("connector.debug", Boolean.toString(isDebug));
  }
}
import org.apache.flink.table.descriptors.ConnectorDescriptor
import org.apache.flink.table.descriptors.DescriptorProperties

/**
  * Connector to MySystem with debug mode.
  */
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, formatNeeded = false) {

  override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
  properties.putString("connector.debug", isDebug.toString)
  }
}

然后可以在 API 中使用描述符,如下所示:

StreamTableEnvironment tableEnv = // ...

tableEnv
  .connect(new MySystemConnector(true))
  .inAppendMode()
  .registerTableSource("MySystemTable");
val tableEnv: StreamTableEnvironment = // ... 
tableEnv
  .connect(new MySystemConnector(isDebug = true))
  .inAppendMode()
  .registerTableSource("MySystemTable")

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。