返回介绍

连接到外部系统

发布于 2025-05-02 18:19:15 字数 34272 浏览 0 评论 0 收藏

Flink 的 Table API 和 SQL 程序可以连接到其他外部系统,以便读取和写入批处理表和流表。表源提供对存储在外部系统(例如数据库,键值存储,消息队列或文件系统)中的数据的访问。表接收器向外部存储系统发出表。根据源和接收器的类型,它们支持不同的格式,如 CSV,Parquet 或 ORC。

本页介绍如何声明内置表源和/或表接收器,并在 Flink 中注册它们。注册源或接收器后,可以通过 Table API 和 SQL 语句访问它。

注意如果要实现自己的 自定义 表源或接收器,请查看 用户定义的源和接收器页面 。

依赖

下表列出了所有可用的连接器和格式。它们的相互兼容性在 表连接器表格格式 的相应部分中标记。下表提供了使用构建自动化工具(如 Maven 或 SBT)和带有 SQL JAR 包的 SQL Client 的两个项目的依赖关系信息。

此表仅适用于稳定版本。

概览

从 Flink 1.6 开始,与外部系统的连接声明与实际实现分开。

也可以指定连接

  • 以编程方式 使用 Descriptor under org.apache.flink.table.descriptors for Table&SQL API
  • 声明性地 通过 SQL 客户端的 YAML 配置文件

这不仅可以更好地统一 API 和 SQL Client,还可以在 自定义实现的 情况下 实现 更好的可扩展性,而无需更改实际声明。

每个声明都类似于 SQL CREATE TABLE 语句。可以预先定义表的名称,表的模式,连接器以及用于连接到外部系统的数据格式。

连接器 描述了存储表的数据的外部系统。可以在此处声明 Apacha Kafka 或常规文件系统等存储系统。连接器可能已经提供了具有字段和架构的固定格式。

某些系统支持不同的 数据格式 。例如,存储在 Kafka 或文件中的表可以使用 CSV,JSON 或 Avro 对其行进行编码。数据库连接器可能需要此处的表模式。每个 连接器 都记录了存储系统是否需要定义格式。不同的系统还需要不同 类型的格式 (例如,面向列的格式与面向行的格式)。该文档说明了哪些格式类型和连接器兼容。

表架构 定义了暴露在 SQL 查询表的架构。它描述了源如何将数据格式映射到表模式,反之亦然。模式可以访问连接器或格式定义的字段。它可以使用一个或多个字段来提取或插入 时间属性 。如果输入字段没有确定的字段顺序,则架构清楚地定义列名称,它们的顺序和原点。

后续部分将更详细地介绍每个定义部分( 连接器格式架构 )。以下示例显示了如何传递它们:

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .registerTableSource("MyTable")
name: MyTable
type: source
update-mode: append
connector: ...
format: ...
schema: ...

表的类型( sourcesinkboth )确定表的注册方式。在表类型的情况下,表 both 源和表接收器都以相同的名称注册。从逻辑上讲,这意味着我们可以读取和写入这样的表,类似于常规 DBMS 中的表。

对于流式查询, 更新模式 声明如何在动态表和存储系统之间进行通信以进行连续查询。

以下代码显示了如何连接到 Kafka 以读取 Avro 记录的完整示例。

tableEnvironment
  // declare the external system to connect to
  .connect(
  new Kafka()
    .version("0.10")
    .topic("test-input")
    .startFromEarliest()
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
  )

  // declare a format for this system
  .withFormat(
  new Avro()
    .avroSchema(
    "{" +
    "  \"namespace\": \"org.myorganization\"," +
    "  \"type\": \"record\"," +
    "  \"name\": \"UserMessage\"," +
    "  \"fields\": [" +
    "    {\"name\": \"timestamp\", \"type\": \"string\"}," +
    "    {\"name\": \"user\", \"type\": \"long\"}," +
    "    {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
    "  ]" +
    "}" +
    )
  )

  // declare the schema of the table
  .withSchema(
  new Schema()
    .field("rowtime", Types.SQL_TIMESTAMP)
    .rowtime(new Rowtime()
      .timestampsFromField("ts")
      .watermarksPeriodicBounded(60000)
    )
    .field("user", Types.LONG)
    .field("message", Types.STRING)
  )

  // specify the update-mode for streaming tables
  .inAppendMode()

  // register as source, sink, or both and under a name
  .registerTableSource("MyUserTable");
