Splitting Apache Kafka® events

Visit the split-stream recipe on GitHub.

Splitting or routing a stream of events

A common use case is needing to split a stream of events, or to route specific events to a specific destination.

In this recipe, you are going to consume events from Apache Kafka that carry a priority value in their payload. Events with a CRITICAL or MAJOR priority are each routed to their own output.

This Apache Flink recipe is self-contained: you can copy it and run it directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The JSON input data

The recipe reads from a Kafka topic named input, which contains JSON-encoded records.


{"id":1,"data":"NL31PJMQ9001080613","priority":"MAJOR"}
{"id":2,"data":"AD5934360143N91217Gfa7hA","priority":"MINOR"}
{"id":3,"data":"ST35710189607894938826568","priority":"CRITICAL"}
{"id":4,"data":"IT90R5678911215DpWA54OpHM17","priority":"CRITICAL"}
{"id":5,"data":"RS93089245532951564296","priority":"MAJOR"}
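The recipe's EventDeserializationSchema turns these records into Event objects. As an illustration only, a record like the ones above could be mapped onto a POJO with Jackson; the field and enum names below follow the JSON payload, but the actual Event class in com.immerok.cookbook.events may differ:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical mirror of the JSON records; the real Event class may differ.
enum Priority { CRITICAL, MAJOR, MINOR }

class Event {
    public long id;
    public String data;
    public Priority priority;
}

public class EventSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String json = "{\"id\":1,\"data\":\"NL31PJMQ9001080613\",\"priority\":\"MAJOR\"}";
        // Jackson matches the "priority" string against the enum constant names.
        Event event = mapper.readValue(json, Event.class);
        System.out.println(event.id + " " + event.priority);
    }
}
```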

Define your side output

You are going to use Flink's Side Output feature to split the stream of events. Side outputs can split a stream n ways, into streams of possibly different types, with excellent performance.

To use Side Outputs, you first define an OutputTag to identify your side output stream.

SplitStream.java

package com.immerok.cookbook;

import static java.util.Map.entry;

entry(Event.Priority.CRITICAL, new OutputTag<>("critical") {}),
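The entry(...) line above suggests the recipe keeps one OutputTag per priority in a map. A minimal sketch of that idea follows; the class, field, and tag names other than "critical" are assumptions, and the Event stub stands in for the cookbook's real class. Note the trailing {}: creating an anonymous subclass lets Flink capture the tag's generic type information.

```java
import static java.util.Map.entry;

import java.util.Map;
import org.apache.flink.util.OutputTag;

public class OutputTagsSketch {

    public enum Priority { CRITICAL, MAJOR, MINOR }

    // Stand-in for com.immerok.cookbook.events.Event.
    public static class Event {
        public long id;
        public String data;
        public Priority priority;
    }

    // One OutputTag per priority that should be routed to its own stream.
    // The anonymous subclass ({}) is required so Flink can infer the Event type.
    public static final Map<Priority, OutputTag<Event>> TAGS = Map.ofEntries(
            entry(Priority.CRITICAL, new OutputTag<Event>("critical") {}),
            entry(Priority.MAJOR, new OutputTag<Event>("major") {}));

    public static void main(String[] args) {
        TAGS.forEach((priority, tag) -> System.out.println(priority + " -> " + tag.getId()));
    }
}
```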

Emit to side output

You are going to emit to your defined side outputs from Flink's ProcessFunction: its Context parameter lets you send data to a previously defined side output.

SplitStream.java

package com.immerok.cookbook;

import static java.util.Map.entry;

import com.immerok.cookbook.events.Event;
import com.immerok.cookbook.events.EventDeserializationSchema;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

ProcessFunction<Event, Event>.Context ctx,
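To show the pattern end to end, here is a minimal sketch of a splitting ProcessFunction. The Splitter class name, the tag fields, and the Event stub are assumptions for illustration; the cookbook's actual SplitStream.java looks up tags in a Map instead of hard-coding them. The key call is ctx.output(tag, value), which sends the event to the matching side output, while out.collect(value) keeps it on the main stream.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitSketch {

    public enum Priority { CRITICAL, MAJOR, MINOR }

    // Stand-in for com.immerok.cookbook.events.Event.
    public static class Event {
        public Priority priority;
    }

    // The anonymous subclass ({}) lets Flink capture the Event type argument.
    public static final OutputTag<Event> CRITICAL = new OutputTag<Event>("critical") {};
    public static final OutputTag<Event> MAJOR = new OutputTag<Event>("major") {};

    public static class Splitter extends ProcessFunction<Event, Event> {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            switch (event.priority) {
                case CRITICAL -> ctx.output(CRITICAL, event); // routed to the "critical" side output
                case MAJOR -> ctx.output(MAJOR, event);       // routed to the "major" side output
                default -> out.collect(event);                // everything else stays on the main stream
            }
        }
    }

    public static void main(String[] args) {
        // Construction only; in the recipe the function runs inside a Flink job.
        System.out.println(new Splitter().getClass().getSimpleName());
    }
}
```

Downstream, each split stream would then be retrieved with getSideOutput(CRITICAL) on the SingleOutputStreamOperator returned by process(new Splitter()).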

The full recipe

This recipe is self-contained. You can run the SplitStreamTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.

See the comments included in the code for more details.