返回介绍

连接器

发布于 2025-05-02 18:19:15 字数 8169 浏览 0 评论 0 收藏

从文件系统中读取

Flink 内置支持以下文件系统:

文件系统方案笔记
Hadoop 分布式文件系统(HDFS)hdfs://支持所有 HDFS 版本
亚马逊 S3s3://通过 Hadoop 文件系统实现支持(见下文)
MapR 文件系统maprfs://用户必须手动将所需的 jar 文件放在 lib/ dir 中
Alluxioalluxio://通过 Hadoop 文件系统实现支持(见下文)

使用 Hadoop 文件系统实现

Apache Flink 允许用户使用任何实现该 org.apache.hadoop.fs.FileSystem 接口的文件系统。有 Hadoop FileSystem 实现

为了使用 Flink 的 Hadoop 文件系统,请确保

  • flink-conf.yaml 已设定的 fs.hdfs.hadoopconf 属性将 Hadoop 配置目录。对于自动测试或从 IDE 运行, flink-conf.yaml 可以通过定义 FLINK_CONF_DIR 环境变量来设置包含的目录。
  • Hadoop 配置(在该目录中)具有文件中所需文件系统的条目 core-site.xml 。S3 和 Alluxio 的示例链接/显示如下。
  • lib/ Flink 安装的文件夹中提供了使用文件系统所需的类(在运行 Flink 的所有计算机上)。如果无法将文件放入目录,Flink 还会尊重 HADOOP_CLASSPATH 环境变量以将 Hadoop jar 文件添加到类路径中。

亚马逊 S3

请参阅 部署和 算子操作 - 部署 - AWS - S3:简单存储服务, 以获取可用的 S3 文件系统实现,其配置和所需的库。

Alluxio

对于 Alluxio 支持,将以下条目添加到 core-site.xml 文件中:

<property>
  <name>fs.alluxio.impl</name>
  <value>alluxio.hadoop.FileSystem</value>
</property>

使用 Hadoop 的 Input / OutputFormat 打包器连接到其他系统

Apache Flink 允许用户访问许多不同的系统作为数据源或接收器。该系统的设计非常容易扩展。与 Apache Hadoop 类似,Flink 具有所谓的 InputFormat s 和 OutputFormat s 的概念。

这些 InputFormat 的一个实现是 HadoopInputFormat 。这是一个打包器,允许用户使用 Flink 的所有现有 Hadoop 输入格式。

本节介绍将 Flink 连接到其他系统的一些示例。 阅读有关 Flink 中 Hadoop 兼容性的更多信息

Avro 对 Flink 的支持

Flink 对 Apache Avro 提供了广泛的内置支持。这样可以使用 Flink 轻松读取 Avro 文件。此外,Flink 的序列化框架能够处理从 Avro 架构生成的类。确保将 Flink Avro 依赖项包含在项目的 pom.xml 中。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>

要从 Avro 文件中读取数据,您必须指定一个 AvroInputFormat

示例

AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);

请注意,这 User 是 Avro 生成的 POJO。Flink 还允许执行这些 POJO 的基于字符串的 Keys 选择。例如:

usersDS.groupBy("name")

请注意,使用 GenericData.Record Flink 可以使用该类型,但不建议使用。由于记录包含完整的模式,因此其数据密集,因此可能使用起来很慢。

Flink 的 POJO 字段选择也适用于 Avro 生成的 POJO。但是,只有在将字段类型正确写入生成的类时才可以使用。如果字段是类型 Object ,则不能将该字段用作连接或分组键。像这样在 Avro 中指定一个字段 {"name": "type_double_test", "type": "double"}, 工作正常,但是将其指定为只有一个字段( {"name": "type_double_test", "type": ["double"]}, )的 UNION 类型将生成一个类型的字段 Object 。请注意,指定可空类型( {"name": "type_double_test", "type": ["null", "double"]}, )是可能的!

访问 Microsoft Azure 表存储

注意:此示例适用于 Flink 0.6-incubating

此示例使用 HadoopInputFormat 打包器使用现有的 Hadoop 输入格式实现来访问 Azure 的表存储

  1. 下载并编译 azure-tables-hadoop 项目。该项目开发的输入格式尚未在 Maven Central 中提供,因此,我们必须自己构建项目。执行以下命令:
 git clone https://github.com/mooso/azure-tables-hadoop.git
   cd azure-tables-hadoop
   mvn clean install
  1. 使用快速入门设置新的 Flink 项目:
 curl https://flink.apache.org/q/quickstart.sh | bash
  1. 将以下依赖项(在本 &lt;dependencies&gt; 节中)添加到您的 pom.xml 文件中:
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-hadoop-compatibility_2.11</artifactId>
     <version>1.7-SNAPSHOT</version>
   </dependency>
   <dependency>
   <groupId>com.microsoft.hadoop</groupId>
   <artifactId>microsoft-hadoop-azure</artifactId>
   <version>0.0.4</version>
   </dependency>

flink-hadoop-compatibility 是一个 Flink 包,提供 Hadoop 输入格式打包器。 microsoft-hadoop-azure 将我们之前构建的项目添加到项目中。

该项目现在准备开始编码。我们建议将项目导入 IDE,例如 Eclipse 或 IntelliJ。(作为 Maven 项目导入!)。浏览到该 Job.java 文件的代码。它是 Flink 工作的空框架。

将以下代码粘贴到其中:

import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.microsoft.hadoop.azure.AzureTableConfiguration;
import com.microsoft.hadoop.azure.AzureTableInputFormat;
import com.microsoft.hadoop.azure.WritableEntity;
import com.microsoft.windowsazure.storage.table.EntityProperty;

public class AzureTableExample {

  public static void main(String[] args) throws Exception {
  // set up the execution environment
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // create a  AzureTableInputFormat, using a Hadoop input format wrapper
  HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());

  // set the Account URI, something like: https://apacheflink.table.core.windows.net
  hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
  // set the secret storage key here
  hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
  // set the table name here
  hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");

  DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
  // a little example how to use the data in a mapper.
  DataSet<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
    @Override
    public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
    System.err.println("--------------------------------\nKey = "+arg0.f0);
    WritableEntity we = arg0.f1;

    for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
      System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
    }

    return arg0.f0.toString();
    }
  });

  // emit result (this works only locally)
  fin.print();

  // execute program
  env.execute("Azure Example");
  }
}

该示例显示了如何访问 Azure 表并将数据转换为 Flink DataSet (更具体地说,是集合的类型 DataSet&lt;Tuple2&lt;Text, WritableEntity&gt;&gt; )。使用 DataSet ,您可以将所有已知的转换应用于 DataSet。

访问 MongoDB

这个 GitHub 存储库记录了如何将 MongoDB 与 Apache Flink 一起使用(从 0.7-incubating 开始)

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。