tables:
  - name: MyUserTable    # name the new table
  type: source       # declare if the table should be "source", "sink", or "both"
  update-mode: append  # specify the update-mode for streaming tables

  # declare the external system to connect to
  connector:
    type: kafka
    version: "0.10"
    topic: test-input
    startup-mode: earliest-offset
    properties:
    - key: zookeeper.connect
      value: localhost:2181
    - key: bootstrap.servers
      value: localhost:9092

  # declare a format for this system
  format:
    type: avro
    avro-schema: >
    {
      "namespace": "org.myorganization",
      "type": "record",
      "name": "UserMessage",
      "fields": [
        {"name": "ts", "type": "string"},
        {"name": "user", "type": "long"},
        {"name": "message", "type": ["string", "null"]}
      ]
    }

  # declare the schema of the table
  schema:
    - name: rowtime
    type: TIMESTAMP
    rowtime:
      timestamps:
      type: from-field
      from: ts
      watermarks:
      type: periodic-bounded
      delay: "60000"
    - name: user
    type: BIGINT
    - name: message
    type: VARCHAR

在两种方式中,所需的连接属性都转换为规范化的,基于字符串的键值对。所谓的 表工厂 从键值对创建配置的表源,表接收器和相应的格式。在搜索完全匹配的表工厂时,会考虑通过 Java 服务提供程序接口(SPI) 找到的所有表工厂。

如果找不到工厂或多个工厂匹配给定的属性,则会抛出一个异常,其中包含有关已考虑的工厂和支持的属性的其他信息。

表格式

表模式定义类的名称和类型,类似于 SQL CREATE TABLE 语句的列定义。另外,可以指定列的表示方式和表格数据编码格式的字段。如果列的名称应与输入/输出格式不同,则字段的来源可能很重要。例如,列 user_name 应从 $$-user-name JSON 格式引用该字段。此外,需要架构将类型从外部系统映射到 Flink 的表示。对于表接收器,它确保仅将具有有效模式的数据写入外部系统。

以下示例显示了一个没有时间属性的简单模式,以及输入/输出到表列的一对一字段映射。

.withSchema(
  new Schema()
  .field("MyField1", Types.INT)   // required: specify the fields of the table (in this order)
  .field("MyField2", Types.STRING)
  .field("MyField3", Types.BOOLEAN)
)
schema:
  - name: MyField1  # required: specify the fields of the table (in this order)
  type: INT
  - name: MyField2
  type: VARCHAR
  - name: MyField3
  type: BOOLEAN

对于 每个字段 ,除了列的名称和类型之外,还可以声明以下属性:

.withSchema(
  new Schema()
  .field("MyField1", Types.SQL_TIMESTAMP)
    .proctime()    // optional: declares this field as a processing-time attribute
  .field("MyField2", Types.SQL_TIMESTAMP)
    .rowtime(...)  // optional: declares this field as a event-time attribute
  .field("MyField3", Types.BOOLEAN)
    .from("mf3")   // optional: original field in the input that is referenced/aliased by this field
)
schema:
  - name: MyField1
  type: TIMESTAMP
  proctime: true  # optional: boolean flag whether this field should be a processing-time attribute
  - name: MyField2
  type: TIMESTAMP
  rowtime: ...    # optional: wether this field should be a event-time attribute
  - name: MyField3
  type: BOOLEAN
  from: mf3     # optional: original field in the input that is referenced/aliased by this field

使用无界流表时,时间属性是必不可少的。因此,处理时间和事件时间(也称为“行时”)属性都可以定义为模式的一部分。

