
Working with keyed state

At the heart of Flink is a distributed key-value store. Understanding how to leverage this data store is crucial to the development of robust, performant Flink applications.

The big picture: Key partitioning

The effect of keyBy on a DataStream is to (re)partition that stream around a key. For example, the key in the example below is a customer ID. This means that all of the events for a given customer are processed by the same instance of LatestTransactionFunction:

DataStreamWorkflow.java

package com.immerok.cookbook.workflows;
import com.immerok.cookbook.functions.LatestTransactionFunction;
// key the stream by customer ID (the field name here is assumed)
transactionStream
    .keyBy(t -> t.customerId)
    .process(new LatestTransactionFunction());

flowchart LR
    T(transactionStream) -->|transactions for certain customers| P1(LatestTransactionFunction)
    T -->|transactions for other customers| P2(LatestTransactionFunction)
    style T fill:#E5E8FF,stroke:#3A4BDC
    style P1 fill:#E5E8FF,stroke:#3A4BDC
    style P2 fill:#E5E8FF,stroke:#3A4BDC

Many of the recipes in the Apache Flink Cookbook make use of stateful stream processing.

In each of these recipes, managed, keyed state is being used. When you use keyed state, Flink does a lot for you. It

  • keeps the state local to the thread where it is used, providing low-latency access
  • takes care of checkpointing and recovering the state in the event of failures
  • redistributes the state when the cluster is rescaled
  • can (if you are careful) support schema evolution of the state's types

State backends

Any keyed state (or timers) your application uses, whether directly or indirectly (e.g., via the Window, CEP, or Table APIs), is kept in a state backend. There are two principal implementations: one that keeps its state as objects on the JVM heap, and one based on the embedded RocksDB library, which is the more commonly used of the two.

The most important characteristics of the RocksDB state backend are that:

  • It is very scalable, since it keeps its working state as serialized bytes on disk.
  • Every read and write involves serialization/deserialization.
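For reference, the state backend is typically selected in the Flink configuration rather than in code. A minimal sketch (option names as in recent Flink 1.x releases; the checkpoint directory is just an example):

```yaml
# flink-conf.yaml (sketch)
state.backend: rocksdb                              # or "hashmap" for the heap-based backend
state.checkpoints.dir: file:///tmp/flink-checkpoints
```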

The lifecycle of a KeyedProcessFunction

Consider again the example from above, which tracks the most recent transaction for each customer:

DataStreamWorkflow.java

package com.immerok.cookbook.workflows;
import com.immerok.cookbook.functions.LatestTransactionFunction;
// key the stream by customer ID (the field name here is assumed)
transactionStream
    .keyBy(t -> t.customerId)
    .process(new LatestTransactionFunction());

An instance of the LatestTransactionFunction is created in the Flink client, serialized there, and shipped to the Flink cluster as part of job submission. Each of the task managers assigned to execute this function will deserialize a copy of this function object, and call its open method once during the job's startup phase:

LatestTransactionFunction.java

package com.immerok.cookbook.functions;
import com.immerok.cookbook.records.Transaction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

private transient ValueState<Transaction> latestState;

@Override
public void open(Configuration parameters) {
    ValueStateDescriptor<Transaction> descriptor =
        new ValueStateDescriptor<>("latest transaction", Transaction.class);
    latestState = getRuntimeContext().getState(descriptor);
}

By using a ValueState<Transaction> inside of LatestTransactionFunction, you are working with a distributed map, mapping customer IDs to Transaction objects. That distributed map stored in latestState is sharded across the cluster, with each instance of this keyed process function storing the transactions for some disjoint subset of the customers.

The open method connects this instance of LatestTransactionFunction with the latestState object being managed by the state backend. And if the job is being restarted from a checkpoint or savepoint, then the values of latestState preserved in that state snapshot become available to this instance, but only for the customer IDs assigned to it.

There is at most one instance of LatestTransactionFunction in each worker thread (i.e., in each task slot), and its open method is called just once. Each of these instances is, however, multiplexed across all of the events for all of the customer IDs assigned to that slot.

When an incoming transaction event arrives, the Flink runtime passes it to processElement:

LatestTransactionFunction.java

package com.immerok.cookbook.functions;
import com.immerok.cookbook.records.Transaction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LatestTransactionFunction
        extends KeyedProcessFunction<Long, Transaction, Transaction> {

    private transient ValueState<Transaction> latestState;

    // open(), shown above, initializes latestState

    @Override
    public void processElement(Transaction incoming, Context ctx, Collector<Transaction> out)
            throws Exception {
        Transaction latestTransaction = latestState.value();
        if (latestTransaction == null || (incoming.t_time.isAfter(latestTransaction.t_time))) {
            // store the new latest transaction for this customer and emit it
            latestState.update(incoming);
            out.collect(incoming);
        }
    }
}

Frequent source of confusion

The code above that uses latestState.value() to fetch the state (and similarly, the code that updates the state) is frequently misunderstood.

It is easy to misunderstand the nature of keyed state, and to assume that MapState would be needed here in order to store a Transaction for each customer. But this is not the case.

Instead, before the Flink runtime calls processElement, it uses the key selector function specified in the keyBy to compute the key for the event being processed (the incoming transaction). It makes that key available both internally to the Flink runtime and to you, via the Context passed to processElement. Then, when your user code calls the value or update methods on latestState, the state being read or written is the state corresponding to that key.
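This scoping rule can be illustrated with a toy model in plain Java (this is not Flink code; the class and method names are invented for illustration). A single "ValueState" handle is backed by a map from the current key to its value, and the "runtime" sets the current key before each call:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed ValueState: one handle, but reads and writes are
// implicitly scoped to whatever key the runtime has set as "current".
public class ToyValueState<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey; // set by the "runtime" before each processElement call

    public void setCurrentKey(K key) {
        currentKey = key;
    }

    public V value() {
        return valuesByKey.get(currentKey);
    }

    public void update(V value) {
        valuesByKey.put(currentKey, value);
    }

    public static void main(String[] args) {
        ToyValueState<Long, String> latest = new ToyValueState<>();

        latest.setCurrentKey(1L); // an event for customer 1 arrives
        latest.update("txn-A");

        latest.setCurrentKey(2L); // an event for customer 2 arrives
        latest.update("txn-B");

        latest.setCurrentKey(1L); // back to customer 1
        System.out.println(latest.value()); // prints txn-A, not txn-B
    }
}
```

This is why a plain ValueState<Transaction> suffices in processElement: the per-key bookkeeping is done for you before your code runs.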

Choosing between the types of keyed state

The DataStream API supports three different types of keyed state: ValueState, ListState, and MapState:

  • ValueState<T> stores an object of type T for each key (e.g., each user)
  • ListState<T> stores a list of T for each key
  • MapState<UK, UV> stores a map from UK to UV for each key

ValueState is a powerful building block, and could be used whenever your applications need managed, keyed state. However, storing collections as values inside ValueState can be very expensive, and it is better to take advantage of ListState or MapState whenever possible.

Try to avoid using collections inside ValueState

You should prefer ListState<T> over ValueState<List<T>>, and similarly MapState<UK, UV> over ValueState<HashMap<UK, UV>>, because the RocksDB state backend has significant optimizations for both ListState and MapState:

  • With ListState, the implementation for RocksDB is able to append to the end of the list without having to first deserialize the entire list, and then re-serialize after.
  • With MapState, each key/value pair in the map is a separate RocksDB object that can be created, read, updated, and deleted independently.
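The append optimization can be sketched in plain Java (a toy contrast, not RocksDB or Flink code; all names are invented). A ValueState<List<T>>-style blob must be fully decoded and re-encoded on every append, while a ListState-style append just concatenates the new element's serialized bytes, analogous to RocksDB's merge operator:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy contrast between ValueState<List<String>> and ListState<String>
// as stored in a bytes-on-disk backend. Elements must not contain '\n'.
public class ToyListStorage {
    // ValueState<List<T>> style: the whole list is one blob.
    // Appending means decode everything, add one element, re-encode everything.
    static byte[] appendToBlob(byte[] blob, String element) {
        List<String> list = new ArrayList<>();
        if (blob.length > 0) {
            list.addAll(Arrays.asList(new String(blob, StandardCharsets.UTF_8).split("\n")));
        }
        list.add(element); // O(list size) work on every append
        return String.join("\n", list).getBytes(StandardCharsets.UTF_8);
    }

    // ListState<T> style: append by concatenating the serialized element,
    // analogous to RocksDB's merge operator -- existing bytes are never decoded.
    static byte[] mergeAppend(byte[] blob, String element) {
        byte[] elem = element.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[blob.length + (blob.length > 0 ? 1 : 0) + elem.length];
        System.arraycopy(blob, 0, out, 0, blob.length);
        int pos = blob.length;
        if (blob.length > 0) {
            out[pos++] = '\n';
        }
        System.arraycopy(elem, 0, out, pos, elem.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] a = new byte[0];
        byte[] b = new byte[0];
        for (String e : new String[] {"t1", "t2", "t3"}) {
            a = appendToBlob(a, e);
            b = mergeAppend(b, e);
        }
        // Both strategies produce the same stored bytes;
        // only the cost per append differs.
        System.out.println(Arrays.equals(a, b)); // true
    }
}
```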

There are times when the semantics of ListState are a very natural fit for a particular use case, but even so, it's worthwhile considering whether you could use MapState instead.

This is because there are a couple of drawbacks to using ListState compared to MapState, relating to how each of them is implemented on top of RocksDB:

  • With ListState, the entire list, when serialized, must be at most 2^31 bytes long.
  • Iterating over ListState is more expensive than iterating over MapState.

Examples

The recipe on implementing an enrichment join that waits for missing data includes examples that illustrate how to use all three types of keyed state. It also includes an example of reworking a use case for ListState so that it uses MapState instead.
