
Using session windows

Visit the session-window recipe on GitHub.

Grouping data that belongs to one session

Session windows are commonly used to process clickstream data. When a user visits your website and performs multiple actions, the intervals between those actions vary. At some point, the user has achieved their goal on your website and goes elsewhere. A session window closes when no actions have been reported for a certain period of time, known as the session gap. Once you have identified a session, you can derive insights such as the number of interactions per user.

In this recipe, you consume events from Apache Kafka and count the number of interactions per session. A session ends when no activity has occurred for 5 seconds after the last event.
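The gap rule can be illustrated without Flink. The plain-Java sketch below is not Flink code, and the class and method names are hypothetical. It splits one user's event timestamps (epoch milliseconds, sorted ascending) into sessions, starting a new session whenever the distance to the previous event exceeds the gap. Whether a distance of exactly the gap counts as the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionGapSketch {

    /**
     * Splits ascending timestamps (epoch millis) into sessions.
     * A new session starts when the distance to the previous event
     * exceeds gapMillis (the boundary choice is this sketch's assumption).
     */
    public static List<List<Long>> sessionize(List<Long> sortedMillis, long gapMillis) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long previous = null;
        for (long ts : sortedMillis) {
            if (previous != null && ts - previous > gapMillis) {
                sessions.add(current);      // inactivity gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
            previous = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);          // the last open session ends with the input
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Clicks at 0 s, 2 s and 4 s, a pause, then 12 s and 13 s; gap = 5 s.
        List<Long> clicks = List.of(0L, 2_000L, 4_000L, 12_000L, 13_000L);
        for (List<Long> session : sessionize(clicks, 5_000L)) {
            System.out.println(session.size() + " interactions: " + session);
        }
    }
}
```

With a 5-second gap, the example input yields two sessions: one with three interactions and one with two.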

This Apache Flink recipe is self-contained: you can copy it and run it directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The JSON input data

The recipe reads JSON-encoded records from a Kafka topic named input.


{"id":1,"user":"Grover Larson","timestamp":"2022-07-24T18:42:06.064708Z"}
{"id":2,"user":"Brian Kiehn","timestamp":"2022-07-24T18:42:06.382717Z"}
{"id":3,"user":"Santos Romaguera","timestamp":"2022-07-24T18:42:06.627165Z"}
{"id":4,"user":"Bella Marks","timestamp":"2022-07-24T18:42:06.866655Z"}
{"id":5,"user":"Elvis Welch","timestamp":"2022-07-24T18:42:07.462994Z"}

Define your session window

You use Flink's session windows to group the events that belong to one session and to define the inactivity gap that ends a session.

To define your session window, you first choose the key on which you want to aggregate your results, similar to a GROUP BY clause in SQL. Here you key by the user field, and a session window for a given user closes once no data has arrived for that user for 5 seconds.

SessionWindow.java

package com.immerok.cookbook;
// ...
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
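Keying by user gives every user independent sessions. The plain-Java sketch below uses no Flink dependency, and its names (KeyedSessionSketch, Click, countPerSession) are hypothetical. It groups the five sample records from the input topic by user and counts interactions per session with a 5-second gap, roughly what keying by user followed by EventTimeSessionWindows.withGap(Time.seconds(5)) computes in the recipe.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyedSessionSketch {

    /** Hypothetical stand-in for the recipe's Event class. */
    public static final class Click {
        final String user;
        final Instant timestamp;

        public Click(String user, String isoTimestamp) {
            this.user = user;
            this.timestamp = Instant.parse(isoTimestamp); // ISO-8601, e.g. ...06.064708Z
        }
    }

    /** Returns, per user, the number of interactions in each session. */
    public static Map<String, List<Integer>> countPerSession(List<Click> clicks, Duration gap) {
        // Group timestamps by key, like keying by user; TreeMap keeps users sorted.
        Map<String, List<Instant>> byUser = new TreeMap<>();
        for (Click c : clicks) {
            byUser.computeIfAbsent(c.user, k -> new ArrayList<>()).add(c.timestamp);
        }
        Map<String, List<Integer>> result = new TreeMap<>();
        for (Map.Entry<String, List<Instant>> e : byUser.entrySet()) {
            List<Instant> times = e.getValue();
            times.sort(null); // natural (event-time) order
            List<Integer> counts = new ArrayList<>();
            int count = 0;
            Instant previous = null;
            for (Instant t : times) {
                if (previous != null && Duration.between(previous, t).compareTo(gap) > 0) {
                    counts.add(count); // gap exceeded: this user's session is complete
                    count = 0;
                }
                count++;
                previous = t;
            }
            counts.add(count); // close the user's last session
            result.put(e.getKey(), counts);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Click> sample = List.of(
                new Click("Grover Larson", "2022-07-24T18:42:06.064708Z"),
                new Click("Brian Kiehn", "2022-07-24T18:42:06.382717Z"),
                new Click("Santos Romaguera", "2022-07-24T18:42:06.627165Z"),
                new Click("Bella Marks", "2022-07-24T18:42:06.866655Z"),
                new Click("Elvis Welch", "2022-07-24T18:42:07.462994Z"));
        System.out.println(countPerSession(sample, Duration.ofSeconds(5)));
    }
}
```

Although all five sample records arrive within about 1.4 seconds, each belongs to a different user, so every user ends up with a single session containing one interaction.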

Aggregate the number of interactions

You aggregate the number of interactions by implementing Flink's AggregateFunction interface. Your implementation uses the UserActivity POJO as its accumulator:

UserActivity.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * ...
 */
// ...
public Instant activityStart;

Adding data to a session window

Inside the AggregateFunction, the add method adds each event to the UserActivity accumulator of its session window. If no session window exists yet for this user, Flink creates one.

SessionWindow.java

package com.immerok.cookbook;

import com.immerok.cookbook.events.Event;
import com.immerok.cookbook.events.EventDeserializationSchema;
import com.immerok.cookbook.events.UserActivity;
import java.util.function.Consumer;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.connector.source.Source;
// ...
public UserActivity add(Event value, UserActivity accumulator) {
    // ...
}
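The following plain-Java sketch mimics the add step without a Flink dependency. The method names createAccumulator and add follow Flink's AggregateFunction interface, but here they are static methods on hypothetical stand-in classes. Only activityStart is visible in the recipe's UserActivity excerpt, so the user, numInteractions, and activityEnd fields are assumptions, as is the second click in main.

```java
import java.time.Instant;

public class AddSketch {

    /** Hypothetical accumulator; only activityStart appears in the recipe excerpt. */
    public static class UserActivity {
        public String user;
        public int numInteractions;
        public Instant activityStart;
        public Instant activityEnd;
    }

    /** Hypothetical event carrying the fields of the JSON input records. */
    public static class Event {
        public final long id;
        public final String user;
        public final Instant timestamp;

        public Event(long id, String user, Instant timestamp) {
            this.id = id;
            this.user = user;
            this.timestamp = timestamp;
        }
    }

    /** An empty accumulator for a newly created session window. */
    public static UserActivity createAccumulator() {
        return new UserActivity();
    }

    /** Folds one event into the accumulator of its session window. */
    public static UserActivity add(Event value, UserActivity accumulator) {
        accumulator.user = value.user;
        accumulator.numInteractions++;
        // Widen the session's time range so it includes this event.
        if (accumulator.activityStart == null || value.timestamp.isBefore(accumulator.activityStart)) {
            accumulator.activityStart = value.timestamp;
        }
        if (accumulator.activityEnd == null || value.timestamp.isAfter(accumulator.activityEnd)) {
            accumulator.activityEnd = value.timestamp;
        }
        return accumulator;
    }

    public static void main(String[] args) {
        UserActivity acc = createAccumulator();
        acc = add(new Event(1, "Grover Larson", Instant.parse("2022-07-24T18:42:06.064708Z")), acc);
        // A second, made-up click by the same user, 2 seconds later.
        acc = add(new Event(6, "Grover Larson", Instant.parse("2022-07-24T18:42:08.064708Z")), acc);
        System.out.println(acc.user + ": " + acc.numInteractions + " interactions from "
                + acc.activityStart + " to " + acc.activityEnd);
    }
}
```

Because add compares timestamps instead of assuming arrival order, an out-of-order event still widens the session range correctly.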

Merging data in one session window

If a session window already exists for this user and the 5-second gap hasn't passed yet, the session windows for that user need to be merged. That's why the AggregateFunction implements the merge method: it adds up the total number of interactions and determines the correct timestamps for when the merged session window started and ended.

SessionWindow.java

package com.immerok.cookbook;

import org.apache.commons.lang3.ObjectUtils;
// ...
ObjectUtils.min(a.activityStart, b.activityStart);
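Merging matters when an event bridges two sessions that have been separate so far. In the plain-Java sketch below (hypothetical classes, no Flink dependency), a user clicks at 0 s and 8 s, more than 5 seconds apart, so the clicks start out in separate sessions; a late click at 4 s is within 5 seconds of both, so all three accumulators are merged. The recipe takes the earlier start time with ObjectUtils.min from Apache Commons Lang. The sketch does the same with plain Instant comparisons, and it also sums the counts and keeps the later end time, using field names that are assumptions.

```java
import java.time.Instant;

public class MergeSketch {

    /** Hypothetical accumulator with the fields assumed for UserActivity. */
    public static class UserActivity {
        public int numInteractions;
        public Instant activityStart;
        public Instant activityEnd;

        /** Accumulator of a session that so far holds a single click. */
        public static UserActivity of(Instant click) {
            UserActivity a = new UserActivity();
            a.numInteractions = 1;
            a.activityStart = click;
            a.activityEnd = click;
            return a;
        }
    }

    /** Combines two partial sessions: sum the counts and widen the time range. */
    public static UserActivity merge(UserActivity a, UserActivity b) {
        UserActivity merged = new UserActivity();
        merged.numInteractions = a.numInteractions + b.numInteractions;
        merged.activityStart = a.activityStart.isBefore(b.activityStart) ? a.activityStart : b.activityStart;
        merged.activityEnd = a.activityEnd.isAfter(b.activityEnd) ? a.activityEnd : b.activityEnd;
        return merged;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2022-07-24T18:42:00Z");
        // Clicks at 0 s and 8 s are more than 5 s apart: two separate sessions.
        UserActivity early = UserActivity.of(t0);
        UserActivity late = UserActivity.of(t0.plusSeconds(8));
        // A late-arriving click at 4 s is within 5 s of both, so everything merges.
        UserActivity bridge = UserActivity.of(t0.plusSeconds(4));
        UserActivity session = merge(merge(early, bridge), late);
        System.out.println(session.numInteractions + " interactions from "
                + session.activityStart + " to " + session.activityEnd);
    }
}
```

Because taking the minimum start, taking the maximum end, and summing counts are all associative and commutative, the order in which partial sessions are merged does not change the result.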

Getting results of a session window

Once the 5-second gap has passed in event time without a new event for that user, the window fires and getResult returns the result of the accumulator, which contains the final result of the session window.

SessionWindow.java

package com.immerok.cookbook;
// ...
public UserActivity getResult(UserActivity accumulator) {
    // ...
}
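getResult runs only when the session window fires, which in event time depends on the watermark rather than on the wall clock. In Flink, a session window spans from its first event to its last event plus the gap (end exclusive), and the default event-time trigger fires once the watermark reaches the window's maxTimestamp(), which is end minus 1 ms. The plain-Java sketch below (hypothetical names, no Flink dependency) computes that firing condition for a session whose last click is at 10 s.

```java
public class FireTimeSketch {

    /** Exclusive end of a session window: last event time plus the gap. */
    public static long windowEnd(long lastEventMillis, long gapMillis) {
        return lastEventMillis + gapMillis;
    }

    /** Fires once the watermark reaches maxTimestamp, i.e. end - 1 ms. */
    public static boolean shouldFire(long lastEventMillis, long gapMillis, long watermarkMillis) {
        long maxTimestamp = windowEnd(lastEventMillis, gapMillis) - 1;
        return watermarkMillis >= maxTimestamp;
    }

    public static void main(String[] args) {
        long lastClick = 10_000L; // last click of the session at t = 10 s
        long gap = 5_000L;        // 5-second session gap
        for (long watermark : new long[] {12_000L, 14_998L, 14_999L}) {
            System.out.println("watermark " + watermark + " -> fire: "
                    + shouldFire(lastClick, gap, watermark));
        }
    }
}
```

The window ends at 15 000 ms and fires once the watermark reaches 14 999 ms. A watermark strategy that allows for out-of-order events holds the watermark back and therefore delays this moment.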

The full recipe

This recipe is self-contained. You can run the SessionWindowTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.

See the comments included in the code for more details.