有关 Flink 中时间处理的更多信息,特别是事件时间,我们建议使用常规 事件时间部分

行时属性

为了控制表的事件时间行为,Flink 提供了预定义的时间戳提取器和水印策略。

支持以下时间戳提取器:

// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
  .timestampsFromField("ts_field")  // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute 
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
  .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
  .timestampsFromExtractor(...)
)
# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

rowtime:
  timestamps:
  type: from-field
  from: "ts_field"         # required: original field name in the input

# Converts the assigned timestamps from a DataStream API record into the rowtime attribute 

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

# and thus preserves the assigned timestamps from the source.

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

rowtime:
  timestamps:
  type: from-source

支持以下水印策略:

// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum 
// observed timestamp so far minus 1\. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
  .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
  .watermarksPeriodicBounded(2000)  // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
  .watermarksFromSource()
)
# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum 

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

# observed timestamp so far minus 1\. Rows that have a timestamp equal to the max timestamp

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

# are not late.

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

rowtime:
  watermarks:
  type: periodic-ascending

# Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

# Emits watermarks which are the maximum observed timestamp minus the specified delay.

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

rowtime:
  watermarks:
  type: periodic-bounded
  delay: ...        # required: delay in milliseconds

# Sets a built-in watermark strategy which indicates the watermarks should be preserved from the

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

# underlying DataStream API and thus preserves the assigned watermarks from the source.

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)

rowtime:
  watermarks:
  type: from-source

确保始终声明时间戳和水印。触发基于时间的 算子操作需要水印。

类型字符串

由于类型信息仅在编程语言中可用,因此支持在 YAML 文件中定义以下类型字符串:

VARCHAR
BOOLEAN
TINYINT
SMALLINT
INT
BIGINT
FLOAT
DOUBLE
DECIMAL
DATE
TIME
TIMESTAMP
ROW(fieldtype, ...)        # unnamed row; e.g. ROW(VARCHAR, INT) that is mapped to Flink's RowTypeInfo
                 # with indexed fields names f0, f1, ...
ROW(fieldname fieldtype, ...)  # named row; e.g., ROW(myField VARCHAR, myOtherField INT) that
                 # is mapped to Flink's RowTypeInfo
POJO(class)            # e.g., POJO(org.mycompany.MyPojoClass) that is mapped to Flink's PojoTypeInfo
ANY(class)             # e.g., ANY(org.mycompany.MyClass) that is mapped to Flink's GenericTypeInfo
ANY(class, serialized)       # used for type information that is not supported by Flink's Table & SQL API

更新模式

对于流式查询,需要声明如何 在动态表和外部连接器之间 执行 转换 。的 更新模式 指定哪些类型的消息应与外部系统进行交换:

追加模式: 在追加模式下,动态表和外部连接器仅交换 INSERT 消息。

回退模式: 在回退模式下,动态表和外部连接器交换 ADD 和 RETRACT 消息。INSERT 更改被编码为 ADD 消息,DELETE 更改被编码为 RETRACT 消息,UPDATE 更改被编码为更新(先前)行的 RETRACT 消息和更新(新)行的 ADD 消息。在此模式下,不能定义键而不是 upsert 模式。但是,每次更新都包含两个效率较低的消息。

Upsert 模式: 在 upsert 模式下,动态表和外部连接器交换 UPSERT 和 DELETE 消息。此模式需要一个(可能是复合的)唯一键,通过该键可以传播更新。外部连接器需要知道唯一键属性才能正确应用消息。INSERT 和 UPDATE 更改被编码为 UPSERT 消息。DELETE 更改为 DELETE 消息。与收回流的主要区别在于 UPDATE 更改使用单个消息进行编码,因此更有效。

注意每个连接器的文档都说明支持哪些更新模式。

.connect(...)
  .inAppendMode()  // otherwise: inUpsertMode() or inRetractMode()
tables:
  - name: ...
  update-mode: append  # otherwise: "retract" or "upsert"

有关更多信息,另请参阅 常规流概念文档

表连接器

