Deserializing JSON from Apache Kafka®

Visit the kafka-json-to-pojo recipe on GitHub.

To get started with your first event processing application, you will need to read data from one or more sources.

In this recipe, you are going to consume JSON-encoded event data from Apache Kafka and transform it into your data model, which is defined in a POJO.

This Apache Flink recipe is self-contained: you can copy it directly and run it from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The JSON input data

The recipe reads from a Kafka topic named input, which contains JSON-encoded records.


{"id":1,"data":"Frodo Baggins","timestamp":"2022-07-28T08:03:18.819865Z"}
{"id":2,"data":"Meriadoc Brandybuck","timestamp":"2022-07-28T08:03:19.030003Z"}
{"id":3,"data":"Boromir","timestamp":"2022-07-28T08:03:19.144706Z"}
{"id":4,"data":"Gollum","timestamp":"2022-07-28T08:03:19.261407Z"}
{"id":5,"data":"Sauron","timestamp":"2022-07-28T08:03:19.377677Z"}

The data model

You want to consume these records in your Apache Flink application and map them to your data model, which is defined in the following POJO:

Event.java

package com.immerok.cookbook.events;

import java.time.Instant;
import java.util.Objects;

/** A Flink POJO must have public fields, or getters and setters */
public class Event {

    // Field names and types are inferred from the JSON records shown above
    public long id;
    public String data;
    public Instant timestamp;
}
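The snippet above is abbreviated. Judging from the java.util.Objects import, the full class in the repository also overrides equals and hashCode, which Flink does not require but which makes testing and comparing events easier. A minimal sketch of what those overrides typically look like for this POJO, not necessarily the recipe's exact code:

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    Event event = (Event) o;
    return id == event.id
            && Objects.equals(data, event.data)
            && Objects.equals(timestamp, event.timestamp);
}

@Override
public int hashCode() {
    return Objects.hash(id, data, timestamp);
}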

Connect to Kafka

You use the Apache Flink KafkaSource connector in the application to connect to your Apache Kafka broker. In addition to the necessary connection information, you configure the connector to use a custom deserializer called EventDeserializationSchema.

KafkaJSONToPOJO.java

package com.immerok.cookbook;

import com.immerok.cookbook.events.Event;
import com.immerok.cookbook.events.EventDeserializationSchema;
import java.util.function.Consumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;

// Excerpt: the KafkaSource is built with the custom value-only deserializer
KafkaSource<Event> source =
        KafkaSource.<Event>builder()
                .setBootstrapServers("localhost:9092") // adjust to your broker address
                .setTopics("input") // the topic described above
                .setValueOnlyDeserializer(new EventDeserializationSchema())
                .build();
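The recipe's main class, not fully shown here, then turns this source into a DataStream. As a rough sketch of that wiring, assuming the standard fromSource API and no watermarking (the WatermarkStrategy import above suggests exactly this pattern):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Attach the KafkaSource built above; each record arrives as a ready-to-use Event POJO
DataStream<Event> events =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

events.print(); // attach your actual processing or sink here
env.execute("KafkaJSONToPOJO");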

The custom deserializer

The custom deserializer EventDeserializationSchema uses a Jackson ObjectMapper to deserialize each incoming record.

EventDeserializationSchema.java

package com.immerok.cookbook.events;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    /**
     * For performance reasons it's better to create one ObjectMapper in this open method rather
     * than creating a new ObjectMapper for every record.
     */
    @Override
    public void open(InitializationContext context) {
        // JavaTimeModule is needed for Java 8 date/time (Instant) support
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    /** Deserializes one JSON-encoded Kafka record into an Event */
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Event.class);
    }
}
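To get a feel for what the schema does, here is a hypothetical standalone check, not part of the recipe, that feeds it one of the JSON records from above. Since open() does not use its context argument, passing null is acceptable for a quick experiment:

EventDeserializationSchema schema = new EventDeserializationSchema();
schema.open(null); // safe here: open() ignores the context argument

byte[] record =
        "{\"id\":1,\"data\":\"Frodo Baggins\",\"timestamp\":\"2022-07-28T08:03:18.819865Z\"}"
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);

Event event = schema.deserialize(record);
// event.id == 1, event.data equals "Frodo Baggins", event.timestamp is the parsed Instant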

The full recipe

This recipe is self-contained. You can run the KafkaJSONToPOJOTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.
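If you run the test via Maven, the standard Surefire test filter should work from the recipe's project directory; the exact module layout of the repository may differ, so treat this invocation as an assumption:

mvn test -Dtest='KafkaJSONToPOJOTest#testProductionJob'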

See the comments and tests included in the code for more details.