
Batch and streaming with the Table and DataStream APIs

Visit the latest-transaction recipe on GitHub.

Use case: Keeping track of the most recent transaction for each customer

In this recipe, you are going to keep track of the most recent transaction for each customer.

This recipe contains four different implementations, each working with the same JSON-encoded input. You will see how to set up serialization and deserialization of timestamps for maximum flexibility. If you follow this pattern, your timestamped data can be used with either the DataStream API or the Table/SQL API, and with various source connectors.

In this recipe you'll also get a good look at how batch and streaming relate to one another, because the implementations provided by this recipe cover these cases:

  • Streaming from Apache Kafka using Apache Flink's DataStream API
  • Streaming from Apache Kafka using Apache Flink's Table API
  • Batch from the filesystem using Apache Flink's DataStream API
  • Batch from the filesystem using Apache Flink's Table API

This Apache Flink recipe is self-contained: you can copy it and run it directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

Writing the data

The recipe reads either from a Kafka topic named transactions or from files in a temporary directory. In either case, the input is JSON-encoded, with timestamps written in ISO-8601 format:


{"t_time": "2022-07-19T11:46:20.000Z", "t_id": 0, "t_customer_id": 0, "t_amount": 100.00}
{"t_time": "2022-07-19T12:00:00.000Z", "t_id": 1, "t_customer_id": 1, "t_amount": 55.00}
{"t_time": "2022-07-24T12:00:00.000Z", "t_id": 2, "t_customer_id": 0, "t_amount": 500.00}
{"t_time": "2022-07-24T13:00:00.000Z", "t_id": 3, "t_customer_id": 1, "t_amount": 11.00}
{"t_time": "2022-07-24T12:59:00.000Z", "t_id": 4, "t_customer_id": 1, "t_amount": 1.00}

All of the source connectors we want to use can handle this ISO-8601 format, but doing so requires the jackson-datatype-jsr310 module. For serialization, you'll need a @JsonFormat annotation on the timestamp field of the Transaction class:

Transaction.java

package com.immerok.cookbook.records;

import com.fasterxml.jackson.annotation.JsonFormat;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Objects;

public class Transaction {

    /**
     * Without this annotation, the timestamps are serialized like this:
     * ...
     */
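
To make this concrete, here is a minimal sketch of what the annotated timestamp field and a matching ObjectMapper setup can look like. The class name TimestampedTransaction, the exact @JsonFormat pattern string, and the helper method are illustrative assumptions rather than a verbatim copy of the recipe's Transaction class; the field names follow the JSON sample above.

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.math.BigDecimal;
import java.time.Instant;

// Sketch only: the @JsonFormat pattern below is an assumed ISO-8601 pattern
// with millisecond precision in UTC; the recipe's actual annotation may differ.
public class TimestampedTransaction {

    @JsonFormat(
            shape = JsonFormat.Shape.STRING,
            pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSX",
            timezone = "UTC")
    public Instant t_time;

    public long t_id;
    public long t_customer_id;
    public BigDecimal t_amount;

    // The JavaTimeModule from jackson-datatype-jsr310 teaches Jackson how to
    // read and write java.time types such as Instant.
    public static ObjectMapper newObjectMapper() {
        return new ObjectMapper().registerModule(new JavaTimeModule());
    }
}

With the JavaTimeModule registered, Jackson can read and write the ISO-8601 strings shown in the sample input.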

Streams from Apache Kafka

Two of the implementations connect Apache Flink to Apache Kafka using Flink's Kafka connector. Both read from the Kafka topic transactions.

DataStream API

The first implementation uses Apache Flink's KafkaSource DataStream API connector. It uses the same approach as the Kafka JSON to POJO recipe.

StreamingDataStreamJob.java

package com.immerok.cookbook;

import com.immerok.cookbook.records.Transaction;
import com.immerok.cookbook.records.TransactionDeserializer;
import com.immerok.cookbook.workflows.DataStreamWorkflow;
import java.util.function.Consumer;

// ...
        .setValueOnlyDeserializer(new TransactionDeserializer())
// ...
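
The excerpt above is abbreviated. As a rough sketch (not the recipe's exact code), such a KafkaSource can be wired up as follows; the broker address, topic name, starting offsets, variable names, and watermark strategy are assumptions, while setValueOnlyDeserializer(new TransactionDeserializer()) is the call shown above.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Sketch only: broker address, topic, and starting offsets are assumptions.
KafkaSource<Transaction> source =
        KafkaSource.<Transaction>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("transactions")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new TransactionDeserializer())
                .build();

// noWatermarks() is assumed here, since the workflow compares t_time values
// itself rather than relying on event-time timers.
DataStream<Transaction> transactions =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Transactions");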

Table API

The second implementation uses Apache Flink's Kafka Table API connector. You first have to create a TableEnvironment, which is the entry point for Table API and SQL integration. Since this is a streaming implementation, you use inStreamingMode().

StreamingTableJob.java

package com.immerok.cookbook;

// ...
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
// ...

After you've defined your TableEnvironment, you connect to the Apache Kafka topic by creating a dynamic table with SQL. It is necessary to explicitly configure 'json.timestamp-format.standard' = 'ISO-8601' as shown.

StreamingTableJob.java

package com.immerok.cookbook;

import com.immerok.cookbook.workflows.TableWorkflow;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class StreamingTableJob {

    public static void main(String[] args) {
        produceResults("transactions").print();
    }

    static TableResult produceResults(String kafkaTopic) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // ... (CREATE TABLE statement, abbreviated)
                        " 'properties.bootstrap.servers' = 'localhost:9092',",
        // ...

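The CREATE TABLE statement itself is abbreviated in the excerpt. A sketch of what it can look like follows; the column types, topic, startup mode, and format option are assumptions based on the sample input and Flink's Kafka SQL connector, while the bootstrap servers line and 'json.timestamp-format.standard' = 'ISO-8601' come from the recipe.

// Sketch only: everything except the bootstrap servers and the JSON
// timestamp standard is an assumption.
tableEnv.executeSql(
        String.join(
                "\n",
                "CREATE TABLE Transactions (",
                "  t_time TIMESTAMP(3),",
                "  t_id BIGINT,",
                "  t_customer_id BIGINT,",
                "  t_amount DECIMAL(10, 2)",
                ") WITH (",
                "  'connector' = 'kafka',",
                "  'topic' = '" + kafkaTopic + "',",
                "  'properties.bootstrap.servers' = 'localhost:9092',",
                "  'scan.startup.mode' = 'earliest-offset',",
                "  'format' = 'json',",
                "  'json.timestamp-format.standard' = 'ISO-8601'",
                ")"));
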
Batches from the filesystem

The remaining two implementations are based on consuming files from your filesystem. You can specify a location by providing the argument --inputURI.

DataStream API

The third implementation uses Apache Flink's FileSystem DataStream API connector. Since this is a batch implementation, you have to explicitly set the RuntimeExecutionMode to BATCH.

BatchDataStreamJob.java

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
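
In context, this amounts to something like the following; the surrounding environment setup is a sketch rather than a verbatim excerpt.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// The input is bounded, so run this DataStream program in batch execution mode.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);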

Then you can configure the FileSource to read the files. Since Apache Flink lacks a JSON input format that can be used with the FileSource, this recipe uses a custom JsonPojoInputFormat. This input format can handle any class Jackson can work with, and it includes the JavaTimeModule needed for JSR-310 / ISO-8601 encoded timestamps.

BatchDataStreamJob.java

package com.immerok.cookbook;

import com.immerok.cookbook.records.JsonPojoInputFormat;

// ...
        new JsonPojoInputFormat<>(Transaction.class), new Path(inputURI))
// ...
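
Around that line, the FileSource can be assembled roughly as sketched below. The forRecordStreamFormat wiring, the variable names, and the assumption that JsonPojoInputFormat implements Flink's StreamFormat interface are mine; only the JsonPojoInputFormat and Path arguments are taken from the excerpt.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch only: assumes JsonPojoInputFormat is a StreamFormat implementation
// that can be handed to FileSource.forRecordStreamFormat().
FileSource<Transaction> source =
        FileSource.forRecordStreamFormat(
                        new JsonPojoInputFormat<>(Transaction.class), new Path(inputURI))
                .build();

DataStream<Transaction> transactions =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Transactions");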

Table API

The fourth implementation uses Apache Flink's FileSystem Table API connector. You first have to create a TableEnvironment, which is the entry point for Table API and SQL integration. Since this is a batch implementation, you use inBatchMode().

BatchTableJob.java

package com.immerok.cookbook;

// ...
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
// ...

After you've defined your TableEnvironment, you connect to the files by creating a dynamic table with SQL.

BatchTableJob.java

package com.immerok.cookbook;

import com.immerok.cookbook.workflows.TableWorkflow;
import java.net.URI;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class BatchTableJob {

    public static void main(String[] args) throws Exception {
        final ParameterTool parameters = ParameterTool.fromArgs(args);

        URI inputURI = new URI(parameters.getRequired("inputURI"));

        // ... (CREATE TABLE statement, abbreviated)
                        " 'json.timestamp-format.standard' = 'ISO-8601'",
        // ...

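As with the Kafka variant, the CREATE TABLE statement is abbreviated in the excerpt. A sketch of a filesystem-backed version follows; the column types and the connector and path options are assumptions, while 'json.timestamp-format.standard' = 'ISO-8601' is the line shown above.

// Sketch only: mirrors the streaming job's TableEnvironment setup; everything
// except the JSON timestamp standard is an assumption.
TableEnvironment tableEnv = TableEnvironment.create(settings);

tableEnv.executeSql(
        String.join(
                "\n",
                "CREATE TABLE Transactions (",
                "  t_time TIMESTAMP(3),",
                "  t_id BIGINT,",
                "  t_customer_id BIGINT,",
                "  t_amount DECIMAL(10, 2)",
                ") WITH (",
                "  'connector' = 'filesystem',",
                "  'path' = '" + inputURI + "',",
                "  'format' = 'json',",
                "  'json.timestamp-format.standard' = 'ISO-8601'",
                ")"));
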
Determining the latest transaction

As explained previously, two of the implementations use the DataStream API and two use the Table API. After connecting to your batch or streaming data sources, the two DataStream API implementations share the same workflow, and the two Table API implementations share another. That means you can easily switch your application between a bounded (batch) and an unbounded (streaming) source without modifying your business logic.

The DataStream workflow

The implementations that use the DataStream API rely on Apache Flink's ValueState to store and update the latest transaction for each customer.

You first define and configure latestState, the ValueState that holds the most recent transaction.

LatestTransactionFunction.java

package com.immerok.cookbook.functions;

import com.immerok.cookbook.records.Transaction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

// ...
        new ValueStateDescriptor<>("latest transaction", Transaction.class);
// ...
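
A sketch of how latestState can be defined and wired to that descriptor in the function's open() method follows; the field and variable names are assumptions.

// Sketch only: field and variable names are assumptions.
private ValueState<Transaction> latestState;

@Override
public void open(Configuration parameters) {
    ValueStateDescriptor<Transaction> descriptor =
            new ValueStateDescriptor<>("latest transaction", Transaction.class);

    // Obtain the keyed state handle; Flink scopes its value to the current key.
    latestState = getRuntimeContext().getState(descriptor);
}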

Next, you define how each incoming event is handled. If there is no latestState yet for this customer, or if the incoming transaction is more recent than the stored one, you update the value in latestState to the incoming transaction.

LatestTransactionFunction.java

package com.immerok.cookbook.functions;

import com.immerok.cookbook.records.Transaction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LatestTransactionFunction
        extends KeyedProcessFunction<Long, Transaction, Transaction> {

    // ...
        if (latestTransaction == null || (incoming.t_time.isAfter(latestTransaction.t_time))) {
    // ...
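
Putting that condition in context, the processElement() method can look roughly like this; the parameter names and the decision to emit every update downstream are assumptions, while the comparison is the one from the excerpt.

// Sketch only: parameter names and the downstream emit are assumptions.
@Override
public void processElement(Transaction incoming, Context ctx, Collector<Transaction> out)
        throws Exception {

    Transaction latestTransaction = latestState.value();

    // Keep the incoming transaction if this customer has none stored yet,
    // or if the incoming one has a later timestamp than the stored one.
    if (latestTransaction == null || incoming.t_time.isAfter(latestTransaction.t_time)) {
        latestState.update(incoming);
        out.collect(incoming);
    }
}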

ValueState is a form of keyed state. In our DataStream API workflow, this means that retrieving and updating the latest transaction can only be done per key. This recipe uses t_customer_id as the key.

DataStreamWorkflow.java

package com.immerok.cookbook.workflows;

import com.immerok.cookbook.functions.LatestTransactionFunction;

// ...
        .process(new LatestTransactionFunction());
// ...
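
The abbreviated call above sits at the end of a keyed pipeline. A sketch of that wiring follows; the variable names are assumptions, and keying by t_customer_id is what gives the function its per-customer state.

// Sketch only: variable names are assumptions.
DataStream<Transaction> latestPerCustomer =
        transactions
                .keyBy(transaction -> transaction.t_customer_id)
                .process(new LatestTransactionFunction());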

In the case of the DataStream implementations, there's no real difference between the behavior of the batch and streaming versions.

The Table workflow

The implementations that use the Table API share exactly the same SQL statement to determine the latest transaction for each customer.

TableWorkflow.java

package com.immerok.cookbook.workflows;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class TableWorkflow {

    public static TableResult defineWorkflow(TableEnvironment tableEnv, String inputTable) {

        String query =
                String.format(
                        // ...
                        " OVER (PARTITION BY t_customer_id ORDER BY t_time DESC) AS rownum",
                        // ...

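The OVER clause shown above is part of Flink's standard Top-N (deduplication) pattern. A sketch of the full statement it can expand to follows; only the OVER clause is taken from the excerpt, while the surrounding ROW_NUMBER() query and the rownum = 1 filter are the usual shape of that pattern.

// Sketch only: the outer query and the rownum = 1 filter follow Flink's
// Top-N / deduplication pattern; the OVER clause is from the excerpt.
String query =
        String.format(
                String.join(
                        "\n",
                        "SELECT t_time, t_id, t_customer_id, t_amount",
                        "FROM (",
                        "  SELECT *, ROW_NUMBER()",
                        "    OVER (PARTITION BY t_customer_id ORDER BY t_time DESC) AS rownum",
                        "  FROM %s",
                        ")",
                        "WHERE rownum = 1"),
                inputTable);

return tableEnv.executeSql(query);
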
The only difference between the batch and streaming Table workflows is the output: the batch implementation will produce a final, materialized result, while the streaming implementation produces a changelog stream.

The full recipe

This recipe is self-contained. There are four tests, one for each implementation, that you can run to see the full recipe in action:

  • LatestTransactionTest#testStreamingDataStreamJob
  • LatestTransactionTest#testStreamingTableJob
  • LatestTransactionTest#testBatchDataStreamJob
  • LatestTransactionTest#testBatchTableJob

All tests use an embedded Apache Kafka and Apache Flink setup, so you can run them directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.

See the comments and tests included in the code for more details.