Write Apache Parquet® files with Apache Flink®
Visit the write-parquet-files recipe on GitHub.
Reading from Apache Kafka® and writing to Apache Parquet files
Apache Kafka is great for streaming data, but there are use cases where it's not the best solution. For example, you might want to leverage Flink’s HybridSource to efficiently bootstrap a new Flink application. This can be especially interesting if you can't store your Kafka events indefinitely.
In this recipe, you are going to consume event data from an Apache Kafka topic and write it to Apache Parquet files. The Parquet format is an open source, column-oriented data format designed for efficient data storage and retrieval. Flink allows you to read and write Parquet files, including using it with Flink's HybridSource. The Parquet format is widely used by other applications, such as the data lake of your choice or directly in a tool like DuckDB.
You will use both the DataStream API and the Table API. The recipe starts by consuming the events with the DataStream Kafka connector, switches to the Table API, and uses SQL to select and transform the data before using the FileSystem connector to generate the partitioned Parquet files.
This recipe for Apache Flink is self-contained: you can directly copy it and run it from your favorite editor. There is no need to download Apache Flink or Apache Kafka.
The JSON input data
The recipe uses the Kafka topic transactions, containing JSON-encoded records.
{"t_time":"2022-10-12 14:13:34.277000000","t_id":1,"t_customer_id":7,"t_amount":99.08}
{"t_time":"2022-10-12 14:13:34.497000000","t_id":2,"t_customer_id":24,"t_amount":405.01}
{"t_time":"2022-10-12 14:13:34.609000000","t_id":3,"t_customer_id":19,"t_amount":974.90}
{"t_time":"2022-10-12 14:13:34.724000000","t_id":4,"t_customer_id":8,"t_amount":100.19}
{"t_time":"2022-10-12 14:13:34.838000000","t_id":5,"t_customer_id":24,"t_amount":161.48}
Connecting and reading data from Apache Kafka
This recipe uses the same implementation as the Kafka JSON to POJO recipe to connect to the transactions topic and deserialize the data. It is recommended to read the Kafka JSON to POJO recipe before continuing. You create a stream transactionStream as a result.
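Because the connector setup lives in that other recipe, here is only a minimal sketch of it. The Transaction POJO, the TransactionDeserializer, and the bootstrap server address are placeholders, not the recipe's exact code.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Transaction and TransactionDeserializer stand in for the POJO and the JSON
// deserialization schema from the Kafka JSON to POJO recipe.
KafkaSource<Transaction> source =
        KafkaSource.<Transaction>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("transactions")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new TransactionDeserializer())
                .build();

DataStream<Transaction> transactionStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Transactions");
```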
As done before in the Joining and Deduplicating Data recipe, you switch from the DataStream API to the Table API by creating a temporary Transactions view from the incoming stream created by the DataStream API.
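A minimal sketch of that handover, reusing env and transactionStream from the sketch above (the variable names are assumptions):

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Bridge between the DataStream API and the Table API; tableEnv is reused for
// the table definition and SQL statements later in this recipe.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the incoming stream as a temporary view named Transactions.
tableEnv.createTemporaryView("Transactions", transactionStream);
```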
Add dependency on Apache Parquet library
You start by adding Flink’s Parquet file format artifact to your POM file. Apache Parquet relies on Apache Hadoop®, which requires also adding hadoop-common and hadoop-mapreduce-client-core. Both dependencies contain transitive dependencies that conflict with Apache Flink and therefore need to be excluded.
You are adding these dependencies here as provided. If you want to use Parquet in a production cluster, you'll need to configure Flink with Hadoop.
Create a Parquet Sink Table
Before selecting and transforming your data, you create a so-called Dynamic Table. This recipe uses a TableDescriptor.Builder for defining the schema, path, and format. You then use TableEnvironment#createTable to create the table, while enabling partitioning and compaction:
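The following is a sketch of what that table definition can look like, not the recipe's exact code. The table name TransactionsSink, the column types, and the hard-coded output path are assumptions.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// In the recipe the location comes from the --outputFolder argument; the
// hard-coded path here is only a placeholder.
String outputFolder = "/tmp/transactions-parquet";

TableDescriptor parquetSinkDescriptor =
        TableDescriptor.forConnector("filesystem")
                .schema(
                        Schema.newBuilder()
                                .column("t_time", DataTypes.STRING())
                                .column("t_id", DataTypes.BIGINT())
                                .column("t_customer_id", DataTypes.BIGINT())
                                .column("t_amount", DataTypes.DOUBLE())
                                .column("t_date", DataTypes.STRING())
                                .build())
                .option("path", outputFolder)
                .format("parquet")                  // write Parquet files
                .partitionedBy("t_date")            // one folder per day
                .option("auto-compaction", "true")  // merge small files after checkpointing
                .build();

tableEnv.createTable("TransactionsSink", parquetSinkDescriptor);
```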
You can specify a location for writing the Parquet files by providing the argument --outputFolder. The Parquet format implementation in Flink also allows you to specify any configuration option from ParquetOutputFormat, such as parquet.compression to select a compression algorithm.
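As a sketch (not the recipe's shown configuration), such an option can be passed via a FormatDescriptor; the Snappy value below is just an example.

```java
import org.apache.flink.table.api.FormatDescriptor;

// Format options are prefixed with the format name on the table, so this
// "compression" key ends up as 'parquet.compression' and is handed to
// ParquetOutputFormat. The SNAPPY value is an assumption, not from the recipe.
FormatDescriptor parquetWithCompression =
        FormatDescriptor.forFormat("parquet")
                .option("compression", "SNAPPY")
                .build();

// Use .format(parquetWithCompression) instead of .format("parquet") in the
// TableDescriptor sketched above.
```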
Enable partitioning
Flink’s FileSystem connector supports partitioning. Imagine that you need to get some data from the Parquet files and are only interested in the results of a specific day. A properly optimized application (using "partition pruning") will optimize its reader so that only the data for that specific day is read. That will tremendously speed up your queries. Another use case is that you could automatically delete all data that is older than 90 days to be compliant with data retention policies.
In this recipe, the data is partitioned by t_date so that all data for a specific day ends up in one specific folder. For example, all transactions from 2022-10-12 would end up in a partition folder such as t_date=2022_10_12.
Enable compaction
Flink's FileSystem connector also supports compaction. Compaction makes it possible to have smaller checkpoint intervals without generating a large number of small files: it merges multiple small files into larger ones. You can run compaction automatically, as in this recipe, or you can set a target file size for the compacted files.
Before files are merged, they are invisible. That means that the visibility of a file depends on the time it takes for a checkpoint to complete plus the time that's needed to compact it. If compaction takes too long, it will result in backpressure.
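As an illustration of the second option, the FileSystem connector exposes a target size for the compacted files; the 128MB value below is an assumption, not taken from the recipe.

```java
import org.apache.flink.table.api.TableDescriptor;

// Sketch of the compaction-related options on the filesystem sink: enable
// automatic compaction and set a target size for the compacted files.
TableDescriptor.Builder compactingSinkBuilder =
        TableDescriptor.forConnector("filesystem")
                .option("auto-compaction", "true")
                .option("compaction.file-size", "128MB");
```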
Select and transform the data
After creating the dynamic table, you will use SQL to select and transform the data from the Transactions view.
You want to select all data from the Apache Kafka events. After selecting that, you cast the timestamp to a string. This is done to avoid the Flink data type timestamp being mapped to the Parquet type INT96, which is how Flink currently maps the timestamp type. If you don't do this, you can lose the local timezone information from your data. That would result in a time jump in your partitioning results as well as in the timestamps in the Parquet files.
Since you want to partition data by day, you convert the timestamp to the yyyy_MM_dd format. In the end, you insert everything into your previously defined dynamic table.
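A sketch of what this statement can look like, using the TransactionsSink table and Transactions view names from the sketches above (the recipe's exact query may differ):

```java
// Cast the timestamp to STRING, derive the t_date partition column with
// DATE_FORMAT, and insert the result into the Parquet sink table.
tableEnv.executeSql(
        "INSERT INTO TransactionsSink "
                + "SELECT "
                + "  CAST(t_time AS STRING) AS t_time, "
                + "  t_id, "
                + "  t_customer_id, "
                + "  t_amount, "
                + "  DATE_FORMAT(t_time, 'yyyy_MM_dd') AS t_date "
                + "FROM Transactions");
```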
The full recipe
This recipe is self-contained. You can run the WriteParquetFilesTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.
See the comments and tests included in the code for more details.