Flink 提供了一组用于连接外部系统的连接器。

请注意,并非所有连接器都可用于批量和流式传输。此外,并非每个流连接器都支持每种流模式。因此,相应地标记每个连接器。格式标记表示连接器需要特定类型的格式。

文件系统连接器

来源:批量 来源:流处理附加模式 接收器:批量 接收器:流式附加模式 格式:仅限 CSV

文件系统连接器允许从本地或分布式文件系统进行读写。文件系统可以定义为:

.connect(
  new FileSystem()
  .path("file:///path/to/whatever")  // required: path to a file or directory
)
connector:
  type: filesystem
  path: "file:///path/to/whatever"  # required: path to a file or directory

文件系统连接器本身包含在 Flink 中,不需要额外的依赖项。需要指定相应的格式,以便从文件系统读取和写入行。

注意确保包含 特定于 Flink 文件系统的依赖项

注意用于流式传输的文件系统源和接收器仅是实验性的。将来,我们将支持实际的流式使用案例,即目录监控和桶输出。

Kafka 连接器

来源:流附加模式 接收器:流附加模式 格式:序列化模式 格式:反序列化模式

Kafka 连接器允许从 Apache Kafka 主题读取和写入。它可以定义如下:

.connect(
  new Kafka()
  .version("0.11")  // required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
  .topic("...")     // required: topic name from which the table is read

  // optional: connector specific properties
  .property("zookeeper.connect", "localhost:2181")
  .property("bootstrap.servers", "localhost:9092")
  .property("group.id", "testGroup")

  // optional: select a startup mode for Kafka offsets
  .startFromEarliest()
  .startFromLatest()
  .startFromSpecificOffsets(...)

  // optional: output partitioning from Flink's partitions into Kafka's partitions
  .sinkPartitionerFixed()     // each Flink partition ends up in at-most one Kafka partition (default)
  .sinkPartitionerRoundRobin()  // a Flink partition is distributed to Kafka partitions round-robin
  .sinkPartitionerCustom(MyCustom.class)  // use a custom FlinkKafkaPartitioner subclass
)
connector:
  type: kafka
  version: 0.11     # required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
  topic: ...      # required: topic name from which the table is read

  properties:     # optional: connector specific properties
  - key: zookeeper.connect
    value: localhost:2181
  - key: bootstrap.servers
    value: localhost:9092
  - key: group.id
    value: testGroup

  startup-mode: ...   # optional: valid modes are "earliest-offset", "latest-offset",
            # "group-offsets", or "specific-offsets"
  specific-offsets:   # optional: used in case of startup mode with specific offsets
  - partition: 0
    offset: 42
  - partition: 1
    offset: 300

  sink-partitioner: ...  # optional: output partitioning from Flink's partitions into Kafka's partitions
               # valid are "fixed" (each Flink partition ends up in at most one Kafka partition),
               # "round-robin" (a Flink partition is distributed to Kafka partitions round-robin)
               # "custom" (use a custom FlinkKafkaPartitioner subclass)
  sink-partitioner-class: org.mycompany.MyPartitioner  # optional: used in case of sink partitioner custom

指定开始读取位置: 默认情况下,Kafka 源将开始从 Zookeeper 或 Kafka 代理中的已提交组偏移量中读取数据。您可以指定其他起始位置,这些位置对应于 Kafka 消费者起始位置配置 部分中的 配置

Flink-Kafka 接收器分区: 默认情况下, Kafka 接收 器最多写入与其自身并行性一样多的分区(接收器的每个并行实例仅写入一个分区)。为了将写入分发到更多分区或控制行到分区的路由,可以提供自定义接收器分区器。循环分区器可用于避免不平衡的分区。但是,它会在所有 Flink 实例和所有 Kafka 代理之间产生大量网络连接。

一致性保证: 默认情况下,如果在 启用检查点的 情况下执行查询,则 Kafka 接收器会使用至少一次保证将数据提取到 Kafka 主题中。

