An enrichment join that waits for missing data
Visit the enrichment-join-with-buffering recipe on GitHub.
This recipe illustrates how to use the DataStream API to implement a streaming join for enrichment use cases that want to guarantee that every event will be enriched. This implementation avoids some of the drawbacks of the joins from Flink SQL commonly used for enrichment; see the explainer on enriching data for ML model serving for more background, especially the section on streaming joins.
This recipe includes examples showing how to work with ValueState, ListState, and MapState.
In this recipe you will enrich a stream of Transaction events with a stream of Product events by joining them on Transaction.t_product_id = Product.p_id.
The semantics of this join are that:
1. Each incoming transaction is immediately enriched with the latest available product information.
2. However, if nothing is known about the product, the transaction is buffered. The join then waits to produce a result until after the missing product data becomes available.
3. Stale (out-of-order) product updates are ignored.
Points 2 and 3 above are what make this join different from the joins implemented by Flink SQL.
The basic implementation strategy is this:
- As Product update events arrive, store them somewhere, indexed by their p_id.
- As Transaction events arrive, send them to the instance responsible for that particular product (using t_product_id). That product data has hopefully already been stored there by the previous step.
- For any Transaction events that are processed before the matching product data has been ingested and stored, store those transactions in some sort of buffer in the same instance that will later process and store the missing product data.
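The steps above can be sketched in plain Java, without any Flink machinery. This is only a model of the join's logic, not the recipe's actual implementation: the minimal Transaction and Product shapes below are hypothetical, and an updateTime field is assumed on Product so that stale updates can be detected.

```java
import java.util.*;

// Plain-Java model of the buffering join's logic (not the Flink code).
// The Product/Transaction shapes here are assumed for illustration only.
class BufferingJoinSketch {
    record Product(long pId, long updateTime, String name) {}
    record Transaction(UUID tId, long tProductId) {}
    record Enriched(Transaction txn, Product product) {}

    // Per-product-id state, playing the role of Flink's keyed state.
    private final Map<Long, Product> products = new HashMap<>();
    private final Map<Long, List<Transaction>> buffered = new HashMap<>();
    private final List<Enriched> out = new ArrayList<>();

    void processProduct(Product p) {
        Product current = products.get(p.pId());
        if (current != null && current.updateTime() >= p.updateTime()) {
            return; // ignore stale (out-of-order) product updates
        }
        products.put(p.pId(), p);
        // flush any transactions that were waiting for this product
        List<Transaction> waiting = buffered.remove(p.pId());
        if (waiting != null) {
            for (Transaction t : waiting) out.add(new Enriched(t, p));
        }
    }

    void processTransaction(Transaction t) {
        Product p = products.get(t.tProductId());
        if (p != null) {
            out.add(new Enriched(t, p)); // enrich immediately
        } else {
            // no product data yet: buffer until it arrives
            buffered.computeIfAbsent(t.tProductId(), k -> new ArrayList<>()).add(t);
        }
    }

    List<Enriched> results() { return out; }
}
```

In the real job, the two process methods correspond to the two inputs of a keyed two-input operator, and the maps correspond to keyed state scoped to the current product ID.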
You will achieve the desired data partitioning by shuffling both the Product and Transaction streams by the product ID:
When you deploy this job with a parallelism of 2, the resulting execution graph will have this topology:
All of the transactions and products with certain product IDs will be sent to Join 1, and the rest will be sent to Join 2.
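Why events from both streams with the same product ID land on the same join instance can be illustrated with a simplified stand-in for Flink's key assignment. Flink actually routes keys through murmur-hashed key groups, so the hash function below is only an assumption-free placeholder for "any deterministic hash of the key":

```java
// Simplified stand-in for Flink's key-to-instance assignment: any
// deterministic hash of the key routes a given product ID to the same
// join instance, no matter which input stream the event arrived on.
class KeyPartitioningSketch {
    static int joinInstanceFor(long productId, int parallelism) {
        return Math.floorMod(Long.hashCode(productId), parallelism);
    }
}
```

Because both keyBy selectors extract the same product ID, a transaction and the product update it needs are guaranteed to meet at the same instance.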
Working with keyed state
The instances of the join operator (Join 1 and Join 2) are the logical place to store the state this recipe requires. For each product ID, the join will need:
- the most recent Product object (if any)
- the Transaction objects (if any) waiting for that product's data
Flink's keyed state API implements a sharded key/value store, where the keys are defined using keyBy. In this recipe, the input streams are shuffled with keyBy(t -> t.t_product_id, p -> p.p_id), and the keys are product IDs.
Flink offers three types of keyed (or key-partitioned) state, and all three types are used in this recipe. For a deep dive on this topic, see the explainer on working with keyed state.
ValueState
ValueState<Product> is ideal for storing the most recent Product object for each product ID.
ListState and MapState
You will use either ListState or MapState to keep track of the transactions waiting for the corresponding product data to be processed and stored:
- ListState<Transaction>: a list of Transactions for each product ID
- MapState<UUID, Transaction>: a map from UUIDs to Transactions for each product ID
Examples of both approaches are included, one in EnrichmentJoinUsingListState and the other in EnrichmentJoinUsingMapState.
While the implementation that uses ListState is arguably a bit more natural, the implementation based on MapState is more scalable, and should perform better. For more about this, see the section on choosing between the types of keyed state in the explainer on keyed state.
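The difference between the two buffer shapes can be sketched with plain collections standing in for the state types (this is a model, not Flink's API). The scalability point rests on how the RocksDB state backend stores state: each MapState entry is a separate store key that can be written or removed on its own, whereas a ListState value is read back and deserialized as a whole.

```java
import java.util.*;

// Plain collections standing in for the two keyed-state shapes (not Flink code).
class BufferShapes {
    // ListState<Transaction>-style buffer: append-only; removing or
    // inspecting one element means handling the whole list.
    final List<String> listBuffer = new ArrayList<>();

    // MapState<UUID, Transaction>-style buffer: each entry is addressable
    // on its own, so a single transaction can be added or removed directly.
    final Map<UUID, String> mapBuffer = new HashMap<>();

    void bufferWithList(String txn) {
        listBuffer.add(txn);
    }

    void bufferWithMap(UUID txnId, String txn) {
        mapBuffer.put(txnId, txn);
    }

    void removeFromMap(UUID txnId) {
        mapBuffer.remove(txnId); // touches only this one entry
    }
}
```

With a large backlog of buffered transactions per product, the per-entry access pattern of the map-shaped buffer is what makes it the more scalable choice.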
Potential drawbacks of this approach
During the warm-up phase, before all the product events have been ingested, the delay incurred for enrichment is unpredictable, and potentially rather large. But this only has to be done once, during the initial deployment.
In large scale deployments, the number of transactions needing to be buffered during the warm-up phase is potentially very large, which may cause operational headaches. In such cases, another solution for bootstrapping state may be more practical.
The full recipe
This recipe is self-contained. You can run the manual tests in the
EnrichmentJoinWithBufferingTest class to see the full recipe
in action. Those tests use an embedded Apache Kafka and Apache Flink setup, so you can run them directly via Maven or in
your favorite editor such as IntelliJ IDEA or Visual Studio Code.
Another page in this cookbook — Unit testing with Flink's test harnesses — dives into the details of how to write unit tests for the two implementations of the join developed in this recipe.
See the code on GitHub for more details.