Enriching streaming data for ML model serving
Introduction
Putting machine learning models into production has very particular requirements, and is generally more demanding than traditional ETL or analytics workloads.
To begin with, the overall system architecture must accommodate the needs of both offline (batch) model training, and online (streaming) model serving.
In this explainer the focus is on using Flink for real-time feature generation, for use in streaming model-serving pipelines.
Many of the recipes in the Immerok Flink Cookbook provide examples of techniques that can be useful for feature generation, e.g., using Complex Event Processing (CEP) for pattern matching, or computing session window analytics.
Those examples are relatively straightforward because they each operate on a single stream. But in most real-world use cases of machine learning, such as anomaly detection and fraud prevention, the models will use features that require enriching incoming event records with data from other sources. For example, before approving a credit card transaction, a credit card processor might want to use historic data from other recent transactions for the same card, information about the merchant, metadata about the product being purchased, etc.
Flink offers several different ways to perform enrichment. The following sections consider each technique in turn, discussing their strengths and weaknesses in the context of implementing machine learning pipelines. These techniques are grouped into two categories, according to whether the enrichment data (1) sits outside of Flink in an external service or data store, to be accessed as needed, or (2) is streamed into Flink.
After this deep dive into stream enrichment, the last section of this explainer covers "Low-latency Assembly of Multiple Enrichments". That is where you can take a final look at the big picture and read how to approach a requirement that many real-world applications share: minimizing latency while generating many features that involve potentially expensive transformations and enrichments.
Fetching Data from External Sources
Two approaches for querying external services are described below. These apply to situations in which the data to be used for enrichment is not going to be streamed into Flink. Instead, the data will be fetched in real time, on an as-needed basis.
Scenarios where this style of solution is most applicable include situations where:
- The information used for enrichment is computed, in real time, by a machine learning model, sitting behind a REST API.
- The data sits in a data store that doesn't offer changelog (CDC) streams.
- The external dataset is so huge that you can't afford to mirror it into Flink state.
Async I/O
The Async I/O operator is a feature of the DataStream API intended for cases where you must call out to an external service to fetch data used for enrichment. Flink will handle errors and retries for you (since Flink 1.16), and is smart enough to re-issue pending requests after restarting from a checkpoint or savepoint.
For an in-depth explanation, see the section on Async I/O.
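To make this concrete, here is a minimal sketch of an async enrichment function. The Transaction and EnrichedTransaction classes, and the non-blocking CustomerClient (whose getCustomer() is assumed to return a CompletableFuture), are hypothetical stand-ins for your own event types and client.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;

// Enrich each Transaction with the matching Customer record, fetched asynchronously.
public class CustomerEnrichment extends RichAsyncFunction<Transaction, EnrichedTransaction> {

    // Hypothetical non-blocking client whose getCustomer() returns a CompletableFuture<Customer>.
    private transient CustomerClient client;

    @Override
    public void open(Configuration parameters) {
        client = new CustomerClient();
    }

    @Override
    public void asyncInvoke(Transaction txn, ResultFuture<EnrichedTransaction> resultFuture) {
        client.getCustomer(txn.customerId)
              .thenAccept(customer ->
                  resultFuture.complete(
                      Collections.singleton(new EnrichedTransaction(txn, customer))));
    }
}
```

You would then apply it with something like `AsyncDataStream.unorderedWait(transactions, new CustomerEnrichment(), 5, TimeUnit.SECONDS, 100)`, which caps the number of in-flight requests at 100 per parallel instance and times each request out after 5 seconds.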
Lookup Joins (Table/SQL API)
Lookup joins are how asynchronous I/O operations fit into Flink's relational APIs.
Here's an example of a lookup join:
```sql
SELECT *
FROM Transactions t
JOIN Customers c FOR SYSTEM_TIME AS OF t.t_proc_time
  ON t.t_customer_id = c.c_id;
```
The idea is that the stream being enriched is represented as an insert-only dynamic table (the Transactions table in this example). As each transaction event arrives, it is appended to the Transactions table, and an external data store (or service) is queried to fetch the corresponding customer record.
This join will produce one result for each transaction, and that result will come from using the customer information available at the time the transaction is processed. No state will be retained by Flink, and Flink will not generate updates to the join results if the customer records are subsequently updated in the external data store. (This is why these queries are written using processing time.)
A few lookup source connectors are available out of the box, including JDBC, HBase, and Hive. Some of them offer caching (which is disabled by default).
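As an illustration, the following sketch declares a JDBC-backed Customers lookup table from Java and enables its lookup cache. The connection settings are placeholders, and the cache option names vary between Flink versions, so check the JDBC connector documentation for the release you're using.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupJoinExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical JDBC-backed lookup table; connection settings are placeholders,
        // and the lookup cache option names differ between Flink versions.
        tEnv.executeSql(
            "CREATE TABLE Customers (\n" +
            "  c_id BIGINT,\n" +
            "  c_name STRING,\n" +
            "  c_country STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'jdbc',\n" +
            "  'url' = 'jdbc:postgresql://db:5432/shop',\n" +
            "  'table-name' = 'customers',\n" +
            "  'lookup.cache.max-rows' = '10000',\n" +   // caching is disabled by default
            "  'lookup.cache.ttl' = '10min'\n" +
            ")");
    }
}
```

With the table declared this way, the lookup join shown earlier should read customer records through the cache instead of querying the database for every transaction.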
If you want to implement your own lookup table source, you can implement an AsyncTableFunction and use it in a lookup join. If you'd like to first study an example, you can look at the implementation of HBaseRowDataAsyncLookupFunction in the Flink sources.
Streaming Joins
Both async I/O and lookup joins are expensive (in terms of latency). Where feasible, you should instead consider streaming the enrichment data into Flink and making it available for a streaming join, but this requires provisioning Flink with the capacity to mirror some enrichment data in Flink state.
Regular Joins
While enrichment is conceptually some sort of join, a regular SQL join, such as
```sql
SELECT *
FROM Transactions t, Customers c
WHERE t.t_customer_id = c.c_id
```
is ill-suited for streaming enrichment use cases.
The semantics of a regular (unconstrained) join like this one require that both tables be fully materialized in Flink state, indefinitely. This is necessary so that whenever new records are processed for either side of the join, updated results can be produced.
When enrichment is being done for the purpose of taking immediate action (e.g., preparing a feature vector for a real-time classifier), you are only interested in joining a specific row from the left side of the join (i.e., an incoming Transaction) with the appropriate matching row from the right side (the Customer executing that Transaction). Whether or not the Customer record is later updated is irrelevant, so regular joins have the wrong semantics for enrichment use cases.
In the following sections we'll look at more specialized streaming joins that are better suited for enrichment use cases.
Event-time Temporal Joins
As mentioned above, regular streaming joins have the wrong semantics for enrichment use cases; you should instead be thinking about using a temporal join. (For what it's worth, temporal joins are sometimes referred to as time-versioned joins or joins with temporal tables.)
Consider an incoming Transaction event that includes a currency. You want to join that Transaction with the exchange rate in effect at the time of the Transaction, so that you can include the amount of money at risk (normalized to some reference currency) in a feature vector used by a fraud detection model.
Example
Such a join looks like this:
```sql
SELECT t_id, t_time, t_price * r_rate AS t_yen
FROM Transactions t
JOIN ExchangeRates r FOR SYSTEM_TIME AS OF t.t_time
  ON t.t_currency = r.r_currency;
```
Sample inputs and outputs are shown below:
Transactions

| t_id | t_time | t_price | t_currency |
|------|--------|---------|------------|
| 1    | 10:15  | 2       | EUR        |
| 2    | 10:35  | 5       | USD        |
| 3    | 10:55  | 50      | YEN        |
| 4    | 11:05  | 2       | EUR        |
| 5    | 11:15  | 5       | USD        |

ExchangeRates

| r_time | r_currency | r_rate |
|--------|------------|--------|
| 09:00  | YEN        | 1      |
| 09:20  | EUR        | 140    |
| 09:40  | USD        | 150    |
| 10:30  | EUR        | 142    |
| 10:50  | USD        | 148    |

Results

| t_id | t_time | t_yen |
|------|--------|-------|
| 1    | 10:15  | 280   |
| 2    | 10:35  | 750   |
| 3    | 10:55  | 50    |
| 4    | 11:05  | 284   |
| 5    | 11:15  | 740   |
The results are computed by matching each incoming transaction with the latest preceding exchange rate for that currency.
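For this join to work, the ExchangeRates table must be a versioned table: it needs a primary key and an event-time attribute with a watermark (and Transactions needs a watermark on t_time as well). Here is a sketch of what the definition of ExchangeRates might look like; the upsert-kafka connector and its settings are assumptions for illustration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class VersionedTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // A versioned table for the right-hand side of an event-time temporal join:
        // primary key + event-time attribute with a watermark. Connector settings are placeholders.
        tEnv.executeSql(
            "CREATE TABLE ExchangeRates (\n" +
            "  r_time TIMESTAMP(3),\n" +
            "  r_currency STRING,\n" +
            "  r_rate DECIMAL(10, 4),\n" +
            "  PRIMARY KEY (r_currency) NOT ENFORCED,\n" +
            "  WATERMARK FOR r_time AS r_time - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            "  'connector' = 'upsert-kafka',\n" +
            "  'topic' = 'exchange-rates',\n" +
            "  'properties.bootstrap.servers' = 'broker:9092',\n" +
            "  'key.format' = 'raw',\n" +
            "  'value.format' = 'json'\n" +
            ")");
    }
}
```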
Potential Drawbacks
In principle, event-time temporal joins (from Flink SQL) are well-suited for enrichment use cases, as they produce append-only streams of (timestamped) final results that will never need to be updated. For each incoming Transaction (in our example) there is exactly one ExchangeRate record that is the most correct one to use for enrichment, and we should be able to find it.
With event-time temporal joins, there are, however, some potential issues to consider. These issues revolve around watermarking.
There's some indeterminate amount of latency involved. In our scenario with Transactions and ExchangeRates, when enriching a particular Transaction, an event-time temporal join will wait until the watermark on the ExchangeRate stream reaches the timestamp of that Transaction — because only then is it reasonable to be confident that the result of the join is being produced with complete knowledge of the relevant exchange rate data.
Event-time temporal joins cannot guarantee perfectly correct results. Despite having waited for the watermark, the most relevant ExchangeRate record could still be late, in which case the join will be executed using an earlier version of the exchange rate.
If the enrichment stream has infrequent updates, this will cause problems, because of the behavior of watermarking on idle streams. The TemporalRowTimeJoinOperator (like any operator with two input streams) normally waits for the watermarks on both incoming streams to reach the desired timestamp (e.g., t.t_time in the example above) before taking action.
Joining with Enrichment Streams that are Often Idle
Many commonly used sources of enrichment metadata are only infrequently updated, which is at odds with how these joins rely on watermarks: an idle stream produces no watermarks, and if the watermarks aren't advancing, an event-time temporal join will stall.
The standard solution for managing watermarking for idle streams is to include withIdleness(Duration) in the DataStream WatermarkStrategy, or table.exec.source.idle-timeout in the table definition. You can go ahead and do this, but depending on your use case, it may not be helpful.
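For reference, here is a minimal sketch of configuring idleness detection with the DataStream API; the one-minute timeout, the five-second out-of-orderness bound, and the ExchangeRate event class are assumptions for illustration.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import java.time.Duration;

public class IdlenessExample {

    // ExchangeRate is a hypothetical event class with a public long timestamp field.
    static WatermarkStrategy<ExchangeRate> rateWatermarks() {
        return WatermarkStrategy
                .<ExchangeRate>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Mark the enrichment stream as idle after one minute without events.
                .withIdleness(Duration.ofMinutes(1))
                .withTimestampAssigner((rate, previousTimestamp) -> rate.timestamp);
    }

    // With the Table/SQL API, the equivalent is the table.exec.source.idle-timeout option, e.g.
    //   tableEnv.getConfig().set("table.exec.source.idle-timeout", "1 min");
}
```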
To understand why it may not help, first a bit of background. The effect of withIdleness(Duration) is to mark a stream as idle whenever there are no events for the specified Duration. While the watermarks for an active stream will prevent a temporal join from producing results too soon, idle streams are ignored, and the join will proceed based solely on the watermarks from the other (still active) input stream.
As an example of the impact of idleness detection, consider a scenario where a burst of product updates occurs around 10:00:20, after which no joins can be completed until the idleness detection period expires sometime after 10:00:40. Once that point is reached, join results are produced as quickly as the Flink cluster is able to catch up, and the backlog is cleared by around 10:01:20.
For many use cases, this delay (a minute-long hiccup, in this scenario) isn't worthwhile. For example, having the enrichment join pick up today's version of some rarely changing product metadata, as opposed to yesterday's version, probably has no meaningful impact on the score produced by a fraud detection model. Getting a good result quickly, with minimal latency, is often much better than getting a marginally better result some time later (especially if that better result comes too late to be used).
Recommendations
- Use idleness detection only if you care about getting results that are as close to perfect as possible, while understanding that there's still no guarantee they will be perfect.
- Otherwise, consider setting the watermarks for the enrichment stream to a large, constant value, such as Watermark.MAX_WATERMARK (see the sketch after this list). This will prevent the enrichment stream from having any effect on the overall watermarking. An event-time temporal join will still wait for watermarks on the stream-to-be-enriched, so out-of-order updates on the enrichment stream still have some chance to be used in an optimal way; it's just less likely than if the enrichment stream had its own watermarks.
- But if you really want to minimize latency, use a processing-time temporal join instead.
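Here is one way the second recommendation might be implemented, as a custom WatermarkGenerator that always emits the largest possible watermark. This is a sketch, and the ExchangeRate event class is a hypothetical stand-in for your enrichment record type.

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class MaxWatermarkExample {

    // A watermark strategy for the enrichment stream that always emits the largest possible
    // watermark (effectively Watermark.MAX_WATERMARK), so the enrichment stream never holds
    // back the temporal join.
    static WatermarkStrategy<ExchangeRate> maxWatermarks() {
        return WatermarkStrategy.forGenerator(ctx -> new WatermarkGenerator<ExchangeRate>() {
            @Override
            public void onEvent(ExchangeRate event, long timestamp, WatermarkOutput output) {
                output.emitWatermark(new Watermark(Long.MAX_VALUE));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(Long.MAX_VALUE));
            }
        });
    }
}
```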
Processing-time Temporal Joins
A processing-time temporal join is a lazy (some might say sloppy) sort of join that does as little work as possible. Each incoming event (e.g., Transaction) is immediately joined with whatever enrichment data is already available (in Flink state) from the records previously processed from that enrichment stream. This is fast, and simple.
If you want to minimize latency, nothing can beat this approach.
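Expressed directly with the DataStream API, the same idea amounts to a KeyedCoProcessFunction that keeps only the latest enrichment record per key in state and joins each incoming event against whatever is there. The sketch below assumes hypothetical Transaction, ExchangeRate, and EnrichedTransaction classes, with both input streams keyed by currency.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by currency: input 1 is the Transactions stream, input 2 is the ExchangeRates stream.
public class LatestRateJoin
        extends KeyedCoProcessFunction<String, Transaction, ExchangeRate, EnrichedTransaction> {

    private transient ValueState<ExchangeRate> latestRate;

    @Override
    public void open(Configuration parameters) {
        latestRate = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latest-rate", ExchangeRate.class));
    }

    @Override
    public void processElement1(Transaction txn, Context ctx, Collector<EnrichedTransaction> out)
            throws Exception {
        // Join immediately with whatever enrichment data is in state (possibly none yet).
        out.collect(new EnrichedTransaction(txn, latestRate.value()));
    }

    @Override
    public void processElement2(ExchangeRate rate, Context ctx, Collector<EnrichedTransaction> out)
            throws Exception {
        // Overwrite the previous rate; no history is kept.
        latestRate.update(rate);
    }
}
```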
However, there are a couple of challenges to consider with processing-time temporal joins. First off, it's difficult to write good integration tests for jobs using these joins, because a processing-time temporal join is inherently non-deterministic.
But the larger problem is that Flink doesn't make it easy to ensure that the enrichment data is fully processed and loaded into Flink state before events arrive that need to be enriched.
Bootstrapping State
Ideally, we would like to either find a way to bootstrap initial values for this enrichment state, or arrange to wait for missing enrichment data. Otherwise, during an initial warm-up period, there will be events that cannot be enriched because the corresponding enrichment data hasn't yet been ingested.
You can always decide to keep things simple, and accept that some events won't be enriched until the job is warmed up. But if you'd like to solve this problem, here are some ideas:
- Use a KeyedCoProcessFunction with state bootstrapped with the State Processor API.
- Use a BroadcastProcessFunction that bootstraps its state from a file.
- Use a KeyedCoProcessFunction that waits for missing data by buffering (in Flink state) events that can't yet be enriched (a rough sketch follows this list). See an enrichment join that waits for missing data for a complete example implementing this approach.
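As a rough sketch of that last approach, building on the processing-time style join shown earlier, the KeyedCoProcessFunction below buffers transactions that arrive before any enrichment data is known for their key, and flushes them once the first enrichment record arrives. The event classes are hypothetical, and timers for eventually giving up on missing data are omitted.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class WaitingEnrichmentJoin
        extends KeyedCoProcessFunction<String, Transaction, ExchangeRate, EnrichedTransaction> {

    private transient ValueState<ExchangeRate> latestRate;
    private transient ListState<Transaction> waiting;

    @Override
    public void open(Configuration parameters) {
        latestRate = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latest-rate", ExchangeRate.class));
        waiting = getRuntimeContext().getListState(
            new ListStateDescriptor<>("waiting-transactions", Transaction.class));
    }

    @Override
    public void processElement1(Transaction txn, Context ctx, Collector<EnrichedTransaction> out)
            throws Exception {
        ExchangeRate rate = latestRate.value();
        if (rate != null) {
            out.collect(new EnrichedTransaction(txn, rate));
        } else {
            // No enrichment data yet for this key: buffer the event in Flink state.
            waiting.add(txn);
        }
    }

    @Override
    public void processElement2(ExchangeRate rate, Context ctx, Collector<EnrichedTransaction> out)
            throws Exception {
        latestRate.update(rate);
        // Flush any transactions that were waiting for this key's first enrichment record.
        for (Transaction txn : waiting.get()) {
            out.collect(new EnrichedTransaction(txn, rate));
        }
        waiting.clear();
    }
}
```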
Low-latency Assembly of Multiple Enrichments
For a general introduction to measuring and reducing latency in Flink applications, see the section on Latency.
Reducing Latency with Parallel Enrichments
In the context of assembling feature vectors to use in a real-time machine learning pipeline, it's typically necessary to enrich each event along many different dimensions. For example, a Transaction might be enriched with information about the customer, the product being purchased, and the merchant.
Rather than performing these enrichments sequentially, it can be much faster to rearrange the job topology so that the enrichments are all done in parallel, and then merge the results.
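One way to sketch this with the DataStream API is to fan the transaction stream out to several independent asynchronous enrichment operators, key the partial results by transaction id, and assemble the feature vector once all of them have arrived. Every operator and event class in this sketch (CustomerLookup, PartialFeatures, AssembleFeatureVector, and so on) is a hypothetical placeholder.

```java
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.concurrent.TimeUnit;

public class ParallelEnrichmentTopology {

    // Fan the same transaction stream out to independent async enrichments, then
    // reassemble the partial results per transaction.
    static DataStream<FeatureVector> buildFeatures(DataStream<Transaction> transactions) {
        DataStream<PartialFeatures> customer = AsyncDataStream.unorderedWait(
            transactions, new CustomerLookup(), 5, TimeUnit.SECONDS, 100);
        DataStream<PartialFeatures> product = AsyncDataStream.unorderedWait(
            transactions, new ProductLookup(), 5, TimeUnit.SECONDS, 100);
        DataStream<PartialFeatures> merchant = AsyncDataStream.unorderedWait(
            transactions, new MerchantLookup(), 5, TimeUnit.SECONDS, 100);

        return customer
            .union(product, merchant)
            .keyBy(partial -> partial.transactionId)
            // Emits one FeatureVector per transaction once all 3 partial results have arrived.
            .process(new AssembleFeatureVector(3));
    }
}
```

The merge step is just a keyed process function that counts how many partial results it has seen for each transaction and emits the assembled feature vector (clearing its state) once the expected number has arrived.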