
Reading Apache Kafka® headers

Visit the kafka-headers recipe on GitHub.

Extracting metadata from Apache Kafka record headers

Record headers give you the ability to provide metadata about your Apache Kafka record. This information is not added to the key/value pair of the record itself.

By default, Apache Kafka adds metadata to each record, such as the topic name, partition, offset, timestamp, and more.

You can also add custom headers yourself to provide additional metadata for your record. For example, the OpenTelemetry project uses headers to propagate telemetry data such as traces. You could also use headers to carry the information that's needed to decrypt the payload of the Kafka record.
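For illustration, this is roughly how a producer attaches such a custom header. This is a hypothetical sketch: the producer, key, value, and the traceparent value are assumptions, not part of this recipe.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch: `producer`, `key`, and `value` are assumed to exist
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>("input", key, value);
producerRecord
        .headers()
        .add(
                "traceparent",
                "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
                        .getBytes(StandardCharsets.UTF_8));
producer.send(producerRecord);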

In this recipe, you consume events from Apache Kafka and transform them into your own data model. This data model consists of your business payload plus the metadata from Apache Kafka itself and the custom headers. The data model is defined as a POJO.

This recipe for Apache Flink is a self-contained recipe that you can directly copy and run from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The JSON input data

The recipe uses the Kafka topic input, which contains JSON-encoded records.


{"id":1,"data":"Éomer","timestamp":"2022-07-23T18:29:12.446820Z"}
{"id":2,"data":"Sauron","timestamp":"2022-07-23T18:29:12.663407Z"}
{"id":3,"data":"Gandalf the Grey","timestamp":"2022-07-23T18:29:12.779154Z"}
{"id":4,"data":"Bilbo Baggins","timestamp":"2022-07-23T18:29:12.894671Z"}
{"id":5,"data":"Éowyn","timestamp":"2022-07-23T18:29:13.010826Z"}

The data model

You want to consume these records in your Apache Flink application and make them available in the data model. The data model EnrichedEvent is built up from three different parts:

  1. The business data, which is defined in Event
  2. The default Apache Kafka headers, which are defined in Metadata
  3. The custom added Apache Kafka headers, which are defined in Headers
EnrichedEvent.java

package com.immerok.cookbook.events;

/** A Flink POJO combining the business event, the Kafka metadata, and the custom headers */
public class EnrichedEvent {

    public Event event;
    public Metadata metadata;
    public Headers headers;
}

The business data

Event.java

package com.immerok.cookbook.events;

import java.time.Instant;
import java.util.Objects;

/** A simple Event that Flink recognizes as a valid POJO */
public class Event {

    /** A Flink POJO must have public fields, or getters and setters */
    public long id;
    public String data;
    public Instant timestamp;

    // equals, hashCode, and toString (using Objects) omitted for brevity
}

The Apache Kafka metadata

In this recipe, you are going to read the topic name, partition, offset, timestamp, and the type of timestamp that was used when the record was created.

Metadata.java

package com.immerok.cookbook.events;

import java.util.Objects;

/** A collection of Headers metadata provided by Kafka that Flink recognizes as a valid POJO */
public class Metadata {

    /** A Flink POJO must have public fields, or getters and setters */
    public String metadataTopic;
    public int metadataPartition;
    public long metadataOffset;
    public long metadataTimestamp;
    public String metadataTimestampType;

    // equals, hashCode, and toString (using Objects) omitted for brevity
}
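In the deserializer shown later, all of these values can be read directly from the Kafka ConsumerRecord. A minimal sketch, assuming the field names above:

// Sketch: mapping the Kafka-provided values of a ConsumerRecord<byte[], byte[]>
// onto the Metadata POJO
Metadata metadata = new Metadata();
metadata.metadataTopic = record.topic();
metadata.metadataPartition = record.partition();
metadata.metadataOffset = record.offset();
metadata.metadataTimestamp = record.timestamp();
metadata.metadataTimestampType = record.timestampType().toString();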

The custom Kafka headers

In this recipe, two custom headers have been added to the Kafka record: tracestate and traceparent. Both are defined in the W3C recommendation for Trace Context.

Headers.java

package com.immerok.cookbook.events;

import java.util.Objects;

/** A collection of custom Kafka headers that Flink recognizes as a valid POJO */
public class Headers {

    /** A Flink POJO must have public fields, or getters and setters */
    public String tracestate;
    public String traceparent;

    // equals, hashCode, and toString (using Objects) omitted for brevity
}

The custom deserializer

This recipe connects to Apache Kafka in the same way as described in the Deserializing JSON from Apache Kafka recipe. The difference is that this recipe uses the custom deserializer KafkaHeadersEventDeserializationSchema, which implements KafkaRecordDeserializationSchema. That interface deserializes the complete Kafka ConsumerRecord, including all header information.
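For context, this is roughly how such a deserializer is plugged into a KafkaSource. A minimal sketch, in which the bootstrap servers, topic name, and starting offsets are placeholders:

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<EnrichedEvent> source =
        KafkaSource.<EnrichedEvent>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("input")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new KafkaHeadersEventDeserializationSchema())
                .build();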

Implementing this interface provides direct access to the Kafka metadata of each record:

KafkaHeadersEventDeserializationSchema.java

package com.immerok.cookbook.events;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaHeadersEventDeserializationSchema
        implements KafkaRecordDeserializationSchema<EnrichedEvent> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<EnrichedEvent> out)
            throws IOException {
        // walked through in the following excerpts
    }

    /** The interface also requires the produced type for Flink's type system */
    @Override
    public TypeInformation<EnrichedEvent> getProducedType() {
        return TypeInformation.of(EnrichedEvent.class);
    }
}

Kafka's Headers interface allows you to return the last header for a given key via lastHeader(key). You can also return all headers for a given key via headers(key), as sketched below.
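Before looking at the recipe's code, here is a minimal sketch of both lookups, assuming UTF-8 encoded header values:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;

// lastHeader returns the most recently added header for the key, or null if absent
Header last = record.headers().lastHeader("traceparent");
String traceparent = last == null ? null : new String(last.value(), StandardCharsets.UTF_8);

// headers(key) returns every header stored under the key, in insertion order
for (Header header : record.headers().headers("tracestate")) {
    String value = new String(header.value(), StandardCharsets.UTF_8);
    // ...
}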

KafkaHeadersEventDeserializationSchema.java

package com.immerok.cookbook.events;

// imports identical to the previous excerpt

public class KafkaHeadersEventDeserializationSchema
        implements KafkaRecordDeserializationSchema<EnrichedEvent> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    static final String HEADER_TRACE_PARENT = "traceparent";
    static final String HEADER_TRACE_STATE = "tracestate";

    /**
     * For performance reasons it's better to create one ObjectMapper in this open method rather
     * than creating a new ObjectMapper for every record.
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        // JavaTimeModule is needed for Java 8 date time (Instant) support
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<EnrichedEvent> out)
            throws IOException {
        // shown in the next excerpt
    }
}

When all the necessary data has been extracted, it's time to assemble it into an EnrichedEvent and emit it to the collector.

KafkaHeadersEventDeserializationSchema.java

package com.immerok.cookbook.events;

// imports identical to the first excerpt

public class KafkaHeadersEventDeserializationSchema
        implements KafkaRecordDeserializationSchema<EnrichedEvent> {

    // fields, open(), and getProducedType() as shown above

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<EnrichedEvent> out)
            throws IOException {
        // parse the payload, read the metadata and the headers, then emit an EnrichedEvent
    }
}
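The exact implementation lives in the recipe's repository. As a sketch under the assumptions above (the reconstructed POJO fields, plus two hypothetical helper methods extractMetadata and extractHeaders), the body of deserialize can look like this:

@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<EnrichedEvent> out)
        throws IOException {
    // The record value carries the JSON-encoded business payload
    Event event = objectMapper.readValue(record.value(), Event.class);

    // extractMetadata and extractHeaders are hypothetical helpers: the first maps
    // record.topic()/partition()/offset()/timestamp()/timestampType() onto Metadata
    // (as sketched in the Metadata section), the second performs the lastHeader
    // lookups for traceparent and tracestate (as sketched above)
    Metadata metadata = extractMetadata(record);
    Headers headers = extractHeaders(record);

    EnrichedEvent enriched = new EnrichedEvent();
    enriched.event = event;
    enriched.metadata = metadata;
    enriched.headers = headers;
    out.collect(enriched);
}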

The full recipe

This recipe is self-contained. You can run the KafkaHeadersTest#testProductionJob class to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.
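For example, with the Maven Surefire plugin (2.19 or later) you can run that single test from the command line:

mvn test -Dtest=KafkaHeadersTest#testProductionJob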

See the comments included in the code for more details.