返回介绍

Hadoop 兼容性测试版

发布于 2025-05-02 18:19:15 字数 9143 浏览 0 评论 0 收藏

Flink 与 Apache Hadoop MapReduce 接口兼容,因此允许重用为 Hadoop MapReduce 实现的代码。

您可以:

本文档展示了如何在 Flink 中使用现有的 Hadoop MapReduce 代码。有关从 Hadoop 支持的文件系统中读取的信息,请参阅“ 连接到其他系统” 指南。

项目配置

Hadoop 的输入/输出格式的支持是的一部分 flink-javaflink-scala 写入 Flink 作业时总是需要的 Maven 模块。的代码位于 org.apache.flink.api.java.hadooporg.apache.flink.api.scala.hadoop 在附加的子包的 mapredmapreduce API。

flink-hadoop-compatibility Maven 模块中包含对 Hadoop Mappers 和 Reducers 的支持。此代码驻留在 org.apache.flink.hadoopcompatibility 包中。

pom.xml 如果要重用 Mappers 和 Reducers,请将以下依赖项添加到您的。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>

使用 Hadoop 数据类型

Flink 支持开箱即用的所有 Hadoop WritableWritableComparable 数据类型。如果您只想使用 Hadoop 数据类型,则不需要包含 Hadoop 兼容性依赖项。有关详细信息,请参阅 编程指南

使用 Hadoop InputFormats

要使用的 Hadoop InputFormats 与 Flink 格式必须首先使用任一包裹 readHadoopFilecreateHadoopInput 在的 HadoopInputs utilty 类。前者用于从 FileInputFormat 后者输出的输入格式,而后者必须用于通用输入格式。结果 InputFormat 可用于通过使用创建数据源 ExecutionEnvironmen#createInput

结果 DataSet 包含 2 元组,其中第一个字段是键,第二个字段是从 Hadoop InputFormat 检索的值。

以下示例显示了如何使用 Hadoop TextInputFormat

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<LongWritable, Text>> input =
  env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
            LongWritable.class, Text.class, textPath));

// Do something with the data.
[...]
val env = ExecutionEnvironment.getExecutionEnvironment

val input: DataSet[(LongWritable, Text)] =
  env.createInput(HadoopInputs.readHadoopFile(
          new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))

// Do something with the data. [...]

使用 Hadoop OutputFormats

Flink 为 Hadoop 提供了兼容性打包器 OutputFormats 。支持实现 org.apache.hadoop.mapred.OutputFormat 或扩展的 任何类 org.apache.hadoop.mapreduce.OutputFormat 。OutputFormat 打包器期望其输入数据是包含 2 元组键和值的 DataSet。这些将由 Hadoop OutputFormat 处理。

以下示例显示了如何使用 Hadoop TextOutputFormat

// Obtain the result we want to emit
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  // create the Flink wrapper.
  new HadoopOutputFormat<Text, IntWritable>(
  // set the Hadoop OutputFormat and specify the job.
  new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF);
// Obtain your result to emit. val hadoopResult: DataSet[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
  new TextOutputFormat[Text, IntWritable],
  new JobConf)

hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

hadoopResult.output(hadoopOF)

使用 Hadoop Mappers 和 Reducers

Hadoop Mappers 在语义上等同于 Flink 的 FlatMapFunctions, 而 Hadoop Reducers 等同于 Flink 的 GroupReduceFunctions 。Flink 为 Hadoop MapReduce MapperReducer 接口的实现提供打包器,即,您可以在常规 Flink 程序中重用您的 Hadoop Mappers 和 Reducers。目前,只 org.apache.hadoop.mapred 支持 Hadoop 的 mapred API()的 Mapper 和 Reduce 接口。

打包器将 DataSet&lt;Tuple2&lt;KEYIN,VALUEIN&gt;&gt; 输入作为输入并生成 DataSet&lt;Tuple2&lt;KEYOUT,VALUEOUT&gt;&gt; 输出,其中 KEYINKEYOUT 是键, VALUEIN 并且 VALUEOUT 是 Hadoop 函数处理的 Hadoop 键值对的值。对于 Reducers,Flink 为 GroupReduceFunction 提供了一个打包器( HadoopReduceCombineFunction ),没有 Combiner( HadoopReduceFunction )。打包器接受一个可选 JobConf 对象来配置 Hadoop Mapper 或 Reducer。

Flink 的函数打包器是

  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction ,和
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction

并可用作常规 Flink FlatMapFunctionsGroupReduceFunctions

以下示例显示了如何使用 Hadoop MapperReducer 函数。

// Obtain data to process somehow.
DataSet<Tuple2<Text, LongWritable>> text = [...]

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
  new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
  new Counter(), new Counter()
  ));

请注意: Reducer 打包器适用于 Flink 的 groupBy() 算子操作定义的组。它不考虑您可能在中设置的任何自定义分区器,排序或分组比较器 JobConf

完成 Hadoop WordCount 示例

以下示例显示了使用 Hadoop 数据类型,Input-和 OutputFormats 以及 Mapper 和 Reducer 实现的完整 WordCount 实现。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  new HadoopInputFormat<LongWritable, Text>(
  new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
  new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
  new Counter(), new Counter()
  ));

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  new HadoopOutputFormat<Text, IntWritable>(
  new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);

// Execute Program
env.execute("Hadoop WordCount");

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。