返回介绍

并行执行

发布于 2025-05-02 18:19:16 字数 4312 浏览 0 评论 0 收藏 0

本节介绍如何在 Flink 中配置程序的并行执行。Flink 程序由多个任务(转换/ 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为 并行性 。

如果要使用 保存点 ,还应考虑设置最大并行度(或 max parallelism )。从保存点恢复时,您可以更改特定 算子或整个程序的并行度,此设置指定并行度的上限。这是必需的,因为 Flink 在内部将状态划分为 Keys 组,并且我们不能拥有 +Inf 多个 Keys 组,因为这会对性能产生不利影响。

设置并行性

可以在不同级别的 Flink 中指定任务的并行性:

算子级别

可以通过调用其 setParallelism() 方法来定义单个 算子,数据源或数据接收器的并行性 。例如,像这样:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = [...]
val wordCounts = text
  .flatMap{ _.split(" ") map { (_, 1) } }
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5)
wordCounts.print()

env.execute("Word Count Example")

运行环境级别

如此处所述 Flink 程序在运行环境的上下文中执行。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。

可以通过调用 setParallelism() 方法来指定运行环境的默认并行性 。要以并行方式执行所有 算子,数据源和数据接收器,请 3 按如下方式设置运行环境的默认并行度:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

val text = [...]
val wordCounts = text
  .flatMap{ _.split(" ") map { (_, 1) } }
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)
wordCounts.print()

env.execute("Word Count Example")

客户级别

在向 Flink 提交作业时,可以在客户端设置并行性。客户端可以是 Java 或 Scala 程序。这种客户端的一个例子是 Flink 的命令行界面(CLI)。

对于 CLI 客户端,可以使用指定 parallelism 参数 -p 。例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java / Scala 程序中,并行性设置如下:

try {
  PackagedProgram program = new PackagedProgram(file, args);
  InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
  Configuration config = new Configuration();

  Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

  // set the parallelism to 10 here
  client.run(program, 10, true);

} catch (ProgramInvocationException e) {
  e.printStackTrace();
}
try {
  PackagedProgram program = new PackagedProgram(file, args)
  InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
  Configuration config = new Configuration()

  Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())

  // set the parallelism to 10 here
  client.run(program, 10, true)

} catch {
  case e: Exception => e.printStackTrace
}

系统级别

可以通过设置 parallelism.default 属性来定义所有运行环境的系统范围默认并行度 ./conf/flink-conf.yaml 。有关详细信息,请参阅 配置 文档

设置最大并行度

可以在可以设置并行度的位置设置最大并行度(客户端级别和系统级别除外)。而不是调用, setParallelism() 你调用 setMaxParallelism() 设置最大并行度。

最大并行度的默认设置大致 operatorParallelism + (operatorParallelism / 2) 为下限 127 和上限 32768

注意将最大并行度设置为非常大的值可能对性能有害,因为某些状态后台必须保持内部数据结构随 Keys 组的数量(这是可重新缓存状态的内部实现机制)进行扩展。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。