Unit testing with Apache Flink®'s test harnesses

Visit the enrichment-join-with-buffering recipe on GitHub.

Introduction

In this recipe you will write unit tests for functions that use managed state.

In particular, you will write unit tests for the KeyedCoProcessFunction at the heart of the recipe that implements a buffered enrichment join. That job has two source streams, transactions and products, and connects them to enrich each Transaction with the Product being sold:

[Diagram: the transactions and products streams both flow into the Join, which emits the enriched Transactions.]

To test this properly, you need to verify that the join behaves correctly regardless of which event it processes first: the Transaction event to be enriched, or the Product event used for enrichment. This means checking not only that the results are correct, but also that any managed state used along the way is freed once it is no longer needed.

This sort of testing isn't possible in an integration test, because Flink's APIs don't provide enough control over the runtime, nor do they provide enough visibility into the state backend. Fortunately, Flink includes test harnesses that provide this functionality.

For an introduction to these test harnesses, and their relationship to operators and user functions, please see the section on getting started with Flink's test harnesses in the explainer on testing DataStream applications.

Necessary dependencies

To use these test harnesses, you will first add some dependencies to your project:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.immerok</groupId>
    <artifactId>cookbook-enrichment-join-with-buffering</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>
    <name>Flink Cookbook Recipe - Enrichment Join with Buffering</name>

    <properties>
        <flink.version>1.16.0</flink.version>
        <jackson.databind.version>2.14.1</jackson.databind.version>
        <junit.jupiter.version>5.9.1</junit.jupiter.version>
        <kafka.version>3.2.2</kafka.version>
        <log4j.version>2.19.0</log4j.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- remaining dependencies omitted; see the full pom.xml on GitHub -->
    </dependencies>
</project>
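The dependencies that matter most for the test harnesses are the test-scoped ones that bring in Flink's test utilities and the harness classes. As a sketch, they look roughly like the set recommended in Flink's documentation on testing; the recipe's full pom.xml on GitHub is the authoritative list of what this project actually uses:

<!-- Test-scoped dependencies for Flink's test utilities and test harnesses.
     This mirrors the Flink testing documentation's recommendation and is shown
     here as a sketch; check the recipe's pom.xml for its exact dependency set. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
    <classifier>tests</classifier>
</dependency>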

Creating the test harness

With all the dependencies now in place, you are ready to start writing tests.

In this recipe, the initializeTestHarness method is used to create a test harness before each test:

EnrichmentJoinUnitTestBase.java

package com.immerok.cookbook;

import static org.assertj.core.api.Assertions.assertThat;

import com.immerok.cookbook.records.Product;
import com.immerok.cookbook.records.Transaction;
import java.math.BigDecimal;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public abstract class EnrichmentJoinUnitTestBase {

    private KeyedTwoInputStreamOperatorTestHarness<Long, Transaction, Product, Transaction> harness;

In this case you are testing a join that is implemented with a KeyedCoProcessFunction, so the appropriate operator is a KeyedCoProcessOperator, to which you should pass an instance of the KeyedCoProcessFunction being tested.

With any keyed operator, the test harness needs to know how to compute the key(s) and their type, so that information is passed to the test harness's constructor along with the operator.
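Putting this together, here is a minimal sketch of what that setup can look like inside the base class shown above. The key selector lambdas are hypothetical placeholders for the recipe's actual record fields, and the abstract getFunctionToTest method (implemented by each concrete test class, as shown below) supplies the function under test:

// Each concrete test class supplies the join implementation to be tested.
abstract KeyedCoProcessFunction<Long, Transaction, Product, Transaction> getFunctionToTest();

@BeforeEach
public void initializeTestHarness() throws Exception {
    harness =
            new KeyedTwoInputStreamOperatorTestHarness<>(
                    // wrap the function in the operator the runtime would normally create
                    new KeyedCoProcessOperator<>(getFunctionToTest()),
                    // key selectors for the two inputs; these accessors are
                    // hypothetical placeholders for the recipe's real record fields
                    transaction -> transaction.getProductId(),
                    product -> product.getId(),
                    // the type of the key
                    Types.LONG);
    harness.setup();
    harness.open();
}

Because the harness drives the operator directly, no Flink cluster (mini or otherwise) is involved in these tests.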

The recipe you are testing includes two implementations of the same enrichment join, one using ListState and the other using MapState. The testing criteria are the same for both implementations, so the tests are implemented in an EnrichmentJoinUnitTestBase class that is extended by EnrichmentJoinUsingListStateUnitTest and EnrichmentJoinUsingMapStateUnitTest, creating two identical test suites:

EnrichmentJoinUsingListStateUnitTest.java

package com.immerok.cookbook;

import com.immerok.cookbook.functions.EnrichmentJoinUsingListState;
import com.immerok.cookbook.records.Product;
import com.immerok.cookbook.records.Transaction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;

public class EnrichmentJoinUsingListStateUnitTest extends EnrichmentJoinUnitTestBase {

    @Override
    public KeyedCoProcessFunction<Long, Transaction, Product, Transaction> getFunctionToTest() {
        // supplies the ListState-based join implementation to the shared test suite
        return new EnrichmentJoinUsingListState();
    }
}

EnrichmentJoinUsingMapStateUnitTest.java

package com.immerok.cookbook;

import com.immerok.cookbook.functions.EnrichmentJoinUsingMapState;
import com.immerok.cookbook.records.Product;
import com.immerok.cookbook.records.Transaction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;

public class EnrichmentJoinUsingMapStateUnitTest extends EnrichmentJoinUnitTestBase {

    @Override
    public KeyedCoProcessFunction<Long, Transaction, Product, Transaction> getFunctionToTest() {
        // supplies the MapState-based join implementation to the shared test suite
        return new EnrichmentJoinUsingMapState();
    }
}

Writing tests

The KeyedCoProcessFunction you are testing has processElement1 and processElement2 callbacks that are called by the KeyedCoProcessOperator. That operator has its own processElement1 and processElement2 methods, which are normally called by the Flink runtime. In these tests, however, the test harness stands in for the Flink runtime and calls those operator methods directly.

Here is the implementation for KeyedCoProcessOperator#processElement1, copied from the Flink sources:

KeyedCoProcessOperator.java

public void processElement1(StreamRecord<IN1> element) throws Exception {
    collector.setTimestamp(element);
    context.element = element;
    userFunction.processElement1(element.getValue(), context, collector);
    context.element = null;
}

Note that the operator's version of processElement1 needs to be passed a StreamRecord. To help with this, the tests include a couple of helper methods for setting up the inputs:

EnrichmentJoinUnitTestBase.java

/** StreamRecord is an internal type used in the Flink runtime (and the test harnesses). */

and another helper method for the expected result:

EnrichmentJoinUnitTestBase.java

private static StreamRecord<Transaction> recordForExpectedResult(Transaction t, Product p) {
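For illustration only, helpers of this sort need to do little more than wrap each input in a StreamRecord; the names and bodies below are hypothetical sketches rather than the recipe's actual code:

// Hypothetical sketch of the input helpers: they wrap each input in the
// StreamRecord envelope that the operator-level callbacks expect.
private static StreamRecord<Transaction> transactionRecord(Transaction transaction) {
    return new StreamRecord<>(transaction);
}

private static StreamRecord<Product> productRecord(Product product) {
    return new StreamRecord<>(product);
}

If the function under test cares about event timestamps, the StreamRecord constructor that also takes a timestamp can be used instead.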

Here, then, is an example of a test for the happy case where the product is processed before the transaction that needs it:

EnrichmentJoinUnitTestBase.java

private static final Product product = new Product(1, 1, "product1", 5.0F, 1);
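The base class defines sample records such as the product shown above. Assuming a matching transaction record is defined alongside it, a sketch of this happy-path test looks roughly like the following (illustrative, not the recipe's exact code):

@Test
public void testProductArrivingFirstIsUsedImmediately() throws Exception {
    // the product arrives first and is stored in managed state (one state entry)
    harness.processElement2(new StreamRecord<>(product));
    assertThat(harness.numKeyedStateEntries()).isEqualTo(1);

    // the transaction finds the product already there, so it is enriched and
    // emitted right away and never needs to be buffered (still one state entry)
    harness.processElement1(new StreamRecord<>(transaction));
    assertThat(harness.numKeyedStateEntries()).isEqualTo(1);

    assertThat(harness.getOutput())
            .containsExactly(recordForExpectedResult(transaction, product));
}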

Notice how:

  • harness.processElement1 and harness.processElement2 are used to guarantee a specific order in which the inputs are processed.
  • harness.numKeyedStateEntries is used to verify the impact of each record's ingestion on the number of entries stored in Flink's managed state. In this test case the transaction didn't need to be stored, and it wasn't.
  • harness.getOutput() is used to obtain the actual results.

The test below verifies that the transactions are buffered when they need to be, and that the buffer is cleared when it is no longer needed.

EnrichmentJoinUnitTestBase.java

StreamRecord<Transaction> expected1 = recordForExpectedResult(transaction, product);
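A sketch of what that buffering test can look like, again illustrative rather than the recipe's exact code:

@Test
public void testTransactionIsBufferedUntilItsProductArrives() throws Exception {
    StreamRecord<Transaction> expected1 = recordForExpectedResult(transaction, product);

    // the transaction arrives before its product, so it must be buffered
    // (one state entry: the buffer for this key) and nothing is emitted yet
    harness.processElement1(new StreamRecord<>(transaction));
    assertThat(harness.numKeyedStateEntries()).isEqualTo(1);
    assertThat(harness.getOutput()).isEmpty();

    // when the product arrives it is stored, the buffered transaction is enriched
    // and emitted, and the buffer is cleared, so the total number of state
    // entries is unchanged
    harness.processElement2(new StreamRecord<>(product));
    assertThat(harness.numKeyedStateEntries()).isEqualTo(1);

    assertThat(harness.getOutput()).containsExactly(expected1);
}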

Unfortunately, the test harness does not allow for inspecting the status of individual states (e.g., the ValueState for the product data vs. the ListState or MapState used for buffering); all that can be determined is the total number of objects being managed by this operator instance. So when the product that is being waited for is ultimately processed, two things happen: (1) the product is stored, and (2) the list or map of buffered transactions is completely cleared — resulting in no net change to the number of state entries.

The full recipe

The recipe includes many more tests than are presented here; see the code on GitHub for more details.

You can run the unit tests defined in EnrichmentJoinUnitTestBase (via its two concrete subclasses) to see the test suite in action, either directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.