
Write Apache Parquet® files with Apache Flink®

Visit the write-parquet-files recipe on GitHub.

Reading from Apache Kafka® and writing to Apache Parquet files

Apache Kafka is great for streaming data, but there are use cases where it's not the best solution. For example, you may want to leverage Flink’s HybridSource to efficiently bootstrap a new Flink application. This can be especially interesting if you can't store your Kafka events indefinitely.

In this recipe, you are going to consume event data from an Apache Kafka topic and write it to Apache Parquet files. The Parquet format is an open source, column-oriented data format designed for efficient data storage and retrieval. Flink allows you to read and write Parquet files, including using it with Flink's HybridSource. The Parquet format is widely used by other applications, such as the data lake of your choice or directly in a tool like DuckDB.

You will use both the DataStream API and the Table API. The recipe starts by consuming the events with the DataStream Kafka connector, switches to the Table API, and uses SQL to select and transform the data before using the FileSystem connector to generate the partitioned Parquet files.

This Apache Flink recipe is self-contained: you can directly copy and run it from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The JSON input data

The recipe uses the Kafka topic transactions, containing JSON-encoded records.


{"t_time":"2022-10-12 14:13:34.277000000","t_id":1,"t_customer_id":7,"t_amount":99.08}
{"t_time":"2022-10-12 14:13:34.497000000","t_id":2,"t_customer_id":24,"t_amount":405.01}
{"t_time":"2022-10-12 14:13:34.609000000","t_id":3,"t_customer_id":19,"t_amount":974.90}
{"t_time":"2022-10-12 14:13:34.724000000","t_id":4,"t_customer_id":8,"t_amount":100.19}
{"t_time":"2022-10-12 14:13:34.838000000","t_id":5,"t_customer_id":24,"t_amount":161.48}

Connecting and reading data from Apache Kafka

This recipe uses the same implementation as the Kafka JSON to POJO recipe to connect to the transactions topic and deserialize the data. It is recommended to read the Kafka JSON to POJO recipe before continuing.

As a result, you create a stream called transactionStream.
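A minimal sketch of that wiring is shown below. The broker address, starting offsets, and source name are placeholders for illustration; only the transactions topic and the TransactionDeserializer come from the recipe.

// Sketch of the Kafka source setup described in the Kafka JSON to POJO recipe;
// the broker address, offsets, and source name below are placeholders.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<Transaction> source =
        KafkaSource.<Transaction>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("transactions")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new TransactionDeserializer())
                .build();

DataStream<Transaction> transactionStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Transactions");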

As in the Joining and Deduplicating Data recipe, you switch from the DataStream API to the Table API by creating a temporary Transactions view from the incoming stream.

WriteParquetFiles.java

tableEnv.createTemporaryView("Transactions", transactionStream);

Add dependency on Apache Parquet library

You start by adding Flink’s Parquet file format artifact to your POM file.

pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>${flink.version}</version>
</dependency>

Apache Parquet relies on Apache Hadoop®, which means you also need to add hadoop-common and hadoop-mapreduce-client-core. Both bring in transitive dependencies that conflict with Apache Flink and therefore need to be excluded.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.immerok</groupId>
    <artifactId>cookbook-write-parquet-files</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Cookbook Recipe - Write Parquet Files</name>

    <properties>
        <flink.version>1.16.0</flink.version>
        <jackson.version>2.14.0</jackson.version>
        <junit.jupiter.version>5.9.1</junit.jupiter.version>
        <kafka.version>3.2.2</kafka.version>
        <log4j.version>2.19.0</log4j.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <!-- Flink 1.15.2 uses Parquet 1.12.2
             https://github.com/apache/flink/blob/release-1.15.2/flink-formats/pom.xml#L34
             Parquet 1.12.2 uses Hadoop 2.10.1
             https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.2/pom.xml#L82
        -->
        <parquet.hadoop.version>2.10.2</parquet.hadoop.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>11</target.java.version>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <!-- We need to exclude Zookeeper, Log4J and SLF4J to avoid conflicts with Flink's usage -->
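The Hadoop dependencies themselves are not visible in the excerpt above. A rough sketch of what they could look like with exclusions follows; the concrete exclusion list is an assumption based on typical conflicts (logging bindings, ZooKeeper), so treat the recipe's pom.xml on GitHub as authoritative.

<!-- Sketch only: the exclusions below are illustrative; see the recipe's pom.xml for the full list. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${parquet.hadoop.version}</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${parquet.hadoop.version}</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>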

You are adding these dependencies here as provided. If you want to use Parquet in a production cluster, you'll need to configure Flink with Hadoop.

Create a Parquet Sink Table

Before selecting and transforming your data, you create a so-called Dynamic Table.
This recipe uses a TableDescriptor.Builder for defining the schema, path and format.

WriteParquetFiles.java

package com.immerok.cookbook;

import com.immerok.cookbook.records.Transaction;
import com.immerok.cookbook.records.TransactionDeserializer;
import java.time.Duration;
import java.time.ZoneId;
import java.util.function.Consumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

static TableDescriptor.Builder createFilesystemConnectorDescriptor(Path dataDirectory) {
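The excerpt above only shows the method signature. A minimal sketch of such a descriptor is shown below; the column data types are assumptions derived from the sample JSON input, so check the recipe's source for the exact schema.

// Sketch of the filesystem connector descriptor (column types are assumptions based on
// the sample JSON input; required classes come from org.apache.flink.table.api).
static TableDescriptor.Builder createFilesystemConnectorDescriptor(Path dataDirectory) {
    return TableDescriptor.forConnector("filesystem")
            .schema(
                    Schema.newBuilder()
                            .column("t_time", DataTypes.STRING())
                            .column("t_id", DataTypes.BIGINT())
                            .column("t_customer_id", DataTypes.BIGINT())
                            .column("t_amount", DataTypes.DOUBLE())
                            .column("t_date", DataTypes.STRING())
                            .build())
            .option("path", dataDirectory.toString())
            .format("parquet");
}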

You then use TableEnvironment#createTable to create the table, while enabling partitioning and compaction:

WriteParquetFiles.java

package com.immerok.cookbook;

import com.immerok.cookbook.records.Transaction;
import com.immerok.cookbook.records.TransactionDeserializer;
import java.time.Duration;
import java.time.ZoneId;

createFilesystemConnectorDescriptor(dataDirectory)
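Put together, the call could look roughly like this; the table name TransactionsSink is a placeholder for this sketch, not necessarily the name used in the recipe.

// Register the sink table, partitioned by t_date and with automatic compaction enabled.
// The table name "TransactionsSink" is a placeholder.
tableEnv.createTable(
        "TransactionsSink",
        createFilesystemConnectorDescriptor(dataDirectory)
                .partitionedBy("t_date")
                .option("auto-compaction", "true")
                .build());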

You can specify a location for writing the Parquet files by providing the argument --outputFolder.

See Also

The Parquet format implementation in Flink also allows you to specify any configuration option from ParquetOutputFormat, such as parquet.compression to select a compression algorithm.
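For example, a compression codec could be selected when building the descriptor; the Snappy value below is just an illustration and is not part of the recipe's code.

// Illustration: forward a ParquetOutputFormat option through the table options,
// here selecting Snappy compression for the generated files.
TableDescriptor compressedSink =
        createFilesystemConnectorDescriptor(dataDirectory)
                .option("parquet.compression", "SNAPPY")
                .build();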

Enable partitioning

Flink’s FileSystem connector supports partitioning. Imagine that you need to read some data from the Parquet files but are only interested in the results of a specific day. A properly optimized application (using "partition pruning") will read only the data for that specific day, which tremendously improves query performance. Another use case is automatically deleting all data that is older than 90 days to comply with data retention policies. In this recipe, the data is partitioned by t_date so that all data for a specific day ends up in one specific folder.

WriteParquetFiles.java

.partitionedBy("t_date")

Enable compaction

Flink's FileSystem connector also supports compaction. Compaction makes it possible to use smaller checkpoint intervals without generating a large number of small files, because file compaction merges multiple small files into larger ones. You can run this automatically, as in this recipe, or you can set a target file size for the compacted files.

Before files are merged, they are invisible. That means that the visibility of a file depends on the time it takes for a checkpoint to complete plus the time that's needed to compact it. If compaction takes too long, it will result in backpressure.

WriteParquetFiles.java

.option("auto-compaction", "true")

Select and transform the data

After creating the dynamic table, you will use SQL to select and transform the data from the Transactions view. You want to select all data from the Apache Kafka event. After selecting it, you cast the timestamp to a string. This avoids the Flink timestamp data type being mapped to the Parquet type INT96, which is how Flink currently maps timestamps. If you don't do this, you can lose the local timezone information from your data, which would result in a time jump in your partitioning results as well as in the timestamps in the Parquet files.

Since you want to partition data by day, you convert the timestamp to the yyyy_MM_dd format.

In the end, you insert everything into your previously defined dynamic table.

WriteParquetFiles.java

package com.immerok.cookbook;

import com.immerok.cookbook.records.Transaction;
import com.immerok.cookbook.records.TransactionDeserializer;
import java.time.Duration;
import java.time.ZoneId;
import java.util.function.Consumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;

// for accurate results we cast the timestamp to a string
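A sketch of the corresponding statement is shown below; the sink table name and the exact SELECT expressions are assumptions for illustration, and the recipe's source contains the authoritative query.

// Sketch: cast the timestamp to a string, derive the daily partition column,
// and insert everything into the sink table ("TransactionsSink" is a placeholder).
tableEnv.executeSql(
        "INSERT INTO TransactionsSink "
                + "SELECT CAST(t_time AS STRING), t_id, t_customer_id, t_amount, "
                + "DATE_FORMAT(t_time, 'yyyy_MM_dd') AS t_date "
                + "FROM Transactions");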

The full recipe

This recipe is self-contained. You can run the WriteParquetFilesTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.

See the comments and tests included in the code for more details.