Working with keyed state
At the heart of Flink is a distributed key-value store. Understanding how to leverage this data store is crucial to the development of robust, performant Flink applications.
The big picture: Key partitioning
The effect of keyBy on a DataStream is to (re)partition that stream around a key. For example, the key in the example below is a customer ID. This means that for each customer, all of the events for that customer are processed in the same instance of LatestTransactionFunction.
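A sketch of such a pipeline, assuming a Transaction POJO with a getCustomerId() accessor (the type, source, and field names here are illustrative, not taken from the original example):

```java
DataStream<Transaction> transactions = env.addSource(transactionSource);

transactions
    // keyBy (re)partitions the stream: all events sharing the same
    // customer ID are routed to the same parallel instance of
    // LatestTransactionFunction.
    .keyBy(transaction -> transaction.getCustomerId())
    .process(new LatestTransactionFunction())
    .print();
```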
How is Flink involved in managing state?
Many of the recipes in the Apache Flink Cookbook make use of stateful stream processing. For example:
- In the recipe on using session windows, Flink is keeping track of the ongoing sessions and their contents.
- In the recipe on pattern matching with CEP, Flink is managing the internal state of the finite state machine doing the pattern matching.
- In each of the examples in the recipe on batch and streaming with the Table and DataStream APIs, state is used to keep track of the most recent transaction for each customer.
In each of these recipes (and several others), managed, keyed state is being used. When you use keyed state, Flink does a lot for you. It
- keeps the state local to the thread where it is used, providing low-latency access
- takes care of checkpointing and recovering the state in the event of failures
- redistributes the state when the cluster is rescaled
- can (if you are careful) support type evolution
- see the recipe on migrating state away from Kryo if you haven't been sufficiently careful
State backends
Any keyed state (or timers) your application uses, whether directly or indirectly (e.g., via the Window, CEP, or Table APIs), is kept in a state backend. There are two principal implementations: a heap-based backend (HashMapStateBackend) and one based on the embedded RocksDB library (EmbeddedRocksDBStateBackend), which is the most commonly used.
The most important characteristics of the RocksDB state backend are that:
- It is very scalable, since it keeps its working state as serialized bytes on disk.
- Every read and write involves serialization/deserialization.
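If you want the RocksDB state backend, it can be selected in code, as sketched below (this requires the flink-statebackend-rocksdb dependency; it can also be set cluster-wide in the Flink configuration, where the exact key name depends on the Flink version):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Keep working state as serialized bytes on local disk, rather than
// as objects on the JVM heap.
env.setStateBackend(new EmbeddedRocksDBStateBackend());
```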
The lifecycle of a KeyedProcessFunction
Consider again the example from above, which tracks the most recent transaction for each customer.
An instance of the LatestTransactionFunction is created in the Flink client, serialized there, and shipped to the Flink cluster as part of job submission. Each of the task managers assigned to execute this function will deserialize a copy of this function object, and call its open method once during the job's startup phase.
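A sketch of what that open method might look like (the descriptor name is an assumption; older Flink versions use the open(Configuration) signature shown here, while newer ones use open(OpenContext)):

```java
private transient ValueState<Transaction> latestState;

@Override
public void open(Configuration parameters) {
    // Ask the state backend for the ValueState registered under this
    // descriptor. If the job was restored from a checkpoint or
    // savepoint, this handle is connected to the restored values.
    latestState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latestTransaction", Transaction.class));
}
```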
By using a ValueState<Transaction> inside of LatestTransactionFunction, you are working with a distributed map, mapping customer IDs to Transaction objects. That distributed map, stored in latestState, is sharded across the cluster, with each instance of this keyed process function storing the transactions for some disjoint subset of the customers.
The open method connects this instance of LatestTransactionFunction with the latestState object being managed by the state backend. And if the job is being restarted from a checkpoint or savepoint, then the values of latestState preserved in that state snapshot become available to this instance, but only for the customer IDs assigned to it.
There is at most one instance of LatestTransactionFunction in each worker thread (the task slots), and its open method is called just once. Each of these instances is, however, multiplexed across all of the events for all of the customer IDs assigned to that slot.
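This assignment of keys to slots can be sketched in plain Java. Flink's real implementation (in its KeyGroupRangeAssignment class) hashes each key into one of maxParallelism "key groups" using a murmur hash, and maps contiguous ranges of key groups to subtasks; the simplified stand-in below uses hashCode directly:

```java
// Simplified sketch of how Flink routes a key to an operator subtask.
// Not Flink's actual code: the real version murmur-hashes key.hashCode().
public class KeyRouting {

    // Hash the key into one of maxParallelism key groups.
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Contiguous ranges of key groups map to each parallel subtask.
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        String customerId = "customer-42";

        int keyGroup = keyGroupFor(customerId, maxParallelism);
        int subtask = operatorIndexFor(keyGroup, maxParallelism, parallelism);

        // The same customer ID always routes to the same subtask, which
        // is why that subtask sees all of the customer's events.
        System.out.println("customer-42 -> subtask " + subtask);
    }
}
```

Because the routing is deterministic, every event for a given customer lands on the same subtask, and so does that customer's keyed state.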
When an incoming transaction event arrives for processing, the Flink runtime passes it to processElement.
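A sketch of what that processElement method might look like (emitting each transaction downstream here is an assumption about the example's output):

```java
@Override
public void processElement(
        Transaction transaction, Context ctx, Collector<Transaction> out)
        throws Exception {
    // Reads the state for the current key (the customer ID of the
    // incoming transaction); returns null if no state exists yet.
    Transaction previous = latestState.value();

    // Overwrites the state for the current key.
    latestState.update(transaction);

    out.collect(transaction);
}
```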
The code above that uses latestState.value() to fetch the state (and similarly, the code that updates the state) is frequently misunderstood. It is easy to misunderstand the nature of keyed state and assume that MapState would be needed here in order to store a Transaction for each customer. But this is not the case.
Instead, before the Flink runtime calls processElement, it uses the key selector function specified in the keyBy to compute the key for the event being processed (the incoming transaction), and makes that key available, both internally to the Flink runtime and to you via the Context passed to processElement. Then when your user code calls the value or update methods on latestState, the state being read or written is actually the state corresponding to that key.
Choosing between the types of keyed state
The DataStream API supports three different types of keyed state: ValueState, ListState, and MapState:
- ValueState<T> stores an object of type T for each key (e.g., each user)
- ListState<T> stores a list of T for each key
- MapState<UK, UV> stores a map from UK to UV for each key
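In a KeyedProcessFunction, each of these is obtained in open from a corresponding descriptor. A sketch (the descriptor names and types here are illustrative):

```java
// One object per key:
ValueState<Transaction> latest = getRuntimeContext().getState(
        new ValueStateDescriptor<>("latest", Transaction.class));

// A list of objects per key:
ListState<Transaction> pending = getRuntimeContext().getListState(
        new ListStateDescriptor<>("pending", Transaction.class));

// A map per key (note: this is a map per key, nested inside
// the implicit distributed map from keys to state):
MapState<String, Transaction> byId = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("byId", String.class, Transaction.class));
```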
ValueState is a powerful building block, and could be used whenever your application needs managed, keyed state. However, storing collections as values inside ValueState can be very expensive, and it is better to take advantage of ListState or MapState whenever possible.
You should prefer ListState<T> over ValueState<List<T>>, and similarly MapState<UK, UV> over ValueState<HashMap<UK, UV>>, because the RocksDB state backend has significant optimizations for both ListState and MapState:
- With ListState, the implementation for RocksDB is able to append to the end of the list without having to first deserialize the entire list, and then re-serialize after.
- With MapState, each key/value pair in the map is a separate RocksDB object that can be created, read, updated, and deleted independently.
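To see why the ListState optimization matters, compare appending with ValueState<List<Transaction>> to appending with ListState<Transaction> (a sketch; the state handle names are assumptions):

```java
// With ValueState<List<Transaction>>, every append reads and
// deserializes the whole list, then re-serializes and writes it back:
List<Transaction> list = listAsValueState.value();
if (list == null) {
    list = new ArrayList<>();
}
list.add(transaction);
listAsValueState.update(list);

// With ListState<Transaction>, the RocksDB backend appends the
// serialized element without reading the existing list:
pendingListState.add(transaction);
```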
There are times when the semantics of ListState are a very natural fit for a particular use case, but even so, it's worthwhile considering whether you could use MapState instead.
This is because there are a couple of drawbacks to using ListState compared to MapState, relating to how each of them is implemented on top of RocksDB:
- With ListState, the entire list, when serialized, must be at most 2³¹ bytes long.
- Iterating over ListState is more expensive than iterating over MapState.
Examples
The recipe on implementing an enrichment join that waits for missing data includes examples that illustrate how to use all three types of keyed state. It also includes an example of reworking a use case for ListState so that it uses MapState instead.