Kafka 0.10+时间戳: 自 Kafka 0.10 起,Kafka 消息的时间戳作为元数据,指定记录何时写入 Kafka 主题。通过分别在 YAML 和 Java / Scala 中选择,可以将这些时间戳用于 行时属性timestamps: from-source``timestampsFromSource()

确保添加特定于版本的 Kafka 依赖项。此外,需要指定相应的格式以便从 Kafka 读取和写入行。

表格格式

Flink 提供了一组可与表连接器一起使用的表格格式。

格式标记表示与连接器匹配的格式类型。

CSV 格式

CSV 格式允许读取和写入以逗号分隔的行。

.withFormat(
  new Csv()
  .field("field1", Types.STRING)  // required: ordered format fields
  .field("field2", Types.TIMESTAMP)
  .fieldDelimiter(",")        // optional: string delimiter "," by default 
  .lineDelimiter("\n")        // optional: string delimiter "\n" by default 
  .quoteCharacter('"')        // optional: single character for string values, empty by default
  .commentPrefix('#')         // optional: string to indicate comments, empty by default
  .ignoreFirstLine()        // optional: ignore the first line, by default it is not skipped
  .ignoreParseErrors()        // optional: skip records with parse error instead of failing by default
)
format:
  type: csv
  fields:          # required: ordered format fields
  - name: field1
    type: VARCHAR
  - name: field2
    type: TIMESTAMP
  field-delimiter: ","     # optional: string delimiter "," by default 
  line-delimiter: "\n"     # optional: string delimiter "\n" by default 
  quote-character: '"'     # optional: single character for string values, empty by default
  comment-prefix: '#'    # optional: string to indicate comments, empty by default
  ignore-first-line: false   # optional: boolean flag to ignore the first line, by default it is not skipped
  ignore-parse-errors: true  # optional: skip records with parse error instead of failing by default

CSV 格式包含在 Flink 中,不需要其他依赖项。

注意目前写入行的 CSV 格式有限。仅支持自定义字段分隔符作为可选参数。

JSON 格式

格式:序列化架构 格式:反序列化架构

JSON 格式允许读取和写入与给定格式模式相对应的 JSON 数据。格式模式可以定义为 Flink 类型,JSON 模式,也可以从所需的表模式派生。Flink 类型支持更类似 SQL 的定义并映射到相应的 SQL 数据类型。JSON 模式允许更复杂和嵌套的结构。

如果格式架构等于表架构,则还可以自动派生架构。这允许仅定义一次架构信息。格式的名称,类型和字段顺序由表的架构决定。如果时间属性的来源不是字段,则会忽略它们。一个 from 表中的模式定义解释为格式字段命名。

.withFormat(
  new Json()
  .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

  // required: define the schema either by using type information which parses numbers to corresponding types
  .schema(Type.ROW(...))

  // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
  .jsonSchema(
    "{" +
    "  type: 'object'," +
    "  properties: {" +
    "  lon: {" +
    "    type: 'number'" +
    "  }," +
    "  rideTime: {" +
    "    type: 'string'," +
    "    format: 'date-time'" +
    "  }" +
    "  }" +
    "}"
  )

  // or use the table's schema
  .deriveSchema()
)
format:
  type: json
  fail-on-missing-field: true   # optional: flag whether to fail if a field is missing or not, false by default

  # required: define the schema either by using a type string which parses numbers to corresponding types
  schema: "ROW(lon  FLOAT,  rideTime  TIMESTAMP)"

  # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
  json-schema: >
  {
    type: 'object',
    properties: {
    lon: {
      type: 'number'
    },
    rideTime: {
      type: 'string',
      format: 'date-time'
    }
    }
  }

  # or use the table's schema
  derive-schema: true

下表显示了 JSON 模式类型到 Flink SQL 类型的映射:

JSON 模式Flink SQL
objectROW
booleanBOOLEAN
arrayARRAY[_]
numberDECIMAL
integerDECIMAL
stringVARCHAR
string with format: date-timeTIMESTAMP
string with format: dateDATE
string with format: timeTIME
string with encoding: base64ARRAY[TINYINT]
nullNULL (尚不支持)

