Deserializing JSON from Apache Kafka®
Visit the kafka-json-to-pojo recipe on GitHub.

Reading Kafka JSON records in Apache Flink
To get started with your first event processing application, you will need to read data from one or more sources.
In this recipe, you are going to consume JSON-encoded event data from Apache Kafka and transform this data into your data model. The data model is defined in a POJO.
This recipe is self-contained: you can copy and run it directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.
The JSON input data
The recipe uses the Kafka topic input, containing JSON-encoded records.
_5{"id":1,"data":"Frodo Baggins","timestamp":"2022-07-28T08:03:18.819865Z"}_5{"id":2,"data":"Meriadoc Brandybuck","timestamp":"2022-07-28T08:03:19.030003Z"}_5{"id":3,"data":"Boromir","timestamp":"2022-07-28T08:03:19.144706Z"}_5{"id":4,"data":"Gollum","timestamp":"2022-07-28T08:03:19.261407Z"}_5{"id":5,"data":"Sauron","timestamp":"2022-07-28T08:03:19.377677Z"}
The data model
You want to consume these records in your Apache Flink application and map them onto your data model, which is defined in the following POJO.
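A minimal sketch of such a POJO, derived from the JSON fields shown above (the class name Event and the use of java.time.Instant for the timestamp are assumptions; Flink's POJO rules require public fields or getters/setters and a public no-argument constructor):

```java
import java.time.Instant;

/** Data model matching the JSON records above; "Event" is an assumed name. */
public class Event {

    // Flink's POJO serializer requires public fields (or getters/setters).
    public long id;
    public String data;
    public Instant timestamp;

    // A public no-argument constructor is also required for POJO serialization.
    public Event() {}

    public Event(long id, String data, Instant timestamp) {
        this.id = id;
        this.data = data;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{id=" + id + ", data='" + data + "', timestamp=" + timestamp + "}";
    }
}
```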
Connect to Kafka
You are using the Apache Flink KafkaSource connector in the application to connect to your Apache Kafka broker. In addition to the necessary connection information, you configure the connector to use a custom deserializer called EventDeserializationSchema.
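A sketch of how the source could be wired into a job, assuming the Event POJO above (the broker address, consumer group id, and class name are placeholders, not taken from the recipe):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaJSONToPOJOJob {  // assumed class name

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<Event> source =
            KafkaSource.<Event>builder()
                .setBootstrapServers("localhost:9092")      // placeholder broker address
                .setTopics("input")
                .setGroupId("kafka-json-to-pojo")           // assumed consumer group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Hand each record's value to the custom deserializer described below.
                .setValueOnlyDeserializer(new EventDeserializationSchema())
                .build();

        DataStream<Event> events =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        events.print();  // placeholder sink so the pipeline has a terminal operator

        env.execute("Deserializing JSON from Kafka");
    }
}
```

setValueOnlyDeserializer fits here because only the record value carries data; if you also needed keys or headers, you would use setDeserializer with a KafkaRecordDeserializationSchema instead.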
The custom deserializer
The custom deserializer EventDeserializationSchema uses a Jackson ObjectMapper to deserialize each incoming record.
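A sketch of what such a deserializer could look like, building on Flink's AbstractDeserializationSchema (registering Jackson's JavaTimeModule, from jackson-datatype-jsr310, is an assumption needed to parse the ISO-8601 timestamps into java.time.Instant):

```java
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {

    // ObjectMapper is not serializable, so it is created in open() on the
    // task managers rather than shipped with the job graph.
    private transient ObjectMapper objectMapper;

    @Override
    public void open(InitializationContext context) {
        // JavaTimeModule lets Jackson map the ISO-8601 timestamp strings
        // onto the POJO's java.time.Instant field.
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Event.class);
    }
}
```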
The full recipe
This recipe is self-contained: run the KafkaJSONToPOJOTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or from your favorite editor, such as IntelliJ IDEA or Visual Studio Code.
See the comments and tests included in the code for more details.