Connecting to Apache Kafka® in Confluent® Cloud
Creating a new pipeline from scratch can be difficult, especially if no data sources and sinks are set up yet.
Apache Kafka is a distributed data store optimized for ingesting and processing streaming data in real-time. Confluent Cloud offers a fully-managed version of Kafka as a service.
This tutorial explains how to connect to Confluent Cloud and use a managed Apache Kafka instance as an event store for your Apache Flink job.
What you can learn from this tutorial
You will learn how to connect to Confluent Cloud from Immerok Cloud using Apache Flink's native Kafka connector. Just like in a typical development cycle, you will start with ingesting and exploring the data in an IDE.
The tutorial uses the SASL/PLAIN authentication mechanism, a simple username/password-based authentication that is typically used together with TLS for encryption to implement secure authentication.
For advanced configuration or more explanation, you can always fall back on the official Apache Flink connector documentation for the DataStream API or the Table API.
Before you start
Before you start, please make sure that...
- ...your GitHub account has been allow-listed for the Immerok Cloud Private Beta as described in Sign Up
- ...you have already set up the rok CLI as described in Setting Up the rok CLI
Setting up Confluent Cloud
Use the following steps to get started with Confluent Cloud. You can skip this section if you know Confluent Cloud well.
To get started you will need an empty Kafka topic that will be used by the following examples.
- Make sure you have signed up for a paid or trial subscription to Confluent Cloud.
- After logging in, create an empty cluster with the Basic configuration and launch it. For example, call it "MyCluster".
- Open the instructions for setting up a Java client.
- Scroll to the prepared configuration snippet and press Create Kafka cluster API key to create initial credentials. Confluent Cloud will offer to download a file containing these credentials. You can download this file, but it won't be used in this tutorial. Instead, the created credentials will also appear in the configuration file preview.
- Copy the resulting Java configuration file into a temporary editor. The file should look similar to the sketch shown after these steps.
- Switch to the Topics page.
- Create an empty topic using the default settings. For example, call it "MyTopic".
The Kafka topic is now ready to consume data and be queried.
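For reference, a minimal sketch of such a configuration file is shown below. Treat the values as placeholders: the file generated by Confluent Cloud contains your cluster's actual bootstrap server and the API key/secret you just created, and it may include a few additional client settings.

# Kafka connection to Confluent Cloud (placeholder values)
bootstrap.servers=<BOOTSTRAP_SERVER>:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username='<CLUSTER_API_KEY>' \
  password='<CLUSTER_API_SECRET>';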
Project setup
Import the code into your IDE
The code from this guide is available in a GitHub repository.
Clone the entire repository and switch to the confluent-kafka-sasl-plain directory.

git clone git@github.com:immerok/examples.git
cd examples/confluent-kafka-sasl-plain
Import this directory into your IDE. We recommend IntelliJ® IDEA as it provides the smoothest experience.
Select the pom.xml file during import to treat it as a Maven project. In the case of IntelliJ IDEA, you don't need to install Maven on your machine, as it can handle Maven projects on its own.
The example jobs can be run directly in your IDE by executing the main() method of each example class.
For IntelliJ IDEA it is necessary to edit the run configuration when running an example for the first time. To avoid a NoClassDefFoundError, tick the Add dependencies with "provided" scope to classpath option under Modify options.
Configure connection properties
All examples rely on the properties file located under src/main/resources/confluent-java.config.
Replace its content with the configuration copied over from Confluent Cloud earlier.
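The examples read this file through a small ConfigParser utility in the repository. Conceptually, the loading boils down to something like the following sketch (the class below is illustrative, not the repository's actual helper):

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/** Illustrative helper that loads the Confluent Cloud settings from the classpath. */
public final class ConfluentConfig {

    public static Properties load() throws IOException {
        final Properties config = new Properties();
        try (InputStream in =
                ConfluentConfig.class.getClassLoader().getResourceAsStream("confluent-java.config")) {
            if (in == null) {
                throw new IOException("confluent-java.config not found on the classpath");
            }
            config.load(in);
        }
        return config;
    }
}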
Now you are all set; let's access the Kafka topic!
Example data
The example uses a JSON schema consisting of message objects with a username and a text message:
{
  "user": "Bob",
  "message": "Hello World!"
}
To fill your topic with some sample data, you can add some JSON data via Confluent Cloud's Web UI, or start with an example that writes to the topic you've created.
DataStream API
For the DataStream API, the example implements a Java object Message and uses corresponding SerializationSchema/DeserializationSchema implementations that match the example schema.
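The exact classes live in the example repository; the following is a minimal sketch of what such a POJO and JSON-based schemas can look like. It uses Jackson, which may differ from the repository's implementation, and the class names are illustrative.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/** Simple POJO matching the example JSON schema. */
public class Message {
    public String user;
    public String message;

    public Message() {}

    public Message(String user, String message) {
        this.user = user;
        this.message = message;
    }
}

/** Turns the JSON bytes read from Kafka into Message objects. */
class MessageDeserializationSchema implements DeserializationSchema<Message> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Message deserialize(byte[] bytes) throws IOException {
        return MAPPER.readValue(bytes, Message.class);
    }

    @Override
    public boolean isEndOfStream(Message nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Message> getProducedType() {
        return TypeInformation.of(Message.class);
    }
}

/** Turns Message objects into JSON bytes before they are written to Kafka. */
class MessageSerializationSchema implements SerializationSchema<Message> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public byte[] serialize(Message element) {
        try {
            return MAPPER.writeValueAsBytes(element);
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize record", e);
        }
    }
}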
Write to Confluent Cloud
The StreamToConfluentKafka class defines a program for writing to Confluent Cloud with the DataStream API.
After loading the configuration as a Properties object, you can pass it to the KafkaSink:
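A minimal sketch of that wiring is shown below. It assumes the configuration has been loaded into a Properties object named config, the topic is the "MyTopic" topic created earlier, and the serialization schema sketched above is used; the repository's class is the authoritative version.

// uses org.apache.flink.connector.kafka.sink.{KafkaSink, KafkaRecordSerializationSchema}
KafkaSink<Message> sink =
        KafkaSink.<Message>builder()
                .setKafkaProducerConfig(config)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("MyTopic")
                                .setValueSerializationSchema(new MessageSerializationSchema())
                                .build())
                .build();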
The snippet shows the minimal parameters required to produce data into a Kafka topic. Exactly-once semantics require additional configuration.
You can use the API to define a workflow that creates and ingests example data:
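For example, a short pipeline that writes two hard-coded messages into the topic could look roughly like this sketch (the example values match the JSON shown earlier; StreamToConfluentKafka in the repository may differ in detail):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a small, bounded stream of example records and hand it to the sink defined above
env.fromElements(
                new Message("Bob", "Hello World!"),
                new Message("Alice", "¡Hola Mundo!"))
        .sinkTo(sink);

env.execute("Write to Confluent Cloud");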
If you want to see how the data ends up in Kafka, open the corresponding topic in Confluent Cloud's Web UI and click Messages.
Keep the browser open while running the main() method in your IDE. The UI should visualize the ingested data after a couple of seconds.
If you did not keep the browser open, you can also view previously ingested data via Jump to offset 0.
Read from Confluent Cloud
The StreamFromConfluentKafka class defines a program for reading from Confluent Cloud with the DataStream API.
The source example assumes that data has been added to the topic (either via Confluent Cloud's Web UI or the sink example).
After loading the configuration as a Properties object, you can pass it to the KafkaSource:
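A minimal sketch, again assuming the loaded Properties object is called config, the topic is "MyTopic", and the deserialization schema sketched earlier is used; the consumer group name below is an arbitrary choice for illustration.

// uses org.apache.flink.connector.kafka.source.KafkaSource and
// org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
KafkaSource<Message> source =
        KafkaSource.<Message>builder()
                .setProperties(config)
                .setTopics("MyTopic")
                .setGroupId("my-flink-tutorial")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new MessageDeserializationSchema())
                .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// print every record that arrives from the topic to standard output
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Confluent Cloud")
        .print();

env.execute("Read from Confluent Cloud");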
By default, the result is printed to standard output, which should be sufficient for IDE development. The stream deduplication example shows an end-to-end pipeline runnable with Immerok Cloud.
Table API
For the Table API, this tutorial uses the same example schema when creating a table.
Create a table
As the API is centered around the concept of tables, you define a TableDescriptor that acts as a connector template to create a logical table backed by a Kafka topic for both writing and reading.
After loading the configuration as a Properties object, you can pass the key/value pairs to the descriptor builder. In the example class, each entry is forwarded with a "properties." prefix:

(key, value) -> descriptorBuilder.option("properties." + key, (String) value)
Afterwards, create a table out of it for the catalog:
tableEnv.createTemporaryTable("MyTableFromTopic", topicTableDescriptor);
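Putting these pieces together, a rough sketch of the whole descriptor setup could look like the following. The column names follow the example schema, while the topic name, JSON format, and startup mode are illustrative assumptions; the repository's Table API example classes are the authoritative versions.

// set up a Table API environment in streaming mode
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

TableDescriptor.Builder descriptorBuilder =
        TableDescriptor.forConnector("kafka")
                .schema(
                        Schema.newBuilder()
                                .column("user", DataTypes.STRING())
                                .column("message", DataTypes.STRING())
                                .build())
                .format("json")
                .option("topic", "MyTopic")
                .option("scan.startup.mode", "earliest-offset");

// forward the Confluent Cloud client settings to the Kafka connector
config.forEach(
        (key, value) -> descriptorBuilder.option("properties." + key, (String) value));

TableDescriptor topicTableDescriptor = descriptorBuilder.build();
tableEnv.createTemporaryTable("MyTableFromTopic", topicTableDescriptor);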
Write to Confluent Cloud
The TableToConfluentKafka class defines a program for writing to Confluent Cloud with the Table API.
Once the table has been registered under a name, you can use the API to define a workflow that creates and ingests example data:
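For instance, a couple of hard-coded rows can be inserted like this sketch (the values match the example data above; TableToConfluentKafka contains the full program):

// insert two example rows into the Kafka-backed table registered above;
// executeInsert() submits the job that produces the records into the topic
// (Row is org.apache.flink.types.Row)
tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("user", DataTypes.STRING()),
                        DataTypes.FIELD("message", DataTypes.STRING())),
                Row.of("Bob", "Hello World!"),
                Row.of("Alice", "¡Hola Mundo!"))
        .executeInsert("MyTableFromTopic");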
If you want to see how the data ends up in Kafka, open the corresponding topic in Confluent Cloud's Web UI and click Messages.
Keep the browser open while running the main() method in your IDE. The UI should visualize the ingested data after a couple of seconds.
If you did not keep the browser open, you can also view previously ingested data via Jump to offset 0.
Read from Confluent Cloud
The TableFromConfluentKafka class defines a program for reading from Confluent Cloud with the Table API.
This source example assumes that data has already been added to the topic (either via Confluent Cloud's Web UI or the sink example).
Once the table has been registered under a name, you can use the Table API or SQL to define a workflow that reads the data:
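For example, printing everything from the registered table can look like this sketch (TableFromConfluentKafka contains the full program):

// scan the Kafka-backed table and print the rows as they arrive;
// the equivalent SQL would be: SELECT * FROM MyTableFromTopic
tableEnv.from("MyTableFromTopic")
        .execute()
        .print();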
By default, the result is printed to standard output, which should be sufficient for IDE development. The stream deduplication example shows an end-to-end pipeline that is runnable with Immerok Cloud.
Run an end-to-end pipeline
The previous examples used a small amount of data with a predefined schema and printed the results to standard output.
Now let's run a full end-to-end example in Immerok Cloud with a prepared pipeline that reads from and writes to Confluent Cloud.
Pipeline overview
The ConfluentKafkaDeduplication class defines a full program with the DataStream API. A similar program could be defined with the Table API but is omitted here for simplicity.
The example performs stream deduplication from a source topic to a destination topic.
The code works with arbitrary JSON that contains a top-level key. The key is extracted in a custom DeserializationSchema, which stores the extracted key next to the raw JSON bytes.
The program is stateful: a ProcessFunction remembers whether a given key has been seen before, and only records with unseen keys are emitted downstream. To keep this state from growing without bound, it expires after a configurable time-to-live (TTL).
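As a rough illustration of the stateful part (not the repository's exact code), a keyed function with a TTL-backed flag could look like the sketch below. It assumes the stream has already been keyed by the extracted JSON key; the class and field names are illustrative.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Emits a record only the first time its key is seen. The per-key "seen" flag expires
 * after the given TTL so that the state does not grow forever.
 */
class DeduplicateFunction<T> extends KeyedProcessFunction<String, T, T> {

    private final long ttlMillis;
    private transient ValueState<Boolean> seen;

    DeduplicateFunction(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("seen", Boolean.class);
        // let Flink drop the per-key flag once the TTL has passed
        descriptor.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.milliseconds(ttlMillis)).build());
        seen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(T record, Context ctx, Collector<T> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(record);
        }
    }
}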
The example is runnable from within an IDE if the required main method arguments have been provided.
But for this tutorial, let's run this in Immerok Cloud!
Submit to Immerok Cloud
You can execute a full pipeline on Immerok Cloud that is connected with Confluent Cloud.
- Build the confluent-kafka-sasl-plain project. If Maven is installed on your machine, run the mvn clean package shell command in the directory. For IntelliJ, you can use the Maven tab and execute clean and package under Lifecycle.
- Make sure you have created two topics in Confluent Cloud. In our example, they are called "MySourceTopic" and "MySinkTopic". You can also use an existing topic with your own JSON data as the source topic.
- Make sure you have signed up for Immerok Cloud and set up the rok CLI. Call rok auth login if you aren't already authenticated.
- Create a job with the rok CLI and pass the required parameters to the main method.

rok create job my-dedup-job \
  --file ./target/example-confluent-kafka-sasl-plain-0.1.jar \
  --entry-class=com.immerok.cloud.examples.ConfluentKafkaDeduplication \
  --main-args="--srcTopic MySourceTopic --destTopic MySinkTopic --key user --ttl 3600000"
--key defines the JSON field on which the deduplication will be performed.
--ttl defines how long to keep the state used for deduplication.
- Ensure that the job is running via rok describe job my-dedup-job. Flink's job state should be Running.
- In the next step, you will add some data to your source topic and observe how the deduplicated stream ends up in the destination topic. Use the Confluent Cloud Web UI for both ingesting and visualizing the data. For this, open a browser tab for each topic.
- For ingesting, select the Messages tab of the MySourceTopic topic and press Produce a new message to this topic. You can use data from the previous example as the value and null as a key in the UI:
{
  "user": "Bob",
  "message": "Hello World!"
}

{
  "user": "Bob",
  "message": "Hello World!"
}

{
  "user": "Alice",
  "message": "¡Hola Mundo!"
}
- If you want to see how the data ends up in Kafka, open the MySinkTopic topic in Confluent Cloud's Web UI and click Messages. The destination topic will contain only a single message for Bob, proving that the deduplication worked:
{
  "user": "Bob",
  "message": "Hello World!"
}

{
  "user": "Alice",
  "message": "¡Hola Mundo!"
}
If you did not keep the browser open, you can also view previously ingested data via Jump to offset 0.
- Run rok delete job my-dedup-job to clean up your project. Remember to also delete your topics afterward.
Summary & next steps
In this tutorial, you ran several jobs in your IDE and on Immerok Cloud that accessed Confluent Cloud.
For simplicity, the credentials were part of the submitted artifact. Ideally, however, they should be stored securely and passed to the application at runtime.
- Check out the development guides to learn more about developing production-ready pipelines.
- Use the Immerok Cloud guide to monitor the lifecycle of your Immerok Cloud Jobs.