目前,Flink 仅支持 JSON 模式规范 的子集 draft-07 。Union 类型(以及 allOfanyOfnot )尚未支持。 oneOf 和类型数组仅支持指定可为空性。

支持链接到文档中的通用定义的简单引用,如下面更复杂的示例所示:

{  "definitions":  {  "address":  {  "type":  "object",  "properties":  {  "street_address":  {  "type":  "string"  },  "city":  {  "type":  "string"  },  "state":  {  "type":  "string"  }  },  "required":  [  "street_address",  "city",  "state"  ]  }  },  "type":  "object",  "properties":  {  "billing_address":  {  "$ref":  "#/definitions/address"  },  "shipping_address":  {  "$ref":  "#/definitions/address"  },  "optional_address":  {  "oneOf":  [  {  "type":  "null"  },  {  "$ref":  "#/definitions/address"  }  ]  }  }  }

缺少字段处理: 默认情况下,缺少的 JSON 字段设置为 null 。您可以启用严格的 JSON 解析,如果缺少字段,将取消源(和查询)。

确保将 JSON 格式添加为依赖项。

Apache Avro 格式

格式:序列化架构 格式:反序列化架构

Apache 的 Avro 的 格式允许读取和写入对应于给定的格式模式 Avro 的数据。格式模式可以定义为 Avro 特定记录的完全限定类名,也可以定义为 Avro 架构字符串。如果使用类名,则在运行时期间类必须在类路径中可用。

.withFormat(
  new Avro()

  // required: define the schema either by using an Avro specific record class
  .recordClass(User.class)

  // or by using an Avro schema
  .avroSchema(
    "{" +
    "  \"type\": \"record\"," +
    "  \"name\": \"test\"," +
    "  \"fields\" : [" +
    "  {\"name\": \"a\", \"type\": \"long\"}," +
    "  {\"name\": \"b\", \"type\": \"string\"}" +
    "  ]" +
    "}"
  )
)
format:
  type: avro

  # required: define the schema either by using an Avro specific record class
  record-class: "org.organization.types.User"

  # or by using an Avro schema
  avro-schema: >
  {
    "type": "record",
    "name": "test",
    "fields" : [
    {"name": "a", "type": "long"},
    {"name": "b", "type": "string"}
    ]
  }

Avro 类型映射到相应的 SQL 数据类型。仅支持联合类型以指定可为空性,否则它们将转换为 ANY 类型。下表显示了映射:

Avro 架构Flink SQL
recordROW
enumVARCHAR
arrayARRAY[_]
mapMAP[VARCHAR, _]
union非 null 类型或 ANY
fixedARRAY[TINYINT]
stringVARCHAR
bytesARRAY[TINYINT]
intINT
longBIGINT
floatFLOAT
doubleDOUBLE
booleanBOOLEAN
int with logicalType: dateDATE
int with logicalType: time-millisTIME
int with logicalType: time-microsINT
long with logicalType: timestamp-millisTIMESTAMP
long with logicalType: timestamp-microsBIGINT
bytes with logicalType: decimalDECIMAL
fixed with logicalType: decimalDECIMAL
nullNULL (尚不支持)

Avro 使用 Joda-Time 来表示特定记录类中的逻辑日期和时间类型。Joda-Time 依赖不是 Flink 分发的一部分。因此,请确保 Joda-Time 在运行时期间与您的特定记录类一起位于类路径中。通过模式字符串指定的 Avro 格式不需要存在 Joda-Time。

确保添加 Apache Avro 依赖项。

进一步的 TableSources 和 TableSinks

尚未将以下表源和接收器迁移(或尚未完全迁移)到新的统一接口。

这些是 TableSource Flink 提供的附加函数:

班级名称Maven 依赖批量?流?描述
OrcTableSourceflink-orcYN一个 TableSource ORC 文件。

这些是 TableSink Flink 提供的附加函数:

