- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
最佳实践
此页面包含 Flink 程序员关于如何解决经常遇到的问题的最佳实践的集合。
解析命令行参数并在 Flink 应用程序中传递它们
几乎所有 Flink 应用程序(批处理和流式处理)都依赖于外部配置参数。它们用于指定输入和输出源(如路径或地址),系统参数(并行性,运行时配置)和特定于应用程序的参数(通常在用户函数中使用)。
Flink 提供了一个简单的实用程序, ParameterTool
用于提供一些解决这些问题的基本工具。请注意,您不必使用 ParameterTool
此处描述的内容。其他框架(如 Commons CLI 和 argparse4j) 也适用于 Flink。
获取配置值 ParameterTool
它 ParameterTool
提供了一组用于读取配置的预定义静态方法。该工具在内部期望 a Map<String, String>
,因此很容易将其与您自己的配置风格集成。
来自 .properties
文件
以下方法将读取 属性 文件并提供键/值对:
String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
从命令行参数
这允许 --input hdfs:///mydata --elements 42
从命令行获取参数。
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
来自系统属性
启动 JVM 时,您可以将系统属性传递给它: -Dinput=hdfs:///mydata
。您还可以 ParameterTool
从这些系统属性初始化:
ParameterTool parameter = ParameterTool.fromSystemProperties();
使用 Flink 程序中的参数
现在我们已经从某个地方获得了参数(见上文),我们可以通过各种方式使用它们。
直接来自 ParameterTool
它 ParameterTool
本身有访问值的方法。
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.
您可以直接在 main()
提交应用程序的客户端的方法中使用这些方法的返回值。例如,您可以设置 算子的并行度,如下所示:
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
由于 ParameterTool
可序列化,您可以将其传递给函数本身:
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
然后在函数内部使用它从命令行获取值。
全局注册参数
注册为全局作业参数的参数 ExecutionConfig
可以作为 JobManager Web 界面中的配置值以及用户定义的所有函数进行访问。
全局注册参数:
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
在任何丰富的用户函数中访问它们:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
命名大型 TupleX 类型
建议使用 POJO(普通旧 Java 对象),而不是 TupleX
使用包含许多字段的数据类型。此外,POJO 可用于为大型 Tuple
公式命名。
例
而不是使用:
Tuple11<String, String, ..., String> var = new ...;
创建从大型元组类型扩展的自定义类型要容易得多。
CustomType var = new ...;
public static class CustomType extends Tuple11<String, String, ..., String> {
// constructor matching super
}
使用 Logback 而不是 Log4j
注意:本教程适用于从 Flink 0.10 开始
Apache Flink 使用 slf4j 作为代码中的日志记录抽象。建议用户在其用户函数中使用 sfl4j。
Sfl4j 是一个编译时日志记录接口,可以在运行时使用不同的日志记录实现,例如 log4j 或 Logback 。
Flink 默认依赖于 Log4j。本页介绍如何使用 Flink 和 Logback。用户报告说他们也可以使用本教程使用 Graylog 设置集中式日志记录。
要在代码中获取记录器实例,请使用以下代码:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
从 IDE / Java 应用程序运行 Flink 时使用 Logback
在所有情况下,类都是由依赖管理器(如 Maven)创建的类路径执行的,Flink 会将 log4j 拉入类路径。
因此,您需要从 Flink 的依赖项中排除 log4j。以下描述将假设从 Flink 快速入门 创建的 Maven 项目。
pom.xml
像这样更改项目文件:
<dependencies>
<!-- Add the two required logback dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
</dependency>
<!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
Hadoop is logging to log4j! -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
该 <dependencies>
部分进行了以下更改:
log4j
从所有 Flink 依赖项中排除所有依赖项:这会导致 Maven 忽略 Flink 对 log4j 的传递依赖项。slf4j-log4j12
从 Flink 的依赖项中排除工件:由于我们将使用 slf4j 进行 logback 绑定,因此我们必须将 slf4j 删除为 log4j 绑定。- 添加 Logback 依赖项:
logback-core
和logback-classic
- 添加依赖项
log4j-over-slf4j
。log4j-over-slf4j
是一种工具,允许直接使用 Log4j API 的遗留应用程序使用 Slf4j 接口。Flink 依赖于 Hadoop,它直接使用 Log4j 进行日志记录。因此,我们需要将所有记录器调用从 Log4j 重定向到 Slf4j,后者又记录到 Logback。
请注意,您需要手动将排除项添加到要添加到 pom 文件的所有新 Flink 依赖项中。
您可能还需要检查其他(非 Flink)依赖项是否正在引入 log4j 绑定。您可以使用分析项目的依赖关系 mvn dependency:tree
。
在群集上运行 Flink 时使用 Logback
本教程适用于在 YARN 上运行 Flink 或作为独立群集。
要使用 Logback 而不是使用 Flink 的 Log4j,您需要从目录中删除 log4j-1.2.xx.jar
和。 sfl4j-log4j12-xxx.jar``lib/
接下来,您需要将以下 jar 文件放入该 lib/
文件夹:
logback-classic.jar
logback-core.jar
log4j-over-slf4j.jar
:此桥接器需要存在于类路径中,以便将日志记录调用从 Hadoop(使用 Log4j)重定向到 Slf4j。
请注意, lib/
在使用每个作业的 YARN 群集时,需要显式设置目录。
使用自定义记录器在 YARN 上提交 Flink 的命令是: ./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论