Unit testing with Apache Flink®'s test harnesses
Visit the enrichment-join-with-buffering recipe on GitHub.
Introduction
In this recipe you will write unit tests for functions that use managed state.
In particular, you will write unit tests for the KeyedCoProcessFunction at the heart of the recipe that implements a buffered enrichment join.
That job has two source streams, transactions and products, and connects them to enrich each Transaction with the Product being sold:
flowchart LR
T(transactions) & P(products) --> J(Join)
J --> S( )
style T fill:#E5E8FF,stroke:#3A4BDC
style P fill:#E2FDB5,stroke:#4A9505
style J fill:#FFFFAD,stroke:#8A8A00
style S fill:#FFFFFF,stroke:#FFFFFF
linkStyle 0 stroke:#3A4BDC
linkStyle 1 stroke:#4A9505
linkStyle 2 stroke:#8A8A00
To test this join properly, you need to verify that it behaves correctly regardless of which event it processes first: the Transaction event to be enriched, or the Product event used for enrichment. This means verifying not only that the results are correct, but also that any managed state is freed once it is no longer needed.
This sort of testing isn't possible in an integration test, because Flink's APIs don't provide enough control over the runtime, nor do they provide enough visibility into the state backend.
Fortunately, Flink includes test harnesses that provide this functionality.
Using Flink's test harnesses
For an introduction to these test harnesses, and their relationship to operators and user functions, please see the section on getting started with Flink's test harnesses in the explainer on testing DataStream applications.
Necessary dependencies
To use these test harnesses, you will first add some dependencies to your project:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.immerok</groupId>
    <artifactId>cookbook-enrichment-join-with-buffering</artifactId>
    <packaging>jar</packaging>
    <name>Flink Cookbook Recipe - Enrichment Join with Buffering</name>
    <properties>
        <flink.version>1.16.0</flink.version>
        <jackson.databind.version>2.14.1</jackson.databind.version>
        <junit.jupiter.version>5.9.1</junit.jupiter.version>
        <kafka.version>3.2.2</kafka.version>
        <log4j.version>2.19.0</log4j.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- The test harnesses ship in the test-jar of flink-streaming-java; the recipe's other
             dependencies (Kafka, Jackson, JUnit, Log4j) are omitted from this excerpt. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <classifier>tests</classifier>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
Creating the test harness
With all the dependencies now in place, you are ready to start writing tests.
In this recipe, the initializeTestHarness method is used to create a test harness before each test:
EnrichmentJoinUnitTestBase.java
package com.immerok.cookbook;

import static org.assertj.core.api.Assertions.assertThat;

import com.immerok.cookbook.records.Product;
import com.immerok.cookbook.records.Transaction;
import java.math.BigDecimal;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public abstract class EnrichmentJoinUnitTestBase {

    private KeyedTwoInputStreamOperatorTestHarness<Long, Transaction, Product, Transaction> harness;
In this case you are testing a join that is implemented with a KeyedCoProcessFunction, so the appropriate operator is a KeyedCoProcessOperator, to which you should pass an instance of the KeyedCoProcessFunction being tested.
With any keyed operator, the test harness needs to know how to compute the key for each input, and the key's type, so that information is passed to the test harness's constructor along with the operator.
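The full method is in the recipe's sources; what follows is only a minimal sketch of how such an initializeTestHarness method can be put together. The key-selector field names (transaction.productId and product.id) are hypothetical stand-ins for however the recipe's records expose the product ID, and getFunctionToTest() is the abstract hook that each concrete test class overrides (shown below):

    @BeforeEach
    public void initializeTestHarness() throws Exception {
        // Wrap the KeyedCoProcessFunction under test in a KeyedCoProcessOperator
        TwoInputStreamOperator<Transaction, Product, Transaction> operator =
                new KeyedCoProcessOperator<>(getFunctionToTest());

        // The harness needs the operator, one key selector per input, and the key type.
        // The field names used in the key selectors here are hypothetical.
        harness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        transaction -> transaction.productId,
                        product -> product.id,
                        Types.LONG);

        harness.open();
    }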
The recipe you are testing includes two implementations of the same enrichment join, one using ListState and the other using MapState. The testing criteria are the same for both implementations, so the tests have been implemented in an EnrichmentJoinUnitTestBase class that is extended by EnrichmentJoinUsingListStateUnitTest and EnrichmentJoinUsingMapStateUnitTest to create two identical test suites:
EnrichmentJoinUsingListStateUnitTest.java
package com.immerok.cookbook;

import com.immerok.cookbook.functions.EnrichmentJoinUsingListState;
import com.immerok.cookbook.records.Product;
import com.immerok.cookbook.records.Transaction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;

class EnrichmentJoinUsingListStateUnitTest extends EnrichmentJoinUnitTestBase {
    @Override
    public KeyedCoProcessFunction<Long, Transaction, Product, Transaction> getFunctionToTest() {
        return new EnrichmentJoinUsingListState();
    }
}
EnrichmentJoinUsingMapStateUnitTest.java
package com.immerok.cookbook;

import com.immerok.cookbook.functions.EnrichmentJoinUsingMapState;
import com.immerok.cookbook.records.Product;
import com.immerok.cookbook.records.Transaction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;

class EnrichmentJoinUsingMapStateUnitTest extends EnrichmentJoinUnitTestBase {
    @Override
    public KeyedCoProcessFunction<Long, Transaction, Product, Transaction> getFunctionToTest() {
        return new EnrichmentJoinUsingMapState();
    }
}
Writing tests
The KeyedCoProcessFunction you are testing has processElement1 and processElement2 callbacks that are called by the KeyedCoProcessOperator. That operator also has processElement1 and processElement2 methods, which are in turn called by the Flink runtime. In this case, however, you are going to replace the Flink runtime with a test harness, which will call those operator methods directly.
Here is the implementation of KeyedCoProcessOperator#processElement1, copied from the Flink sources:
KeyedCoProcessOperator.java
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement1(element.getValue(), context, collector);
        context.element = null;
    }
Note that the operator's version of processElement1 needs to be passed a StreamRecord. To help with this, the tests include a couple of helper methods for setting up the inputs:
EnrichmentJoinUnitTestBase.java
    /**
     * StreamRecord is an internal type used in the Flink runtime (and the test harnesses).
     */
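A minimal sketch of such a pair of helpers, using recordFor as a hypothetical name for them, could look like this:

    // Sketch: wraps the test POJOs in StreamRecords so they can be fed to the harness.
    // The name recordFor is hypothetical; the recipe's own helpers may be named differently.
    private static StreamRecord<Transaction> recordFor(Transaction transaction) {
        return new StreamRecord<>(transaction);
    }

    private static StreamRecord<Product> recordFor(Product product) {
        return new StreamRecord<>(product);
    }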
and another helper method for the expected result:
EnrichmentJoinUnitTestBase.java
    private static StreamRecord<Transaction> recordForExpectedResult(Transaction t, Product p) {
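How the Product gets attached to the Transaction depends on the recipe's record definitions, so the following is only a rough sketch; the enrichedWith call is an invented placeholder for that step:

    // Hypothetical sketch: builds the Transaction the join is expected to emit.
    // "enrichedWith" stands in for however the recipe actually attaches the Product.
    private static StreamRecord<Transaction> recordForExpectedResult(Transaction t, Product p) {
        Transaction enriched = t.enrichedWith(p);
        return new StreamRecord<>(enriched);
    }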
Here, then, is an example of a test for the happy case where the product is processed before the transaction that needs it:
EnrichmentJoinUnitTestBase.java
    private static final Product product = new Product(1, 1, "product1", 5.0F, 1);
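The body of that test is in the recipe's sources; a sketch of how such a test can be written, reusing the hypothetical recordFor helpers from above and assuming a matching transaction constant defined alongside product, might look like this:

    @Test
    public void testProductProcessedBeforeTransaction() throws Exception {
        // Process the product first (stream 2), then the matching transaction (stream 1)
        harness.processElement2(recordFor(product));

        // The product had to be stored in keyed state, waiting for transactions to enrich
        assertThat(harness.numKeyedStateEntries()).isEqualTo(1);

        harness.processElement1(recordFor(transaction));

        // The transaction could be enriched right away, so it was never buffered:
        // the stored product is still the only state entry
        assertThat(harness.numKeyedStateEntries()).isEqualTo(1);

        // The enriched transaction was emitted
        assertThat(harness.getOutput())
                .containsExactly(recordForExpectedResult(transaction, product));
    }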
Notice how:
- harness.processElement1 and harness.processElement2 are used to guarantee a specific order in which the inputs are processed.
- harness.numKeyedStateEntries is used to verify the impact of each record's ingestion on the number of entries stored in Flink's managed state. In this test case the transaction didn't need to be stored, and it wasn't.
- harness.getOutput() is used to obtain the actual results.
The test below verifies that the transactions are buffered when they need to be, and that the buffer is cleared when it is no longer needed.
EnrichmentJoinUnitTestBase.java
    StreamRecord<Transaction> expected1 = recordForExpectedResult(transaction, product);
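Again as a sketch, using the same hypothetical helpers and constants, such a test might be structured like this:

    @Test
    public void testTransactionIsBufferedUntilProductArrives() throws Exception {
        // The transaction arrives first; there is no product yet to enrich it with
        harness.processElement1(recordFor(transaction));

        // The transaction had to be buffered in keyed state, and nothing was emitted
        assertThat(harness.numKeyedStateEntries()).isEqualTo(1);
        assertThat(harness.getOutput()).isEmpty();

        // When the product finally arrives, it is stored and the buffer is drained and
        // cleared, so the total number of state entries does not change
        // (see the note below on why only the total can be observed)
        harness.processElement2(recordFor(product));
        assertThat(harness.numKeyedStateEntries()).isEqualTo(1);

        // The buffered transaction has now been enriched and emitted
        assertThat(harness.getOutput())
                .containsExactly(recordForExpectedResult(transaction, product));
    }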
Unfortunately, the test harness does not allow for inspecting the status of individual states (e.g., the ValueState for the product data vs. the ListState or MapState used for buffering); all that can be determined is the total number of objects being managed by this operator instance. So when the product that is being waited for is ultimately processed, two things happen: (1) the product is stored, and (2) the list or map of buffered transactions is completely cleared, resulting in no net change to the number of state entries.
The full recipe
The recipe includes many more tests than are presented here; see the code on GitHub for more details.
You can run the unit tests defined in the EnrichmentJoinUnitTestBase class (via its two concrete subclasses) to see the test suite in action, either directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.