
Creating dead letter queues

Visit the kafka-dead-letter recipe on GitHub.

Sending events to different topic when failure occurs

When you are processing data, something can go wrong along the way. For example, the incoming data does not match your expected data model.

In this recipe, you consume events from Apache Kafka and transform them into your data model, which is defined in a POJO. However, some events do not match your POJO. This could happen because you added a new IoT device to your network that introduces new data.

You are going to send these malformed events to a different topic. This allows you to inspect the incorrect data and either consult your data provider or adjust your business logic.

This recipe for Apache Flink is self-contained: you can copy it and run it directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The JSON input data

The recipe uses a Kafka topic, input, containing JSON-encoded records.


{"id":1,"data":"Éomer","timestamp":"2022-07-23T18:29:12.446820Z"}
{"id":2,"data":"Sauron","timestamp":"2022-07-23T18:29:12.663407Z"}
{"id":3,"data":"Gandalf the Grey","timestamp":"2022-07-23T18:29:12.779154Z"}
{"id":4,"data":"Bilbo Baggins","timestamp":"2022-07-23T18:29:12.894671Z"}
{"id":5,"data":"Éowyn","timestamp":"2022-07-23T18:29:13.010826Z"}
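For reference, these records could map onto a POJO along the following lines. This is a sketch; the recipe's actual Event class may differ in naming and details:

```java
import java.time.Instant;

/** Sketch of the data model the JSON records above map onto (hypothetical; see the recipe's Event class). */
public class Event {
    // Public fields and a public no-arg constructor let Flink treat this class as a POJO
    public long id;
    public String data;
    public Instant timestamp;

    public Event() {}

    public Event(long id, String data, Instant timestamp) {
        this.id = id;
        this.data = data;
        this.timestamp = timestamp;
    }

    public static void main(String[] args) {
        Event e = new Event(1, "Éomer", Instant.parse("2022-07-23T18:29:12.446820Z"));
        System.out.println(e.id + " " + e.data + " " + e.timestamp.toEpochMilli());
    }
}
```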

Define your side output

You are going to use Flink's Side Output to send the malformed events to a different topic.

To use Side Outputs, you first define an OutputTag to identify your side output stream. Note that the OutputTag is created as an anonymous subclass so that Flink can infer its generic type.

KafkaDeadLetterTopic.java

final OutputTag<String> deserializationErrors = new OutputTag<>("errors") {};

Try to deserialize your data

You are going to use Flink's ProcessFunction to try to deserialize your incoming data.

If the data deserializes correctly, it is processed as expected.

If deserialization fails, you send the raw value of the event that could not be deserialized to the defined OutputTag.

You can't deserialize your data directly in the source. When a source encounters a deserialization error, it can only drop the message or fail the job: sources do not have access to side outputs.

KafkaDeadLetterTopic.java

package com.immerok.cookbook;

import com.immerok.cookbook.events.Event;
import com.immerok.cookbook.events.JsonDeserialization;
import java.io.IOException;
import java.util.function.Consumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;

deserialized = JsonDeserialization.deserialize(value);
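The essence of the ProcessFunction is a try/catch around deserialization that routes failures away from the main output. The same routing logic can be sketched outside Flink in plain Java; here Long.parseLong stands in for the recipe's JSON deserialization, and the two lists stand in for the main output and the "errors" side output:

```java
import java.util.ArrayList;
import java.util.List;

public class DeadLetterRouting {
    public static void main(String[] args) {
        List<String> input = List.of("1", "2", "not-a-number", "4");

        List<Long> events = new ArrayList<>();        // stand-in for the main output stream
        List<String> deadLetters = new ArrayList<>(); // stand-in for the "errors" side output

        for (String value : input) {
            try {
                // Stand-in for JsonDeserialization.deserialize(value),
                // which throws on malformed input in the recipe
                events.add(Long.parseLong(value));
            } catch (NumberFormatException e) {
                // Malformed record: forward the raw value to the dead letter output
                deadLetters.add(value);
            }
        }

        System.out.println("events=" + events + " deadLetters=" + deadLetters);
    }
}
```

The important property is that a malformed record never kills the job and is never silently dropped; it is preserved verbatim for later inspection.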

Apply watermarking afterwards

Since we aren't deserializing events in the source, we have to apply the watermark strategy afterwards. Depending on the source, this may not be required if the timestamp can be determined without deserializing the event.

KafkaDeadLetterTopic.java

package com.immerok.cookbook;

import com.immerok.cookbook.events.Event;
import com.immerok.cookbook.events.JsonDeserialization;

(event, timestamp) -> event.timestamp.toEpochMilli()));
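The timestamp assigner above simply converts the event's Instant field to epoch milliseconds. In plain Java, using the first sample record's timestamp, that conversion looks like this:

```java
import java.time.Instant;

public class TimestampExtraction {
    public static void main(String[] args) {
        // The same expression the timestamp assigner uses: event.timestamp.toEpochMilli()
        Instant timestamp = Instant.parse("2022-07-23T18:29:12.446820Z");
        long epochMillis = timestamp.toEpochMilli(); // sub-millisecond precision is truncated
        System.out.println(epochMillis);
    }
}
```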

The full recipe

This recipe is self-contained. You can run the KafkaDeadLetterTopicTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.

See the comments included in the code for more details.