返回介绍

本地执行

发布于 2025-05-02 18:19:15 字数 3721 浏览 0 评论 0 收藏

即使在单个 Java 虚拟机中,Flink 也可以在一台机器上运行。这允许用户在本地测试和调试 Flink 程序。本节概述了本地执行机制。

本地环境和执行程序允许您在本地 Java 虚拟机中运行 Flink 程序,或在任何 JVM 中作为现有程序的一部分运行。只需点击 IDE 的“运行”按钮,即可在本地启动大多数示例。

Flink 支持两种不同的本地执行。在 LocalExecutionEnvironment 开始全面运行 Flink,包括 JobManager 和 TaskManager。这些包括内存管理和在群集模式下执行的所有内部算法。

CollectionEnvironment 正在执行的 Java 集合 Flink 程序。此模式不会启动完整的 Flink 运行时,因此执行的开销和轻量级都非常低。例如, DataSet.map() 通过将 map() 函数应用于 Java 列表中的所有数据元来执行转换。

调试

如果您在本地运行 Flink 程序,您也可以像调试任何其他 Java 程序一样调试程序。您可以使用 System.out.println() 写出一些内部变量,也可以使用调试器。可以在其中设置断点 map()reduce() 以及所有其他方法。另请参阅 Java API 文档中的 调试部分 ,以获取 Java API 中的测试和本地调试实用程序指南。

Maven 依赖

如果您在 Maven 项目中开发程序,则必须 flink-clients 使用此依赖项添加模块:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>

本地环境

LocalEnvironment 是 Flink 程序本地执行的句柄。使用它在本地 JVM 中运行程序 - 独立或嵌入其他程序。

通过该方法实例化本地环境 ExecutionEnvironment.createLocalEnvironment() 。默认情况下,它将使用尽可能多的本地线程来执行,因为您的计算机具有 CPU 核心(硬件上下文)。您也可以指定所需的并行度。可以将本地环境配置为使用 enableLogging() / 登录控制台 disableLogging()

在大多数情况下,呼叫 ExecutionEnvironment.getExecutionEnvironment() 是更好的方式。该方法 LocalEnvironment 在本地启动程序时(在命令行界面之外)返回,并在 命令行界面 调用程序时返回预先配置的集群运行环境。

public static void main(String[] args) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

  DataSet<String> data = env.readTextFile("file:///path/to/file");

  data
    .filter(new FilterFunction<String>() {
      public boolean filter(String value) {
        return value.startsWith("http://");
      }
    })
    .writeAsText("file:///path/to/result");

  JobExecutionResult res = env.execute();
}

JobExecutionResult 执行完成后返回的对象包含程序运行时和累加器结果。

LocalEnvironment 还允许通过自定义的配置值 Flink。

Configuration conf = new Configuration();
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

注意: 本地运行环境不会启动任何 Web 前端来监视执行。

集合环境

使用 Java 执行 Java Collections CollectionEnvironment 是一种执行 Flink 程序的低开销方法。此模式的典型用例是自动测试,调试和代码重用。

对于更具交互性的情况,用户也可以使用为批处理实现的算法。可以在 Java Application Server 中使用稍微更改的 Flink 程序变体来处理传入的请求。

基于集合的执行的框架

public static void main(String[] args) throws Exception {
  // initialize a new Collection-based execution environment
  final ExecutionEnvironment env = new CollectionEnvironment();

  DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);

  /* Data Set transformations ... */

  // retrieve the resulting Tuple2 elements into a ArrayList.
  Collection<...> result = new ArrayList<...>();
  resultDataSet.output(new LocalCollectionOutputFormat<...>(result));

  // kick off execution.
  env.execute();

  // Do some work with the resulting ArrayList (=Collection).
  for(... t : result) {
    System.err.println("Result = "+t);
  }
}

flink-examples-batch 模块包含一个完整的示例,称为 CollectionExecutionExample

请注意,基于集合的 Flink 程序的执行仅适用于适合 JVM 堆的小数据。集合上的执行不是多线程的,只使用一个线程。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。