返回介绍

测试

发布于 2025-05-02 18:19:14 字数 5549 浏览 0 评论 0 收藏 0

本页简要讨论如何在 IDE 或本地环境中测试 Flink 应用程序。

单元测试

通常,可以假设 Flink 在用户定义之外产生正确的结果 Function 。因此,建议 Function 尽可能使用单元测试来测试包含主业务逻辑的类。

例如,如果实现以下内容 ReduceFunction

public class SumReduce implements ReduceFunction<Long> {

  @Override
  public Long reduce(Long value1, Long value2) throws Exception {
    return value1 + value2;
  }
}
class SumReduce extends ReduceFunction[Long] {

  override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
    value1 + value2
  }
}

通过传递合适的参数并验证输出,可以很容易地使用您喜欢的框架对其进行单元测试:

public class SumReduceTest {

  @Test
  public void testSum() throws Exception {
    // instantiate your function
    SumReduce sumReduce = new SumReduce();

    // call the methods that you have implemented
    assertEquals(42L, sumReduce.reduce(40L, 2L));
  }
}
class SumReduceTest extends FlatSpec with Matchers {

  "SumReduce" should "add values" in {
    // instantiate your function
    val sumReduce: SumReduce = new SumReduce()

    // call the methods that you have implemented
    sumReduce.reduce(40L, 2L) should be (42L)
  }
}

集成测试

为了端到端测试 Flink 流管道,您还可以编写针对本地 Flink 迷你集群执行的集成测试。

为此,添加测试依赖项 flink-test-utils

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_2.11</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>

例如,如果要测试以下内容 MapFunction

public class MultiplyByTwo implements MapFunction<Long, Long> {

  @Override
  public Long map(Long value) throws Exception {
    return value * 2;
  }
}
class MultiplyByTwo extends MapFunction[Long, Long] {

  override def map(value: Long): Long = {
    value * 2
  }
}

您可以编写以下集成测试:

public class ExampleIntegrationTest extends AbstractTestBase {

  @Test
  public void testMultiply() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // configure your test environment
    env.setParallelism(1);

    // values are collected in a static variable
    CollectSink.values.clear();

    // create a stream of custom elements and apply transformations
    env.fromElements(1L, 21L, 22L)
        .map(new MultiplyByTwo())
        .addSink(new CollectSink());

    // execute
    env.execute();

    // verify your results
    assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
  }

  // create a testing sink
  private static class CollectSink implements SinkFunction<Long> {

    // must be static
    public static final List<Long> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Long value) throws Exception {
      values.add(value);
    }
  }
}
class ExampleIntegrationTest extends AbstractTestBase {

  @Test
  def testMultiply(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(1)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env
      .fromElements(1L, 21L, 22L)
      .map(new MultiplyByTwo())
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
  }
}  

// create a testing sink class CollectSink extends SinkFunction[Long] {

  override def invoke(value: java.lang.Long): Unit = {
    synchronized {
      values.add(value)
    }
  }
}

object CollectSink {

  // must be static
  val values: List[Long] = new ArrayList()
}

CollectSink 此处使用静态变量 in ,因为 Flink 在将所有 算子分布到集群之前将其序列化。通过静态变量与本地 Flink 迷你集群实例化的算子进行通信是解决此问题的一种方法。或者,您可以使用测试接收器将数据写入临时目录中的文件。您还可以实现自己的自定义源以发出水印。

测试检查点和状态处理

测试状态处理的一种方法是在集成测试中启用检查点。

您可以通过 StreamExecutionEnvironment 在测试中配置来完成此 算子操作:

env.enableCheckpointing(500);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
env.enableCheckpointing(500)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))

例如,向 Flink 应用程序添加一个身份映射器 算子,该 算子将每次抛出一次异常 1000ms 。但是,由于动作之间存在时间依赖关系,因此编写此类测试可能会非常棘手。

另一种方法是写使用 Flink 内部测试效用一个单元测试 AbstractStreamOperatorTestHarnessflink-streaming-java 模块。

对于如何做到这一点,请看看在一个例子 org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest 中也 flink-streaming-java 模块。

请注意, AbstractStreamOperatorTestHarness 目前它不是公共 API 的一部分,可能会有所变化。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。