返回介绍

最佳实践

发布于 2025-05-02 18:19:18 字数 8928 浏览 0 评论 0 收藏 0

此页面包含 Flink 程序员关于如何解决经常遇到的问题的最佳实践的集合。

解析命令行参数并在 Flink 应用程序中传递它们

几乎所有 Flink 应用程序(批处理和流式处理)都依赖于外部配置参数。它们用于指定输入和输出源(如路径或地址),系统参数(并行性,运行时配置)和特定于应用程序的参数(通常在用户函数中使用)。

Flink 提供了一个简单的实用程序, ParameterTool 用于提供一些解决这些问题的基本工具。请注意,您不必使用 ParameterTool 此处描述的内容。其他框架(如 Commons CLIargparse4j) 也适用于 Flink。

获取配置值 ParameterTool

ParameterTool 提供了一组用于读取配置的预定义静态方法。该工具在内部期望 a Map<String, String> ,因此很容易将其与您自己的配置风格集成。

来自 .properties 文件

以下方法将读取 属性 文件并提供键/值对:

String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);

File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);

InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

从命令行参数

这允许 --input hdfs:///mydata --elements 42 从命令行获取参数。

public static void main(String[] args) {
  ParameterTool parameter = ParameterTool.fromArgs(args);
  // .. regular code ..

来自系统属性

启动 JVM 时,您可以将系统属性传递给它: -Dinput=hdfs:///mydata 。您还可以 ParameterTool 从这些系统属性初始化:

ParameterTool parameter = ParameterTool.fromSystemProperties();

使用 Flink 程序中的参数

现在我们已经从某个地方获得了参数(见上文),我们可以通过各种方式使用它们。

直接来自 ParameterTool

ParameterTool 本身有访问值的方法。

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.

您可以直接在 main() 提交应用程序的客户端的方法中使用这些方法的返回值。例如,您可以设置 算子的并行度,如下所示:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

由于 ParameterTool 可序列化,您可以将其传递给函数本身:

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内部使用它从命令行获取值。

全局注册参数

注册为全局作业参数的参数 ExecutionConfig 可以作为 JobManager Web 界面中的配置值以及用户定义的所有函数进行访问。

全局注册参数:

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

在任何丰富的用户函数中访问它们:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  ParameterTool parameters = (ParameterTool)
    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
  parameters.getRequired("input");
  // .. do more ..

命名大型 TupleX 类型

建议使用 POJO(普通旧 Java 对象),而不是 TupleX 使用包含许多字段的数据类型。此外,POJO 可用于为大型 Tuple 公式命名。

而不是使用:

Tuple11<String, String, ..., String> var = new ...;

创建从大型元组类型扩展的自定义类型要容易得多。

CustomType var = new ...;

public static class CustomType extends Tuple11<String, String, ..., String> {
  // constructor matching super
}

使用 Logback 而不是 Log4j

注意:本教程适用于从 Flink 0.10 开始

Apache Flink 使用 slf4j 作为代码中的日志记录抽象。建议用户在其用户函数中使用 sfl4j。

Sfl4j 是一个编译时日志记录接口,可以在运行时使用不同的日志记录实现,例如 log4jLogback

Flink 默认依赖于 Log4j。本页介绍如何使用 Flink 和 Logback。用户报告说他们也可以使用本教程使用 Graylog 设置集中式日志记录。

要在代码中获取记录器实例,请使用以下代码:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass implements MapFunction {
  private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
  // ...

从 IDE / Java 应用程序运行 Flink 时使用 Logback

在所有情况下,类都是由依赖管理器(如 Maven)创建的类路径执行的,Flink 会将 log4j 拉入类路径。

因此,您需要从 Flink 的依赖项中排除 log4j。以下描述将假设从 Flink 快速入门 创建的 Maven 项目。

pom.xml 像这样更改项目文件:

<dependencies>
  <!-- Add the two required logback dependencies -->
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.3</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.3</version>
  </dependency>

  <!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
   Hadoop is logging to log4j! -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.7</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7-SNAPSHOT</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7-SNAPSHOT</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.7-SNAPSHOT</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>

&lt;dependencies&gt; 部分进行了以下更改:

  • log4j 从所有 Flink 依赖项中排除所有依赖项:这会导致 Maven 忽略 Flink 对 log4j 的传递依赖项。
  • slf4j-log4j12 从 Flink 的依赖项中排除工件:由于我们将使用 slf4j 进行 logback 绑定,因此我们必须将 slf4j 删除为 log4j 绑定。
  • 添加 Logback 依赖项: logback-corelogback-classic
  • 添加依赖项 log4j-over-slf4jlog4j-over-slf4j 是一种工具,允许直接使用 Log4j API 的遗留应用程序使用 Slf4j 接口。Flink 依赖于 Hadoop,它直接使用 Log4j 进行日志记录。因此,我们需要将所有记录器调用从 Log4j 重定向到 Slf4j,后者又记录到 Logback。

请注意,您需要手动将排除项添加到要添加到 pom 文件的所有新 Flink 依赖项中。

您可能还需要检查其他(非 Flink)依赖项是否正在引入 log4j 绑定。您可以使用分析项目的依赖关系 mvn dependency:tree

在群集上运行 Flink 时使用 Logback

本教程适用于在 YARN 上运行 Flink 或作为独立群集。

要使用 Logback 而不是使用 Flink 的 Log4j,您需要从目录中删除 log4j-1.2.xx.jar 和。 sfl4j-log4j12-xxx.jar``lib/

接下来,您需要将以下 jar 文件放入该 lib/ 文件夹:

  • logback-classic.jar
  • logback-core.jar
  • log4j-over-slf4j.jar :此桥接器需要存在于类路径中,以便将日志记录调用从 Hadoop(使用 Log4j)重定向到 Slf4j。

请注意, lib/ 在使用每个作业的 YARN 群集时,需要显式设置目录。

使用自定义记录器在 YARN 上提交 Flink 的命令是: ./bin/flink run -yt $FLINK_HOME/lib &lt;... remaining arguments ...&gt;

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。