An enrichment join that waits for missing data
Visit the enrichment-join-with-buffering recipe on GitHub.
Introduction
This recipe illustrates how to use the DataStream API to implement a streaming join for enrichment use cases that want to guarantee that every event will be enriched. This implementation avoids some of the drawbacks of the joins from Flink SQL commonly used for enrichment; see the explainer on enriching data for ML model serving for more background, especially the section on streaming joins.
This recipe includes examples showing how to work with ValueState, ListState, and MapState.
Use case
In this recipe you will enrich a stream of Transaction events with a stream of Product events by joining them on Transaction.t_product_id = Product.p_id.
The semantics of this join are that:
- Each incoming transaction is immediately enriched with the latest available product information.
- However, if nothing is known about the product, the transaction is buffered. The join then waits to produce a result until after the missing product data becomes available.
- Stale (out-of-order) product updates are ignored.
Points 2 and 3 above are what make this join different from the join implemented by Flink's TemporalProcessTimeJoinOperator.
Implementation strategy
The basic implementation strategy is this:
- As Product update events arrive, store them somewhere, indexed by their p_id field.
- As Transaction events arrive, send them to the instance responsible for that particular product (using t_product_id). That product data has hopefully already been stored there by the previous step.
- For any Transaction events that are processed before the matching product data has been ingested and stored, store those transactions in some sort of buffer in the same instance that will later process and store the missing product data.
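Setting Flink aside for a moment, the buffering logic described in the steps above can be sketched in plain Java. This is a hypothetical, simplified model, not the recipe's actual implementation: the class and field names (Product, Transaction, updateTime, and so on) are assumptions standing in for the recipe's real types, and ordinary HashMaps stand in for Flink's keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A non-Flink sketch of the join semantics: enrich immediately when the
// product is known, buffer otherwise, and ignore stale product updates.
public class BufferingJoinSketch {
    record Product(long id, String name, long updateTime) {}
    record Transaction(long productId, double amount) {}
    record EnrichedTransaction(Transaction transaction, Product product) {}

    private final Map<Long, Product> products = new HashMap<>();
    private final Map<Long, List<Transaction>> buffered = new HashMap<>();
    private final List<EnrichedTransaction> output = new ArrayList<>();

    // Enrich with the latest product if available; otherwise buffer the
    // transaction until the missing product data arrives.
    void onTransaction(Transaction t) {
        Product p = products.get(t.productId());
        if (p != null) {
            output.add(new EnrichedTransaction(t, p));
        } else {
            buffered.computeIfAbsent(t.productId(), k -> new ArrayList<>()).add(t);
        }
    }

    // Store the product (ignoring out-of-order updates), then release any
    // transactions that were waiting for it.
    void onProduct(Product p) {
        Product current = products.get(p.id());
        if (current != null && current.updateTime() >= p.updateTime()) {
            return; // stale (out-of-order) product update: ignore
        }
        products.put(p.id(), p);
        List<Transaction> waiting = buffered.remove(p.id());
        if (waiting != null) {
            waiting.forEach(this::onTransaction);
        }
    }

    List<EnrichedTransaction> output() {
        return output;
    }
}
```

In the actual recipe this logic lives inside a keyed Flink operator, so each parallel instance only ever sees the products and transactions for its share of the product IDs.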
You will achieve the desired data partitioning by shuffling both the Product and Transaction streams by the product id:
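The wiring might look something like the fragment below. This is a sketch, assuming that transactions and products are existing DataStreams and that EnrichmentJoinUsingListState (one of the two implementations named later in this recipe) is a KeyedCoProcessFunction; the exact variable names and signatures are assumptions.

```java
// Connect the two streams and key both by product id, so that all events
// for a given product land on the same parallel instance of the join.
DataStream<EnrichedTransaction> enriched =
    transactions
        .connect(products)
        .keyBy(t -> t.t_product_id, p -> p.p_id)
        .process(new EnrichmentJoinUsingListState());
```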
When you deploy this job with a parallelism of 2, the resulting execution graph will have this topology:
All of the transactions and products with certain product IDs will be sent to Join 1, and the rest will be sent to Join 2.
Working with keyed state
The instances of the join operator (Join 1 and Join 2) are the logical place to store the state this recipe requires. For each product ID, the join will need:
- the most recent Product object
- the Transaction objects (if any) waiting for that Product
Flink's keyed state API implements a sharded key/value store, where the keys are defined using keyBy. In this recipe, the input streams are shuffled with keyBy(t -> t.t_product_id, p -> p.p_id), so the keys are product IDs.
Flink offers three types of keyed (or key-partitioned) state, and all three types are used in this recipe. For a deep dive on this topic, see the explainer on working with keyed state.
ValueState
ValueState&lt;Product&gt; is ideal for storing the most recent Product object for each product ID.
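Declaring and updating such state inside the join operator could look roughly like this fragment. It is a sketch, not the recipe's exact code: the state name and the updateTime field used for the staleness check are assumptions.

```java
// The latest known Product for the current key (i.e., product ID).
private transient ValueState<Product> productState;

@Override
public void open(Configuration parameters) {
    productState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("product", Product.class));
}

// When a Product event arrives, keep it only if it is newer than what is
// already stored, so stale (out-of-order) updates are ignored:
//
//   Product current = productState.value();
//   if (current == null || current.updateTime <= product.updateTime) {
//       productState.update(product);
//   }
```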
ListState and MapState
You will use either ListState or MapState to keep track of the transactions waiting for the corresponding product data to be processed and stored:
- ListState&lt;Transaction&gt;: a list of Transactions for each product ID
- MapState&lt;UUID, Transaction&gt;: a map from UUIDs to Transactions for each product ID
Examples of both approaches are included, one in EnrichmentJoinUsingListState and the other in EnrichmentJoinUsingMapState.
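The two buffering choices can be sketched as the following state declarations. The state names are assumptions, not the recipe's actual descriptors.

```java
// Option 1 (EnrichmentJoinUsingListState): buffer waiting transactions
// in a list per product ID.
private transient ListState<Transaction> bufferedTransactions;
// in open():
//   bufferedTransactions = getRuntimeContext().getListState(
//       new ListStateDescriptor<>("buffered", Transaction.class));

// Option 2 (EnrichmentJoinUsingMapState): key each waiting transaction
// by a fresh UUID, so individual entries can be added and iterated
// without materializing one large list value.
private transient MapState<UUID, Transaction> bufferedById;
// in open():
//   bufferedById = getRuntimeContext().getMapState(
//       new MapStateDescriptor<>("buffered", UUID.class, Transaction.class));
```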
While the implementation that uses ListState is arguably a bit more natural, the implementation based on MapState is more scalable, and should perform better. For more about this, see the section on choosing between the types of keyed state in the explainer on keyed state.
Potential drawbacks of this approach
During the warm-up phase, before all the product events have been ingested, the delay incurred for enrichment is unpredictable, and potentially rather large. But this only has to be done once, during the initial deployment.
In large scale deployments, the number of transactions needing to be buffered during the warm-up phase is potentially very large, which may cause operational headaches. In such cases, another solution for bootstrapping state may be more practical.
The full recipe
This recipe is self-contained. You can run the manual tests in the EnrichmentJoinWithBufferingTest class to see the full recipe in action. Those tests use an embedded Apache Kafka and Apache Flink setup, so you can run them directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.
Another page in this cookbook — Unit testing with Flink's test harnesses — dives into the details of how to write unit tests for the two implementations of the join developed in this recipe.
See the code on GitHub for more details.