Reading Google Protocol Buffers with Apache Flink®

Visit the read-protobuf recipe on GitHub.

Reading Google Protocol Buffers from Apache Kafka®

Google’s Protocol Buffers (Protobuf) are a language- and platform-neutral, extensible mechanism for serializing structured data. Protobuf is an alternative to well-known formats such as JSON and Apache Avro® and is commonly used as a serializer for Apache Kafka.

In this recipe, you are going to consume Protobuf-encoded event data from an Apache Kafka topic and print the data to screen.

You will use both the Table API and the DataStream API. The recipe starts by consuming the events with the Table API Kafka connector and then switches to the DataStream API to print the information. This shows that you can use Protobuf in either your Table API or your DataStream API application.

This Apache Flink recipe is self-contained: you can copy it directly and run it from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The Protobuf input data

The recipe uses the Kafka topic transactions, containing Protobuf-encoded records.

The Protobuf schema for these records is defined in Transaction.proto. You can use either proto2 or proto3.

Transaction.proto

syntax = "proto3";
option java_multiple_files = true;

package com.immerok.cookbook.protobuf;
option java_package = "com.immerok.cookbook";

message Transaction {
  optional string t_time = 1;
  optional int64 t_id = 2;
  optional int64 t_customer_id = 3;
  optional double t_amount = 4;
}

Protobuf project setup

To add support for Protobuf, you need to add dependencies and configure them correctly in the Maven project.

You start by adding Flink’s Protobuf file format artifact to your POM file.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.immerok</groupId>
    <artifactId>flink-protobuf</artifactId>
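
The snippet above only shows the top of the POM file. As an illustration, the dependency on Flink's Protobuf format could be declared roughly like this inside the <dependencies> section (a minimal sketch; the exact scope and version handling in the recipe may differ):

<!-- Sketch: Flink's Protobuf file format, versioned via the flink.version property -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-protobuf</artifactId>
    <version>${flink.version}</version>
</dependency>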

Add Protobuf Maven plugin

After adding the dependency on flink-protobuf, you need to add the protoc-jar-maven-plugin. This Maven plugin compiles and packages Transaction.proto from src/main/protobuf into Java classes at target/proto-sources during the generate-sources phase of the Maven lifecycle. These Java classes need to be available on the classpath of your Flink application. In IntelliJ IDEA, you can do that by running Generate Sources and Update Project Folders. A sketch of a possible plugin configuration follows the pom.xml snippet below.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.immerok</groupId>
    <artifactId>cookbook-read-protobuf</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Cookbook Recipe - Reading Protobuf</name>

    <properties>
        <flink.version>1.16.0</flink.version>
        <protocArtifact>com.google.protobuf:protoc:${protoc.version}</protocArtifact>
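
The plugin configuration itself is not visible in the truncated snippet above. A minimal sketch of a protoc-jar-maven-plugin configuration that compiles the schema during generate-sources could look like this (the plugin version, directories, and output settings are assumptions, not the recipe's exact configuration):

<!-- Sketch: compile src/main/protobuf into Java sources at target/proto-sources -->
<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <version>3.11.4</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
                <protocArtifact>${protocArtifact}</protocArtifact>
                <inputDirectories>
                    <include>src/main/protobuf</include>
                </inputDirectories>
                <outputTargets>
                    <outputTarget>
                        <type>java</type>
                        <addSources>main</addSources>
                        <outputDirectory>target/proto-sources</outputDirectory>
                    </outputTarget>
                </outputTargets>
            </configuration>
        </execution>
    </executions>
</plugin>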

Connecting to Kafka

You are using the Apache Flink Kafka Table API connector in the application to connect to your Apache Kafka broker. This is where you define the schema for your incoming Protobuf events, specify that you are using the Protobuf format, and provide the class name of your generated Protobuf schema.

ReadProtobuf.java

package com.immerok.cookbook;

import java.util.function.Consumer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;
import org.apache.flink.table.api.DataTypes;

        .column("t_customer_id", DataTypes.BIGINT())

Creating and selecting from a dynamic table

After defining your incoming stream, you need to create a temporary dynamic table and select the data from this table.

ReadProtobuf.java

package com.immerok.cookbook;

import java.util.function.Consumer;
import org.apache.flink.streaming.api.datastream.DataStream;

        tableEnv.createTemporaryTable("KafkaSourceWithProtobufEncodedEvents", transactionStream);
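
A minimal sketch of this step, assuming a StreamTableEnvironment named tableEnv, the transactionStream descriptor from the previous step, and an import of org.apache.flink.table.api.Table (the query shown is illustrative):

// Register the Kafka-backed table under a name and select from it.
tableEnv.createTemporaryTable("KafkaSourceWithProtobufEncodedEvents", transactionStream);
Table transactions =
        tableEnv.sqlQuery(
                "SELECT t_time, t_id, t_customer_id, t_amount FROM KafkaSourceWithProtobufEncodedEvents");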

Switching to the DataStream API

You are now going to switch from the previously created dynamic table to the DataStream API. Flink currently only supports Protobuf when using a source or a sink in a Table API job. If required, you can still implement your main pipeline or access more low-level operations by switching to the DataStream API.

Because the SQL query is a regular SELECT, the result of the dynamic table is an insert-only changelog stream.

ReadProtobuf.java

// changelog stream
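
A minimal sketch of the conversion and the print sink, assuming the Table transactions from the previous step, a StreamExecutionEnvironment named env, and an import of org.apache.flink.types.Row (names are illustrative):

// Convert the insert-only changelog stream back into a DataStream of Rows
// and print every record with Flink's PrintSink. toDataStream() is sufficient
// here because a plain SELECT produces no retractions.
DataStream<Row> rowStream = tableEnv.toDataStream(transactions);
rowStream.sinkTo(new PrintSink<>());
env.execute("Reading Protobuf records from Kafka");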

You can find the data type mapping from Flink types to Protobuf types in the Flink Protobuf documentation.

The full recipe

This recipe is self-contained. You can run the ReadProtobufTest#testProductionJob test to see the full recipe in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.

See the comments and test included in the code for more details.

note

It's currently not possible to test the recipe itself because Flink's Kafka Table API connector only supports unbounded (streaming) mode. Testing requires support for bounded (batch) mode. This is tracked under FLINK-24456.