Reading Change Data Capture (CDC) with Apache Flink®

Visit the change-data-capture recipe on GitHub.

With Change Data Capture, all inserts, updates, and deletes that are committed to your database are captured. You can use this data for use cases such as keeping your standby database in sync with your primary database, keeping your cache up to date, or streaming data in real time into your data warehouse.

The CDC Connectors for Apache Flink® offer a set of source connectors for Apache Flink that support a wide variety of databases. The connectors integrate Debezium® as the engine that captures the data changes. There are currently CDC Connectors for MongoDB®, MySQL® (including MariaDB®, AWS Aurora®, AWS RDS®), Oracle®, Postgres®, Microsoft SQL Server®, and many more. The connectors support exactly-once semantics.

In this recipe, you are going to consume change data capture events from a Postgres database. You will use the DataStream API to connect to the table transactions.incoming and print the JSON payload with the captured data changes.

This recipe for Apache Flink is a self-contained recipe that you can copy and run directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The Postgres table

The recipe uses the Postgres schema transactions with the table incoming.


CREATE SCHEMA transactions;

CREATE TABLE transactions.incoming (
    t_id serial PRIMARY KEY,
    t_customer_id serial,
    t_amount REAL
);
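Once the connector is running, every committed change to this table is emitted as a Debezium change event. As an illustration only (not output from the actual recipe, and with placeholder values), an insert into transactions.incoming produces a JSON payload along these lines; the source block is abridged here and carries metadata such as the originating database and log position:

{
  "before": null,
  "after": {
    "t_id": 1,
    "t_customer_id": 42,
    "t_amount": 19.99
  },
  "source": { ... },
  "op": "c",
  "ts_ms": 1700000000000
}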

Add a dependency on the Postgres CDC Connector

You start by adding the Postgres CDC Connector to your POM file. The excerpt below is abridged to the relevant parts.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.immerok</groupId>
    <artifactId>cookbook-change-data-capture</artifactId>

    <!-- ... -->

    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
        </dependency>
        <!-- ... -->
    </dependencies>
</project>

Connecting and reading data from Postgres

You are using the Postgres CDC Connector in the application to connect to Postgres. In addition to the necessary connection information, you are configuring the connector to use the deserializer called JsonDebeziumDeserializationSchema, which converts each change event into a JSON string. The connector supports different Postgres logical decoding plugins, which can be selected via the decodingPluginName option; see the sketch after the excerpt below.

ReadCDC.java

package com.immerok.cookbook;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import java.util.function.Consumer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ReadCDC {

    // ...
            .deserializer(new JsonDebeziumDeserializationSchema())
    // ...
}
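For orientation, constructing the source with the connector's PostgreSQLSource builder could look like the following sketch. All connection values are placeholders rather than the recipe's actual configuration:

// A minimal sketch of the source definition; the connection values
// below are placeholders, not the recipe's actual configuration.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SourceFunction<String> source =
        PostgreSQLSource.<String>builder()
                .hostname("localhost")                // placeholder host
                .port(5432)                           // default Postgres port
                .database("postgres")                 // placeholder database name
                .schemaList("transactions")           // the schema created above
                .tableList("transactions.incoming")   // the table to capture
                .username("postgres")                 // placeholder credentials
                .password("postgres")
                .decodingPluginName("pgoutput")       // one of the supported logical decoding plugins
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();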

Set parallelism for the source

Postgres and other databases maintain a transaction log that records all changes committed to the database (in Postgres, the write-ahead log). This log is what the CDC connector reads. The Postgres CDC source can't read in parallel, because only a single task can receive the change events, so you must set the parallelism of the source to 1.

ReadCDC.java

final DataStreamSource<String> postgres = env.addSource(source).setParallelism(1);
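From here, the captured changes need to go somewhere; a minimal way to print them and run the job could look like this (the job name is a placeholder):

// Print every change event to stdout and execute the job;
// the job name is a placeholder.
postgres.sinkTo(new PrintSink<>());
env.execute("ReadCDC");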

The full recipe

This recipe is self-contained. You can run the ReadCDCTest#testProductionJob test to see the full recipe in action. That test uses an embedded Postgres instance and an embedded Apache Flink cluster, so you can run it directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.
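Assuming the standard Maven Surefire setup, you could run just this test from the command line with:

mvn test -Dtest=ReadCDCTest#testProductionJob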

See the comments and tests included in the code for more details.