班级名称Maven 依赖批量?流?描述
CsvTableSinkflink-tableY附加CSV 文件的简单接收器。
JDBCAppendTableSinkflink-jdbcY附加将表写入 JDBC 表。
CassandraAppendTableSinkflink-connector-cassandraN附加将表写入 Cassandra 表。

OrcTableSource

OrcTableSource 读取 ORC 文件 。ORC 是结构化数据的文件格式,并以压缩的柱状表示形式存储数据。ORC 非常高效,支持 Projection 和滤波器下推。

一个 OrcTableSource 被创建,如下所示:

// create Hadoop Configuration
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
  // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  .path("file:///path/to/data")
  // schema of ORC files
  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  // Hadoop configuration
  .withConfiguration(config)
  // build OrcTableSource
  .build();
// create Hadoop Configuration val config = new Configuration()

val orcTableSource = OrcTableSource.builder()
  // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  .path("file:///path/to/data")
  // schema of ORC files
  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  // Hadoop configuration
  .withConfiguration(config)
  // build OrcTableSource
  .build()

注:OrcTableSource 不支持 ORC 的 Union 类型呢。

CsvTableSink

CsvTableSink 发出 Table 一个或一个以上的 CSV 文件。

接收器仅支持仅附加流表。它不能用于发出 Table 不断更新的内容。有关详细信息,请参阅 表到流转换文档 。发出流表时,行至少写入一次(如果启用了检查点), CsvTableSink 并且不将输出文件拆分为存储桶文件,而是连续写入相同的文件。

Table table = ...

table.writeToSink(
  new CsvTableSink(
  path,          // output path 
  "|",           // optional: delimit files by '|'
  1,           // optional: write to a single file
  WriteMode.OVERWRITE)); // optional: override existing files
val table: Table = ???

table.writeToSink(
  new CsvTableSink(
  path,               // output path
  fieldDelim = "|",         // optional: delimit files by '|'
  numFiles = 1,           // optional: write to a single file
  writeMode = WriteMode.OVERWRITE)) // optional: override existing files

JDBCAppendTableSink

JDBCAppendTableSink 发出 Table 一个 JDBC 连接。接收器仅支持仅附加流表。它不能用于发出 Table 不断更新的内容。有关详细信息,请参阅 表到流转换文档

所述 JDBCAppendTableSink 插入物每 Table 行至少一次到数据库表(如果启用了检查点)。但是,您可以使用 REPLACE 或指定插入查询来指定 INSERT OVERWRITE 对数据库的写入。

要使用 JDBC 接收器,必须将 JDBC 连接器依赖项( flink-jdbc )添加到项目中。然后您可以使用 JDBCAppendSinkBuilder 以下方法创建接收器

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build();

Table table = ...
table.writeToSink(sink);
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build()

val table: Table = ???
table.writeToSink(sink)

与 using 类似 JDBCOutputFormat ,您必须显式指定 JDBC 驱动程序的名称,JDBC URL,要执行的查询以及 JDBC 表的字段类型。

CassandraAppendTableSink

CassandraAppendTableSink 发射 Table 到卡桑德拉表。接收器仅支持仅附加流表。它不能用于发出 Table 不断更新的内容。有关详细信息,请参阅 表到流转换文档

CassandraAppendTableSink 插入所有行至少一次到 Cassandra 的表,如果检查点已启用。但是,您可以将查询指定为 upsert 查询。

要使用 CassandraAppendTableSink ,必须将 Cassandra 连接器依赖项( flink-connector-cassandra )添加到项目中。以下示例显示了如何使用 CassandraAppendTableSink

ClusterBuilder builder = ... // configure Cassandra cluster connection

CassandraAppendTableSink sink = new CassandraAppendTableSink(
  builder,
  // the query must match the schema of the table
  INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?));

Table table = ...
table.writeToSink(sink);
val builder: ClusterBuilder = ... // configure Cassandra cluster connection 
val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
  builder,
  // the query must match the schema of the table
  INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?))

val table: Table = ???
table.writeToSink(sink)

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。