Reading Google Protocol Buffers with Apache Flink®
Visit the read-protobuf recipe on GitHub.
Reading Google Protocol Buffers from Apache Kafka®
Google’s Protocol Buffers (Protobuf) are a language-neutral, platform-neutral, and extensible mechanism for serializing structured data. Protobuf is an alternative to well-known formats like JSON and Apache Avro® and is commonly used as a serializer for Apache Kafka.
In this recipe, you are going to consume Protobuf-encoded event data from an Apache Kafka topic and print the data to the screen.
You will use both the Table API and the DataStream API: the recipe starts by consuming the events with the Table API Kafka connector and then switches to the DataStream API to print the information. This shows that you can use Protobuf in either a Table API or a DataStream API application.
This recipe for Apache Flink is self-contained: you can copy it directly and run it from your favorite editor. There is no need to download Apache Flink or Apache Kafka.
The Protobuf input data
The recipe uses the Kafka topic transactions, which contains Protobuf-encoded records. The Protobuf schema for these records is defined in Transaction.proto.
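For illustration, a schema along these lines would fit the recipe's description; the actual Transaction.proto may differ, and the package, option values, and field names below are assumptions:

```protobuf
syntax = "proto3";

package recipe;

// Hypothetical layout; the real schema ships with the recipe as Transaction.proto.
option java_package = "com.example.recipe";
option java_multiple_files = true;

message Transaction {
  int64 t_id = 1;          // unique transaction identifier
  int64 t_customer_id = 2; // customer who made the transaction
  double t_amount = 3;     // transaction amount
}
```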
Protobuf project setup
To add support for Protobuf, you need to add dependencies and configure them correctly in the Maven project.
Add dependency on flink-protobuf
You start by adding Flink’s Protobuf file format artifact to your POM file.
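Assuming your POM defines a flink.version property, the dependency looks like this:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-protobuf</artifactId>
    <version>${flink.version}</version>
</dependency>
```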
Add Protobuf Maven plugin
After adding the dependency on flink-protobuf, you need to add the protoc-jar-maven-plugin. This Maven plugin compiles the Protobuf definitions in src/main/protobuf into Java classes at build time. This is done during the generate-sources phase of the Maven lifecycle.
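A typical configuration for the plugin looks like the sketch below; the version is an assumption, so pin whichever release matches your build. It compiles everything under src/main/protobuf and registers the generated classes as main sources:

```xml
<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <version>3.11.4</version>
    <executions>
        <execution>
            <!-- run protoc during generate-sources, before compilation -->
            <phase>generate-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
                <inputDirectories>
                    <include>src/main/protobuf</include>
                </inputDirectories>
                <outputTargets>
                    <outputTarget>
                        <type>java</type>
                        <!-- attach the generated classes to the main source set -->
                        <addSources>main</addSources>
                    </outputTarget>
                </outputTargets>
            </configuration>
        </execution>
    </executions>
</plugin>
```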
These Java classes need to be available on the classpath of your Flink application. In IntelliJ IDEA, you can do that by running Generate Sources and Update Folders from the Maven tool window.
Connecting to Kafka
You are using the Apache Flink Kafka Table API connector in the application to connect to your Apache Kafka broker. This is where you define the schema for your incoming Protobuf events, specify that you are using the Protobuf format, and provide the class name of your generated Protobuf class.
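As a minimal sketch of what that can look like: the broker address, startup mode, and the generated class name com.example.recipe.Transaction are assumptions, and the columns follow the hypothetical schema above rather than the recipe's actual code.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// The column list mirrors the Protobuf message fields, 'format' selects the
// Protobuf format, and 'protobuf.message-class-name' points at the Java class
// generated from Transaction.proto.
tableEnv.executeSql(
    "CREATE TEMPORARY TABLE Transactions (\n"
        + "  t_id BIGINT,\n"
        + "  t_customer_id BIGINT,\n"
        + "  t_amount DOUBLE\n"
        + ") WITH (\n"
        + "  'connector' = 'kafka',\n"
        + "  'topic' = 'transactions',\n"
        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
        + "  'scan.startup.mode' = 'earliest-offset',\n"
        + "  'format' = 'protobuf',\n"
        + "  'protobuf.message-class-name' = 'com.example.recipe.Transaction'\n"
        + ")");
```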
Creating and selecting from a dynamic table
After defining your incoming stream, you need to create a temporary dynamic table and select the data from this table.
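Continuing the sketch above, selecting from the temporary table gives you a Table handle for the rest of the pipeline:

```java
import org.apache.flink.table.api.Table;

// Select all columns from the temporary table defined above.
Table transactions = tableEnv.sqlQuery("SELECT * FROM Transactions");
```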
Switching to the DataStream API
You are now going to switch from the previously created dynamic table to the DataStream API. Flink currently supports Protobuf only as a format for sources and sinks in Table API jobs. If required, you can still implement your main pipeline or access more low-level operations by switching to the DataStream API.
Because the SQL query is a regular SELECT, the result of the dynamic table is an insert-only changelog stream.
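A sketch of the switch, continuing from the snippets above; toDataStream works here precisely because the changelog is insert-only (an updating result would require toChangelogStream instead):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Convert the insert-only dynamic table into a DataStream of Rows and print it.
DataStream<Row> stream = tableEnv.toDataStream(transactions);
stream.print();

env.execute("Reading Google Protocol Buffers from Apache Kafka");
```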
You can find the data type mapping from Flink types to Protobuf types in the Flink Protobuf documentation.
The full recipe
This recipe is self-contained. You can run the ReadProtobufTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.
See the comments and tests included in the code for more details.
It's currently not possible to test the recipe itself because Flink's Kafka Table API connector only supports unbounded (streaming) mode. Testing requires support for bounded (batch) mode. This is tracked under FLINK-24456.