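This is straightforward. It mainly draws on official documentation 1, official documentation 2, and this article, supplemented with notes from my own practice.
Flink version: 1.11.2. The examples in this article come from the official website; the complete examples are on GitHub.
Writing unit tests is one of the basic tasks in designing a production application. Without tests, even a small code change can cause a production job to fail. So whether it is a simple job for data cleansing or model training, or a complex multi-tenant real-time data processing system, we should write unit tests for every kind of application. Below is a guide to unit testing Apache Flink applications. Apache Flink provides a powerful unit testing framework that helps make sure our applications behave as expected once they go live.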
1. Maven dependencies
To use the unit testing framework that Apache Flink provides, add the following dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
    <classifier>tests</classifier>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
    <classifier>tests</classifier>
</dependency>
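Notes:
- The test JARs org.apache.flink:flink-runtime_2.11:tests:1.11.2 and org.apache.flink:flink-streaming-java_2.11:tests:1.11.2 are required, so those dependencies must specify the classifier tests.
- I built my project with the Maven setup that the Flink project recommends, so pom.xml already contained a flink-streaming-java_${scala.binary.version} entry. In that case the dependencies above cannot be used as written: they conflict, and the build either takes a very long time or fails. The crux is the test setting; deleting the first of the dependencies above is enough.
- When creating unit tests, mirror the directory structure of main under src, with main renamed to test. The test file for a given source file takes that file's name plus the suffix Test.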

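Unit tests are written differently for different kinds of operators. We can distinguish three kinds:
- Stateless operators
- Stateful operators
- Timed process operators (ProcessFunction)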
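2. Stateless operators
The Maven dependencies above are only needed when using the test harnesses, so they can be left out when writing unit tests for stateless operators.
Unit tests for stateless operators are fairly simple to write. We only need to follow the basic conventions for test cases: create an instance of the function class and test the relevant methods. Take a simple Map operator as an example: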
package com.flink.example.test;

import org.apache.flink.api.common.functions.MapFunction;

/**
 * Stateless operator used in the unit test: Map
 * Created by wy on 2020/11/8.
 */
public class MyStatelessMap implements MapFunction<String, String> {
    @Override
    public String map(String in) throws Exception {
        // Prefix every input with a greeting
        String out = "hello " + in;
        return out;
    }
}
And a stateless FlatMap operator:
package com.flink.example.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class MyStatelessFlatMap implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        String out = "hello " + s;
        collector.collect(out);
    }
}
The unit tests for these two operators look like this:
package com.flink.example.test;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class StatelessUnitTest {
    @Test
    public void MyStatelessMap() throws Exception {
        MyStatelessMap statelessMap = new MyStatelessMap();
        String out = statelessMap.map("world");
        Assert.assertEquals("hello world", out);
    }

    @Test
    public void MyStatelessFlatMap() throws Exception {
        MyStatelessFlatMap statelessFlatMap = new MyStatelessFlatMap();
        List<String> out = new ArrayList<>();
        ListCollector<String> listCollector = new ListCollector<>(out);
        statelessFlatMap.flatMap("world", listCollector);
        Assert.assertEquals(Lists.newArrayList("hello world"), out);
    }
}
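A FlatMap operator takes a Collector object in addition to the input argument. There are two ways to write the test case:
- Mock the Collector object with Mockito
- Use the ListCollector that Flink provides
I prefer the second approach, since it needs very little code and suits most cases.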
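To make the second approach more concrete, here is a minimal plain-Java sketch of the list-backed collector pattern. SimpleCollector and ListBackedCollector are hypothetical stand-ins for Flink's Collector and ListCollector, written only so that the snippet runs without Flink on the classpath; helloFlatMap mirrors the body of MyStatelessFlatMap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.util.Collector (an assumption, not Flink's class).
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical stand-in for Flink's ListCollector: appends every record to a caller-owned list.
final class ListBackedCollector<T> implements SimpleCollector<T> {
    private final List<T> target;

    ListBackedCollector(List<T> target) {
        this.target = target;
    }

    @Override
    public void collect(T record) {
        target.add(record);
    }
}

final class ListCollectorSketch {
    // Same logic as MyStatelessFlatMap.flatMap: emit one greeting per input.
    static void helloFlatMap(String in, SimpleCollector<String> out) {
        out.collect("hello " + in);
    }

    // Runs the function against a list-backed collector and returns everything it emitted.
    static List<String> run(String... inputs) {
        List<String> out = new ArrayList<>();
        SimpleCollector<String> collector = new ListBackedCollector<>(out);
        for (String in : inputs) {
            helloFlatMap(in, collector);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("world", "flink")); // prints [hello world, hello flink]
    }
}
```

Because the test owns the list, it can assert on the emitted records directly, which is why this style usually needs less setup than a mock.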
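3. Stateful operators
Testing stateful operators (ones that use state or timers) is harder, because the user code has to interact with the Flink runtime. For this, Flink provides a set of test harnesses that can be used to test user-defined functions as well as custom operators:
- OneInputStreamOperatorTestHarness: for operators on DataStreams
- KeyedOneInputStreamOperatorTestHarness: for operators on KeyedStreams
- TwoInputStreamOperatorTestHarness: for operators on a ConnectedStream of two DataStreams
- KeyedTwoInputStreamOperatorTestHarness: for operators on a ConnectedStream of two KeyedStreams
Take a stateful FlatMap function as an example: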
package com.flink.example.test;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MyStatefulFlatMap extends RichFlatMapFunction<String, Long> {
    ValueState<Long> counterState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("Counter", Types.LONG);
        this.counterState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String s, Collector<Long> collector) throws Exception {
        Long count = 0L;
        if (this.counterState.value() != null) {
            count = this.counterState.value();
        }
        count++;
        this.counterState.update(count);
        collector.collect(count);
    }
}
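The most complicated part of unit testing the class above is simulating the application's configuration and runtime context. We use the test harness classes that Flink provides so that we do not have to create mock objects ourselves. A unit test using KeyedOneInputStreamOperatorTestHarness looks like this: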
package com.flink.example.test;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MyStatefullFlatMapUnitTest {
    private KeyedOneInputStreamOperatorTestHarness<String, String, Long> testHarness;
    private MyStatefulFlatMap statefulFlatMap;

    @Before
    public void setupTestHarness() throws Exception {
        statefulFlatMap = new MyStatefulFlatMap();
        // KeyedOneInputStreamOperatorTestHarness takes three arguments:
        // the operator, the key selector, and the key type
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new StreamFlatMap<>(statefulFlatMap), x -> "1", Types.STRING);
        testHarness.open();
    }

    @Test
    public void MyStatefulFlatMap() throws Exception {
        // test first record
        testHarness.processElement("a", 10);
        // Assert.assertEquals(Lists.newArrayList(new StreamRecord<>(1L, 10)), this.testHarness.extractOutputStreamRecords());

        // test second record
        testHarness.processElement("b", 20);
        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>(1L, 10), new StreamRecord<>(2L, 20)),
                testHarness.extractOutputStreamRecords());

        // test other records
        testHarness.processElement("c", 30);
        testHarness.processElement("d", 40);
        testHarness.processElement("e", 50);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>(1L, 10),
                        new StreamRecord<>(2L, 20),
                        new StreamRecord<>(3L, 30),
                        new StreamRecord<>(4L, 40),
                        new StreamRecord<>(5L, 50)),
                testHarness.extractOutputStreamRecords());
    }
}
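The test harness offers many helper methods; the code above uses three of them:
- open: calls the FlatMap function's open method with the relevant parameters and also initializes the context.
- processElement: lets the user pass in an input element together with the timestamp associated with it.
- extractOutputStreamRecords: fetches the output records and their timestamps from the Collector.
Test harnesses greatly simplify unit testing of stateful operators.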
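The test above keys every element to the constant "1", so all five elements share a single counter. As a rough mental model of why the key selector matters, the following plain-Java sketch simulates per-key counter state with a HashMap. KeyedCounterSketch is a hypothetical illustration under that simplifying assumption; it is not Flink's state backend and uses no Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class KeyedCounterSketch {
    // One counter per key, loosely mimicking a keyed ValueState<Long> (simplified model).
    private final Map<String, Long> counters = new HashMap<>();
    private final Function<String, String> keySelector;
    private final List<Long> output = new ArrayList<>();

    KeyedCounterSketch(Function<String, String> keySelector) {
        this.keySelector = keySelector;
    }

    // Same counting logic as MyStatefulFlatMap.flatMap, scoped to the element's key.
    void processElement(String element) {
        String key = keySelector.apply(element);
        long count = counters.getOrDefault(key, 0L) + 1;
        counters.put(key, count);
        output.add(count);
    }

    // Returns a copy of everything emitted so far.
    List<Long> extractOutput() {
        return new ArrayList<>(output);
    }

    // Convenience helper: feed all elements through a fresh instance and return its output.
    static List<Long> countAll(Function<String, String> keySelector, String... elements) {
        KeyedCounterSketch sketch = new KeyedCounterSketch(keySelector);
        for (String element : elements) {
            sketch.processElement(element);
        }
        return sketch.extractOutput();
    }

    public static void main(String[] args) {
        // Constant key: all elements share one counter.
        System.out.println(countAll(x -> "1", "a", "b", "a")); // prints [1, 2, 3]
        // Identity key: each distinct element gets its own counter.
        System.out.println(countAll(x -> x, "a", "b", "a"));   // prints [1, 1, 2]
    }
}
```

With a constant key the counts keep rising across different inputs, which matches the 1 to 5 sequence asserted in the test above; a different key selector would change the expected output.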
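4. Timed process operators
Writing unit tests for a time-dependent ProcessFunction is very similar to writing them for stateful operators: both need a test harness. Here, though, there is one more aspect to consider, namely supplying timestamps for the events and controlling the application's current time. By setting the current time (processing time or event time), we can fire registered timers, which invokes the function's onTimer method: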
package com.flink.example.test;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerProcessFunction extends KeyedProcessFunction<String, String, String> {
    @Override
    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
        // Register a processing-time timer for timestamp 50
        context.timerService().registerProcessingTimeTimer(50);
        String out = "hello " + s;
        collector.collect(out);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Runs when the registered time is reached
        out.collect(String.format("Timer triggered at timestamp %d", timestamp));
    }
}
We need to test two methods of the KeyedProcessFunction: processElement and onTimer. With a test harness we can control the function's current time, so we can fire timers at will without waiting for a specific moment:
package com.flink.example.test;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TimerProcessFunctionUnitTest {
    private OneInputStreamOperatorTestHarness<String, String> testHarness;
    private TimerProcessFunction processFunction;

    @Before
    public void setupTestHarness() throws Exception {
        processFunction = new TimerProcessFunction();
        // KeyedOneInputStreamOperatorTestHarness takes three arguments:
        // the operator, the key selector, and the key type
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(processFunction), x -> "1", Types.STRING);
        // Function time is initialized to 0
        testHarness.open();
    }

    @Test
    public void testProcessElement() throws Exception {
        testHarness.processElement("world", 10);
        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>("hello world", 10)),
                testHarness.extractOutputStreamRecords());
    }

    @Test
    public void testOnTimer() throws Exception {
        // test first record
        testHarness.processElement("world", 10);
        Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

        // Set the function's processing time to 50, which fires the timer
        testHarness.setProcessingTime(50);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("hello world", 10),
                        new StreamRecord<>("Timer triggered at timestamp 50")),
                testHarness.extractOutputStreamRecords());
    }
}
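Conceptually, setting the processing time advances a clock that the test controls and fires every timer whose timestamp is no later than the new time. The plain-Java sketch below models that idea for a single key. ManualTimerService is a hypothetical stand-in written for illustration; it makes no claim about how Flink's harness is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class ManualTimerService {
    // Sorted set of pending timers; registering the same timestamp twice keeps
    // one timer (single key assumed in this simplified model).
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();
    private long currentTime = 0L; // the clock starts at 0 and only moves when the test says so

    void registerProcessingTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    int numProcessingTimeTimers() {
        return timers.size();
    }

    // Moves the clock to newTime and fires all timers that are due, earliest first.
    void setProcessingTime(long newTime) {
        currentTime = newTime;
        while (!timers.isEmpty() && timers.first() <= currentTime) {
            long timestamp = timers.pollFirst();
            onTimer(timestamp);
        }
    }

    // Same output format as TimerProcessFunction.onTimer.
    private void onTimer(long timestamp) {
        output.add(String.format("Timer triggered at timestamp %d", timestamp));
    }

    // Returns a copy of everything emitted so far.
    List<String> extractOutput() {
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        ManualTimerService service = new ManualTimerService();
        service.registerProcessingTimeTimer(50);
        service.setProcessingTime(49); // not due yet, nothing fires
        service.setProcessingTime(50); // the timer registered for 50 fires now
        System.out.println(service.extractOutput()); // prints [Timer triggered at timestamp 50]
    }
}
```

Because the clock never moves on its own, the test stays deterministic: nothing fires until the test decides it should.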
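Given how important ProcessFunction is, besides the test harnesses above, which can be used on a ProcessFunction directly, Flink also provides a test harness factory named ProcessFunctionTestHarnesses that makes instantiating a harness much simpler. Consider the following function: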
package com.flink.example.test;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class MyProcessFunction extends ProcessFunction<Integer, Integer> {
    @Override
    public void processElement(Integer integer, Context context, Collector<Integer> collector) throws Exception {
        collector.collect(integer);
    }
}
By passing suitable arguments and verifying the output, unit testing with ProcessFunctionTestHarnesses becomes easier:
package com.flink.example.test;

import com.google.common.collect.Lists;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.junit.Assert;
import org.junit.Test;

public class ProcessFunctionUnitTest {
    @Test
    public void testPassThrough() throws Exception {
        MyProcessFunction processFunction = new MyProcessFunction();
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
                ProcessFunctionTestHarnesses.forProcessFunction(processFunction);
        testHarness.processElement(1, 10);
        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>(1, 10)),
                testHarness.extractOutputStreamRecords());
    }
}
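For more examples of using ProcessFunctionTestHarnesses to test the different variants of ProcessFunction (such as KeyedProcessFunction, KeyedCoProcessFunction, and BroadcastProcessFunction), see ProcessFunctionTestHarnessesTest. The factory methods have the following signatures: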
public static <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> forProcessFunction(
        ProcessFunction<IN, OUT> function) throws Exception {}

public static <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> forKeyedProcessFunction(
        KeyedProcessFunction<K, IN, OUT> function,
        KeySelector<IN, K> keySelector,
        TypeInformation<K> keyType) throws Exception {}

public static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> forCoProcessFunction(
        CoProcessFunction<IN1, IN2, OUT> function) throws Exception {}

public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(
        KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
        KeySelector<IN1, K> keySelector1,
        KeySelector<IN2, K> keySelector2,
        TypeInformation<K> keyType) throws Exception {}

public static <IN1, IN2, OUT> BroadcastOperatorTestHarness<IN1, IN2, OUT> forBroadcastProcessFunction(
        BroadcastProcessFunction<IN1, IN2, OUT> function,
        MapStateDescriptor... descriptors) throws Exception {}

public static <K, IN1, IN2, OUT> KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> forKeyedBroadcastProcessFunction(
        KeyedBroadcastProcessFunction<K, IN1, IN2, OUT> function,
        KeySelector<IN1, K> keySelector,
        TypeInformation<K> keyType,
        MapStateDescriptor... descriptors) throws Exception {}
Another complete unit test example:
package com.flink.example.test;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;

/**
 * ProcessFunction unit test example
 * Created by wy on 2020/11/9.
 */
public class ProcessFunctionTestHarnessesTest {

    @Test
    public void testHarnessForProcessFunction() throws Exception {
        ProcessFunction<Integer, Integer> function = new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                out.collect(value);
            }
        };
        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(function);
        harness.processElement(1, 10);
        Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
    }

    @Test
    public void testHarnessForKeyedProcessFunction() throws Exception {
        KeyedProcessFunction<Integer, Integer, Integer> function =
                new KeyedProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                    }
                };
        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forKeyedProcessFunction(function, x -> x, BasicTypeInfo.INT_TYPE_INFO);
        harness.processElement(1, 10);
        Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
    }

    @Test
    public void testHarnessForCoProcessFunction() throws Exception {
        CoProcessFunction<Integer, String, Integer> function = new CoProcessFunction<Integer, String, Integer>() {
            @Override
            public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                out.collect(value);
            }

            @Override
            public void processElement2(String value, Context ctx, Collector<Integer> out) throws Exception {
                out.collect(Integer.parseInt(value));
            }
        };
        TwoInputStreamOperatorTestHarness<Integer, String, Integer> harness =
                ProcessFunctionTestHarnesses.forCoProcessFunction(function);
        harness.processElement2("0", 1);
        harness.processElement1(1, 10);
        Assert.assertEquals(harness.extractOutputValues(), Arrays.asList(0, 1));
    }

    @Test
    public void testHarnessForKeyedCoProcessFunction() throws Exception {
        KeyedCoProcessFunction<Integer, Integer, Integer, Integer> function =
                new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        out.collect(value);
                    }
                };
        KeyedTwoInputStreamOperatorTestHarness<Integer, Integer, Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forKeyedCoProcessFunction(
                        function, x -> x, x -> x, TypeInformation.of(Integer.class));
        harness.processElement1(0, 1);
        harness.processElement2(1, 10);
        Assert.assertEquals(harness.extractOutputValues(), Arrays.asList(0, 1));
    }
}
