Testing your DataStream application

This is an introduction to the different types of tests you should consider writing for your Flink applications that use the DataStream API.

End-to-end integration tests

This kind of testing can be done entirely with Flink's public APIs, and each of the recipes in our Apache Flink Cookbook has one or more examples of end-to-end integration tests.

Some of these tests are disabled by default because they use data generators to provide unbounded input streams: they never finish, and so must be run manually. testProductionJob in SplitStreamTest.java, from the Splitting Apache Kafka® events recipe, is a good example. This particular test uses an embedded Kafka cluster:

SplitStreamTest.java

```java
// Excerpt: the topic is fed from an unbounded, generated stream of events
kafka.createTopicAsync(TOPIC, Stream.generate(new EventSupplier()));
```
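The Stream.generate call above turns a Supplier into an unbounded java.util.stream.Stream. A minimal plain-Java sketch of the same idea, with CounterSupplier as a hypothetical stand-in for the recipe's EventSupplier:

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GenerateExample {

    /** Hypothetical stand-in for the recipe's EventSupplier. */
    static class CounterSupplier implements Supplier<String> {
        private long n = 0;

        @Override
        public String get() {
            return "event-" + n++;
        }
    }

    public static void main(String[] args) {
        // Stream.generate produces an unbounded stream; the consumer
        // (here limit(3), in the recipe the embedded Kafka cluster)
        // decides how long to keep pulling from it.
        List<String> first3 =
                Stream.generate(new CounterSupplier())
                        .limit(3)
                        .collect(Collectors.toList());
        System.out.println(first3); // [event-0, event-1, event-2]
    }
}
```

This is why such tests never terminate on their own: the generated stream has no end, so the job keeps consuming until it is cancelled.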

The test below verifies the implementation's behavior. Pay particular attention to the defineWorkflow method: each of our recipes has a defineWorkflow method that sets up the pipeline for the job, which makes it easy to swap in different sources and sinks for testing.

SplitStreamTest.java

```java
package com.immerok.cookbook;

import static com.immerok.cookbook.SplitStream.TOPIC;
import static org.assertj.core.api.Assertions.assertThat;

import com.immerok.cookbook.events.Event;
import com.immerok.cookbook.events.EventSupplier;
import com.immerok.cookbook.extensions.MiniClusterExtensionFactory;
import com.immerok.cookbook.utils.CookbookKafkaCluster;
import java.util.Iterator;
import java.util.stream.Stream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.PojoTestUtils;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class SplitStreamTest {

    @RegisterExtension
    static final MiniClusterExtension FLINK = /* … */;

    // … (remainder of the excerpt truncated; it includes)
    //   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
}
```
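At its core, the defineWorkflow pattern is ordinary dependency injection: the transformation logic never hard-codes its source or sink. Stripped of Flink types, the shape is roughly this (all names below are invented for illustration, not the recipe's actual code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class WorkflowShape {

    // The production entry point would pass a Kafka-backed source and sink;
    // a test passes an in-memory stream and a collecting consumer instead.
    static void defineWorkflow(Stream<Integer> source, Consumer<Integer> sink) {
        source.filter(n -> n % 2 == 0).forEach(sink);
    }

    public static void main(String[] args) {
        List<Integer> collected = new ArrayList<>();
        defineWorkflow(Stream.of(1, 2, 3, 4), collected::add);
        System.out.println(collected); // [2, 4]
    }
}
```

Because the pipeline definition only sees the injected source and sink, the same code path runs in production and under test.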

Unit tests for stateless functions

Writing good tests for something like a FilterFunction or a FlatMapFunction is straightforward, and doesn't require doing anything Flink-specific.

Here's a small example:


```java
import static org.junit.jupiter.api.Assertions.assertFalse;

import org.apache.flink.api.common.functions.FilterFunction;
import org.junit.jupiter.api.Test;

public class HotFilter implements FilterFunction<Integer> {

    @Override
    public boolean filter(Integer temperature) {
        return temperature >= 100;
    }
}

public class HotFilterTest {

    @Test
    public void testCoolTemperature() {
        HotFilter hotFilter = new HotFilter();

        assertFalse(hotFilter.filter(20));
    }
}
```
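Since the threshold in HotFilter is inclusive (>= 100), a boundary-value test is worth adding alongside the cool-temperature case:

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

public class HotFilterBoundaryTest {

    @Test
    public void testBoundaryTemperature() {
        HotFilter hotFilter = new HotFilter();

        // 100 is exactly the threshold, and the comparison is >=,
        // so it counts as hot
        assertTrue(hotFilter.filter(100));
    }
}
```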

Unit tests for functions that use managed state

Testing becomes more challenging when you want to cover cases where factors such as timing, watermarks, race conditions, and state management can affect the results. This kind of testing would be difficult without some support; fortunately, Flink itself includes many tests like this and has machinery you can borrow for writing your own. This is where Flink's test harnesses come into play.

As outlined above, many useful tests can, and should, be written without resorting to using the test harnesses described below.

caution

The test harnesses are not part of Flink's public API, and could be changed in a minor release.

These test harnesses were created for the purpose of testing Flink itself, and their focus is on testing Flink's operators.

Every user function you write, such as a MapFunction or a KeyedProcessFunction, implements methods (open, map, processElement, onTimer, etc.) that are called by an Operator designed to work with that specific class of user function.

Because these harnesses test operators, you need to know which type of operator to instantiate in order to exercise your user function. Here are the Operator classes to use for the most relevant function types:

| User function class | Operator class |
| --- | --- |
| RichFilterFunction | StreamFilter |
| RichMapFunction | StreamMap |
| RichFlatMapFunction | StreamFlatMap |
| RichCoFlatMapFunction | CoStreamFlatMap |
| ProcessFunction | ProcessOperator |
| CoProcessFunction | CoProcessOperator |
| KeyedProcessFunction | KeyedProcessOperator |
| KeyedCoProcessFunction | KeyedCoProcessOperator |
| BroadcastProcessFunction | CoBroadcastWithNonKeyedOperator |
| KeyedBroadcastProcessFunction | CoBroadcastWithKeyedOperator |

You can also use these test harnesses to test window operators, but that is more complex to set up. You can find examples of that in the Flink sources, but for many use cases a combination of unit tests for the WindowProcessFunction and some end-to-end integration tests should suffice.

Choosing the appropriate test harness

There are four classes of test harness, depending on whether the operator being tested has one or two input streams, and whether it operates on keyed or non-keyed streams.

| | keyed | non-keyed |
| --- | --- | --- |
| one input | KeyedOneInputStreamOperatorTestHarness | OneInputStreamOperatorTestHarness |
| two inputs | KeyedTwoInputStreamOperatorTestHarness | TwoInputStreamOperatorTestHarness |
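As a rough illustration of the mechanics, here is a sketch of a keyed, one-input harness in use. The function, test class, and assertions are invented for this example, and it assumes Flink's test utilities (which, as noted above, are not public API) are on the classpath:

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;

/** Invented example: emits a running count of the events seen per key. */
class CountPerKey extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out)
            throws Exception {
        long next = (count.value() == null ? 0L : count.value()) + 1;
        count.update(next);
        out.collect(next);
    }
}

class CountPerKeyTest {

    @Test
    void countsEventsPerKey() throws Exception {
        // Wrap the user function in its operator (per the table of
        // function-to-operator mappings), then wrap the operator in the
        // matching keyed, one-input harness.
        KeyedOneInputStreamOperatorTestHarness<String, String, Long> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new CountPerKey()),
                        value -> value, // key selector: key by the value itself
                        Types.STRING);
        harness.open();

        // Drive the operator by hand: (element, timestamp)
        harness.processElement("a", 10L);
        harness.processElement("a", 20L);
        harness.processElement("b", 30L);

        assertThat(harness.extractOutputValues()).containsExactly(1L, 2L, 1L);
    }
}
```

The harness gives you direct control over element order, timestamps, watermarks, and processing time, which is exactly what end-to-end tests cannot control deterministically.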

Example

See the recipe on unit testing with Flink's test harnesses for a complete example.