Capturing late data

Visit the late-data-to-sink recipe on GitHub.

Sending late data to a separate sink

When you are consuming data whose payload contains a timestamp indicating when the event occurred, you can use that timestamp for so-called event-time processing.

Apache Flink measures the progress of event time with watermarks. A watermark declares that the event time of a stream has reached a certain point, which means that no more data should arrive with an event time earlier than the timestamp in that watermark. If the timestamp in the payload is lower than the current watermark, the data is considered late.

Apache Flink does not always drop late events. Late events are only dropped if you configure watermarks and use one of the following operations:

  • Windows
  • Joins
  • CEP (Complex Event Processing)
  • Process functions (if you implement this yourself)
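A quick way to see what "late" means here: an event is late when its event-time timestamp lies behind the watermark the operator has already seen. The following standalone sketch (plain Java, no Flink dependencies; the class and method names are illustrative, not part of the recipe) captures that check:

```java
import java.time.Instant;

public class LatenessCheck {

    /**
     * An event is considered late when its event time lies before the
     * current watermark, i.e. the stream has already progressed past it.
     */
    public static boolean isLate(Instant eventTime, Instant currentWatermark) {
        return eventTime.isBefore(currentWatermark);
    }

    public static void main(String[] args) {
        Instant watermark = Instant.parse("2022-07-19T09:59:33.000Z");
        // An event from before the watermark is late...
        System.out.println(isLate(Instant.parse("2022-07-19T09:59:32.804843Z"), watermark)); // true
        // ...while an event at or after the watermark is on time.
        System.out.println(isLate(Instant.parse("2022-07-19T09:59:33.122527Z"), watermark)); // false
    }
}
```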

In this recipe, you are going to consume events from Apache Kafka that have a timestamp in the payload. When that timestamp is late, you will send the data to a separate sink. Data can arrive late because, for example, a user's mobile device loses its internet connection and only sends its events once the device comes back online.

Sending late data to a separate sink allows you to monitor how much data is actually late. It also makes it possible to re-process the late data via an alternative solution, or to verify that the late data did not affect the correctness of the result.

This Apache Flink recipe is self-contained: you can copy it and run it directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The JSON input data

The recipe uses Kafka topic input, containing JSON-encoded records.

{"id":1,"data":"Galadriel","timestamp":"2022-07-19T09:59:32.804843Z"}
{"id":2,"data":"Éowyn","timestamp":"2022-07-19T09:59:33.011537Z"}
{"id":3,"data":"Arwen Evenstar","timestamp":"2022-07-19T09:59:33.122527Z"}
{"id":4,"data":"Shelob","timestamp":"2022-07-19T09:59:33.235666Z"}
{"id":5,"data":"Saruman the White","timestamp":"2022-07-19T09:59:33.352016Z"}
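To work with these records in Flink, they are deserialized into an Event POJO. The recipe's actual class lives in com.immerok.cookbook.events and is not shown here, so the sketch below is an assumption about its shape, inferred from the JSON fields and the element.timestamp.toEpochMilli() call used later in the recipe:

```java
import java.time.Instant;

// Assumed shape of the recipe's Event POJO (com.immerok.cookbook.events.Event),
// inferred from the JSON records above; the real class may differ in detail.
public class Event {
    public long id;
    public String data;
    public Instant timestamp;

    // Flink's POJO serialization requires a public no-argument constructor.
    public Event() {}

    public Event(long id, String data, Instant timestamp) {
        this.id = id;
        this.data = data;
        this.timestamp = timestamp;
    }
}
```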

Define your watermark

Since the payload contains a timestamp, you will configure Flink to use these values for generating watermarks. This is done by defining a WatermarkStrategy that leverages Flink's built-in forMonotonousTimestamps() method. This requires extracting the timestamp from the record and passing it to the withTimestampAssigner method.

LateDataToSeparateSink.java

WatermarkStrategy<Event> watermarkStrategy =
        WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(
                        (element, recordTimestamp) -> element.timestamp.toEpochMilli());
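The timestamp assigner must return the event time as epoch milliseconds, which is why the recipe calls toEpochMilli() on the payload's Instant. As a standalone illustration (not part of the recipe itself), the conversion from the ISO-8601 strings in the sample records looks like this:

```java
import java.time.Instant;

public class TimestampConversion {
    public static void main(String[] args) {
        // The payload carries ISO-8601 timestamps like the sample records above.
        Instant timestamp = Instant.parse("2022-07-19T09:59:32.804843Z");
        // Flink timestamp assigners and watermarks work in epoch milliseconds.
        long epochMillis = timestamp.toEpochMilli();
        System.out.println(epochMillis); // 1658224772804
    }
}
```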

Define your side output for late data

You are using Flink's Side Output feature to get a stream of the data that was considered late. To define a side output for late data, you first define an OutputTag to identify your side output stream.

LateDataToSeparateSink.java

final OutputTag<Event> lateDataOutputTag = new OutputTag<>("lateData") {};

Next, you use a ProcessFunction that writes all late records to the defined side output.

LateDataToSeparateSink.java

package com.immerok.cookbook;

import com.immerok.cookbook.events.Event;
import com.immerok.cookbook.events.EventDeserializationSchema;
import java.util.function.Consumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class LateDataToSeparateSink {

    static final String TOPIC = "input";

    public static void main(String[] args) throws Exception {
        runJob();
    }

    static void runJob() throws Exception {
        KafkaSource<Event> source =
                KafkaSource.<Event>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics(TOPIC)
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        // The original excerpt breaks off here; a value deserializer
                        // and build() call would complete the builder.
                        .setValueOnlyDeserializer(new EventDeserializationSchema())
                        .build();

        // ... (the remainder of runJob is elided in this excerpt)
    }

    private static class LateDataSplittingProcessFunction extends ProcessFunction<Event, Event> {
        // ... (the body of the process function is elided in this excerpt;
        // see the full recipe on GitHub)
    }
}

The full recipe

This recipe is self-contained. You can run the LateDataToSeparateSinkTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.

See the comments included in the code for more details.