Connecting to Apache Kafka® in Confluent® Cloud

Creating a new pipeline from scratch can be difficult, especially if no data sources and sinks have been set up yet.

Apache Kafka is a distributed data store optimized for ingesting and processing streaming data in real-time. Confluent Cloud offers a fully-managed version of Kafka as a service.

This tutorial explains how to connect to Confluent Cloud and use a managed Apache Kafka instance as an event store for your Apache Flink job.

What you can learn from this tutorial

You will learn how to connect to Confluent Cloud from Immerok Cloud using Apache Flink's native Kafka connector. Just like in a typical development cycle, you will start with ingesting and exploring the data in an IDE.

The tutorial uses the SASL/PLAIN authentication mechanism, a simple authentication mechanism based on username/password credentials that is typically combined with TLS for encryption.

See Also

For advanced configuration or further explanation, you can always fall back on the official Apache Flink connector documentation for the DataStream API or the Table API.

Before you start

Before you start, please make sure that...

  • ...your GitHub account was allow-listed for the Immerok Cloud Private Beta as described in Sign Up
  • ...you already set up the rok CLI as described in Setting Up the rok CLI

Setting up Confluent Cloud

note

Use the following steps to get started with Confluent Cloud. You can skip this section if you know Confluent Cloud well.

To get started you will need an empty Kafka topic that will be used by the following examples.

  1. Make sure you have signed up for a paid or trial subscription to Confluent Cloud.

  2. After login, create an empty cluster with Basic configuration and launch it. For example, call it "MyCluster".

  3. Open the instructions for setting up a Java client.

  4. Scroll to the prepared configuration snippet and press Create Kafka cluster API key to create initial credentials.

    Confluent Cloud will offer to download a file containing these credentials. You can download this file, but it won't be used in this tutorial.

    Instead, the created credentials will also appear in the configuration file preview.

  5. Copy the resulting Java configuration file into a temporary editor. The file should look similar to the following:

confluent-java.config

###################################################################################################
# Example Confluent Cloud Java configuration
#
# Make sure to replace this file with your client configuration.
# Including valid 'bootstrap.servers' and 'sasl.jaas.config' with cluster API key and secret.
###################################################################################################

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=some-server.us-east-2.aws.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='***' password='***';
sasl.mechanism=PLAIN

  6. Switch to the Topics page.

  7. Create an empty topic using the default settings. For example, call it "MyTopic".

The Kafka topic is now ready to consume data and be queried.

Project setup

Import the code into your IDE

The code from this guide is available in a GitHub repository.

Clone the entire repository and switch to the confluent-kafka-sasl-plain directory.


git clone git@github.com:immerok/examples.git
cd examples/confluent-kafka-sasl-plain

Import this directory into your IDE. We recommend IntelliJ® IDEA as it provides the smoothest experience.

Select the pom.xml file during import to treat it as a Maven project. In the case of IntelliJ IDEA, you don't need to install Maven on your machine, as it can handle Maven projects on its own.

The example jobs can be run directly in your IDE by executing the main() method of each example class.

caution

For IntelliJ IDEA it is necessary to edit the run configuration when running an example for the first time. To avoid NoClassDefFoundError, tick the Add dependencies with "provided" scope to classpath option under Modify options.

Configure connection properties

All examples rely on the properties file located under src/main/resources/confluent-java.config.

Replace its content with the configuration copied over from Confluent Cloud earlier.
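Before wiring the file into Flink, it can help to confirm that it parses as standard Java properties. A minimal, Flink-free sketch (the class name and placeholder values are illustrative, not from the repository):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ConfigCheck {

    /** Parses a Java client configuration in properties format. */
    static Properties loadConfig(String config) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(config));
        return properties;
    }

    public static void main(String[] args) throws IOException {
        // The examples read src/main/resources/confluent-java.config;
        // here an inline copy with placeholder values is parsed instead.
        String config =
                "bootstrap.servers=some-server.us-east-2.aws.confluent.cloud:9092\n"
                        + "security.protocol=SASL_SSL\n"
                        + "sasl.mechanism=PLAIN\n";

        Properties properties = loadConfig(config);
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}
```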

Now that everything is set up, let's access the Kafka topic!

Example data

The example uses a JSON schema consisting of message objects with a username and a text message:


{
  "user": "Bob",
  "message": "Hello World!"
}

To fill your topic with some sample data, you can add some JSON data via Confluent Cloud's Web UI, or start with an example that writes to the topic you've created.

DataStream API

For the DataStream API, the example implements a Java object Message and uses corresponding SerializationSchema/DeserializationSchema implementations that match the example schema.
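A Message can be pictured as a plain POJO with the two JSON fields. This is an illustrative sketch, not the repository's actual class, which may differ in detail:

```java
public class Message {

    // Public fields let JSON (de)serialization map the
    // "user" and "message" properties automatically.
    public String user;
    public String message;

    /** No-argument constructor, required by most JSON deserializers. */
    public Message() {}

    public Message(String user, String message) {
        this.user = user;
        this.message = message;
    }

    public static void main(String[] args) {
        Message m = new Message("Bob", "Hello World!");
        System.out.println(m.user + ": " + m.message);
    }
}
```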

Write to Confluent Cloud

The StreamToConfluentKafka class defines a program for writing to Confluent Cloud with the DataStream API.

After loading the configuration as a Properties object, you can pass it to the KafkaSink:

StreamToConfluentKafka.java

package com.immerok.cloud.examples;

import com.immerok.cloud.examples.events.Message;
import com.immerok.cloud.examples.utils.ConfigParser;
import java.util.Properties;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// ...

KafkaSink<Message> sink =
    KafkaSink.<Message>builder()
        .setKafkaProducerConfig(properties)
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("MyTopic")
                .setValueSerializationSchema(new JsonSerializationSchema<Message>())
                .build())
        .build();

The snippet shows the minimal parameters required to produce data into a Kafka topic. Exactly-once semantics require additional configuration.

You can use the API to define a workflow that creates and ingests example data:

StreamToConfluentKafka.java

package com.immerok.cloud.examples;

import com.immerok.cloud.examples.events.Message;
import com.immerok.cloud.examples.utils.ConfigParser;

// ...

env.fromElements(
        // ...
        new Message("Charly", "¡Hola Mundo!"))
    .sinkTo(sink);

env.execute();

If you want to see how the data ends up in Kafka, open the corresponding topic in Confluent Cloud's Web UI and click Messages.

Keep the browser open while running the main() method in your IDE. The UI should visualize the ingested data after a couple of seconds.

If you didn't keep the browser open, you can also view previously ingested data via Jump to offset 0.

Read from Confluent Cloud

The StreamFromConfluentKafka class defines a program for reading from Confluent Cloud with the DataStream API.

The source example assumes that data has been added to the topic (either via Confluent Cloud's Web UI or the sink example).

After loading the configuration as a Properties object, you can pass it to the KafkaSource:

StreamFromConfluentKafka.java

package com.immerok.cloud.examples;

import com.immerok.cloud.examples.events.Message;
import com.immerok.cloud.examples.utils.ConfigParser;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// ...

KafkaSource<Message> source =
    KafkaSource.<Message>builder()
        .setProperties(properties)
        .setTopics("MyTopic")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new JsonDeserializationSchema<>(Message.class))
        .build();

By default, the result is printed to standard output which should be sufficient for IDE development. The stream deduplication example shows an end-to-end pipeline runnable with Immerok Cloud.

Table API

For the Table API, this tutorial uses the same example schema when creating a table.

Create a table

As the API is centered around the concept of tables, you define a TableDescriptor that acts as a connector template to create a logical table backed by a Kafka topic for both writing and reading.

After loading the configuration as a Properties object, you can pass the key/value pairs to the descriptor builder.


package com.immerok.cloud.examples;

import com.immerok.cloud.examples.utils.ConfigParser;
import java.util.Properties;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * Table API example how to read from Apache Kafka® (in Confluent® Cloud) using SASL/PLAIN
 * authentication.
 */

// ...

TableDescriptor.Builder descriptorBuilder =
    TableDescriptor.forConnector("kafka")
        .schema(
            Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("message", DataTypes.STRING())
                .build())
        .format("json")
        .option("topic", "MyTopic");

properties.forEach(
    (key, value) -> descriptorBuilder.option("properties." + key, (String) value));

TableDescriptor topicTableDescriptor = descriptorBuilder.build();
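The configuration loop above copies every client setting into the Kafka connector's properties.* option namespace. The mapping can be sketched in isolation, with a plain Map standing in for the TableDescriptor builder (class and method names here are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class PropertyPrefixSketch {

    /** Copies client settings into the "properties.*" namespace used by the Kafka connector. */
    static Map<String, String> withPropertiesPrefix(Properties properties) {
        Map<String, String> options = new LinkedHashMap<>();
        properties.forEach((key, value) -> options.put("properties." + key, (String) value));
        return options;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("sasl.mechanism", "PLAIN");

        // Each entry is now addressable as a connector option,
        // e.g. "properties.bootstrap.servers".
        System.out.println(withPropertiesPrefix(properties));
    }
}
```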

Afterwards, create a table out of it for the catalog:


tableEnv.createTemporaryTable("MyTableFromTopic", topicTableDescriptor);

Write to Confluent Cloud

The TableToConfluentKafka class defines a program for writing to Confluent Cloud with the Table API.

Once the table has been registered under a name, you can use the API to define a workflow that creates and ingests example data:

TableToConfluentKafka.java

package com.immerok.cloud.examples;

import com.immerok.cloud.examples.utils.ConfigParser;
import java.util.Properties;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;

// ...

tableEnv.fromValues(
        // ...
        Row.of("Charly", "¡Hola Mundo!"))
    .executeInsert("MyTableFromTopic");

If you want to see how the data ends up in Kafka, open the corresponding topic in Confluent Cloud's Web UI and click Messages.

Keep the browser open while running the main() method in your IDE. The UI should visualize the ingested data after a couple of seconds.

If you didn't keep the browser open, you can also view previously ingested data via Jump to offset 0.

Read from Confluent Cloud

The TableFromConfluentKafka class defines a program for reading from Confluent Cloud with the Table API.

This source example assumes that data has already been added to the topic (either via Confluent Cloud's Web UI or the sink example).

Once the table has been registered under a name, you can use the Table API or SQL to define a workflow that reads the data:

TableFromConfluentKafka.java

package com.immerok.cloud.examples;

import com.immerok.cloud.examples.utils.ConfigParser;

// ...

tableEnv.sqlQuery("SELECT * FROM MyTableFromTopic")
    .execute()
    .print();

By default, the result is printed to standard output which should be sufficient for IDE development. The stream deduplication example shows an end-to-end pipeline that is runnable with Immerok Cloud.

Run an end-to-end pipeline

The previous examples used small amounts of data with a predefined schema and printed their results to standard output.

Now let's run a full end-to-end example in Immerok Cloud with a prepared pipeline that reads from and writes to Confluent Cloud.

Pipeline overview

The ConfluentKafkaDeduplication class defines a full program with the DataStream API. A similar program could be defined with the Table API but is omitted here for simplicity.

The example performs stream deduplication from a source topic to a destination topic.

ConfluentKafkaDeduplication.java

package com.immerok.cloud.examples;

import com.immerok.cloud.examples.events.IdentitySerializationSchema;
import com.immerok.cloud.examples.events.KeyedJson;
import com.immerok.cloud.examples.events.KeyedJsonDeserializationSchema;
import com.immerok.cloud.examples.utils.ConfigParser;

// ...

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Confluent Kafka");

The code works with arbitrary JSON that contains a top-level key. The key is extracted in a custom DeserializationSchema which stores the extracted key next to the raw JSON bytes.

KeyedJsonDeserializationSchema.java

package com.immerok.cloud.examples.events;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// ...

public KeyedJson deserialize(byte[] rawJson) throws IOException {
    // ...
}

The program is stateful and uses a KeyedProcessFunction to store whether a given key has been seen before. Only records with unseen keys are emitted downstream. To keep this state from growing without bound, a time-to-live (TTL) limit is applied to it.

ConfluentKafkaDeduplication.java

package com.immerok.cloud.examples;

import com.immerok.cloud.examples.events.IdentitySerializationSchema;
import com.immerok.cloud.examples.events.KeyedJson;
import com.immerok.cloud.examples.events.KeyedJsonDeserializationSchema;
import com.immerok.cloud.examples.utils.ConfigParser;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * DataStream API example how to read from and write to Apache Kafka® (in Confluent® Cloud) using
 * SASL/PLAIN authentication.
 *
 * <p>Make sure to replace the configuration in {@code resources/confluent-java.config} with your
 * credentials.
 *
 * <p>The example is schema agnostic and deduplication happens on JSON level using the provided key.
 *
 * <p>Make sure to pass the following main() args:
 *
 * ...
 */

// ...

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.milliseconds(ttl)).build();
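The deduplication idea itself is independent of Flink: remember per key when it was last seen and emit only keys outside the TTL window. Here is a simplified in-memory sketch; in the real job, Flink's ValueState with StateTtlConfig replaces the HashMap, and the class and method names here are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {

    private final Map<String, Long> lastSeen = new HashMap<>();
    private final long ttlMillis;

    public DedupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true (and records the key) if the key was not seen within the TTL window. */
    public boolean isFirst(String key, long nowMillis) {
        Long seenAt = lastSeen.get(key);
        if (seenAt == null || nowMillis - seenAt > ttlMillis) {
            lastSeen.put(key, nowMillis);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch(3_600_000L); // 1 hour, like the tutorial's --ttl
        List<String> emitted = new ArrayList<>();
        for (String user : new String[] {"Bob", "Bob", "Alice"}) {
            if (dedup.isFirst(user, 0L)) {
                emitted.add(user);
            }
        }
        System.out.println(emitted); // only the first "Bob" and "Alice" survive
    }
}
```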

The example is runnable from within an IDE if the required main method arguments have been provided.

But for this tutorial, let's run this in Immerok Cloud!

Submit to Immerok Cloud

You can execute a full pipeline on Immerok Cloud that is connected with Confluent Cloud.

  1. Build the confluent-kafka-sasl-plain project.

    If Maven is installed on your machine, run the mvn clean package shell command in the directory.

    For IntelliJ, you can use the Maven tab and execute clean and package under Lifecycle.

  2. Make sure you have created two topics in Confluent Cloud.

    In our example, they are called "MySourceTopic" and "MySinkTopic".

    You can also use an existing topic with your own JSON data as the source topic.

  3. Make sure you have signed up for Immerok Cloud and set up the rok CLI.

    Call rok auth login if you aren't already authenticated.

  4. Create a job with the rok CLI and pass the required parameters to the main method.


rok create job my-dedup-job \
  --file ./target/example-confluent-kafka-sasl-plain-0.1.jar \
  --entry-class=com.immerok.cloud.examples.ConfluentKafkaDeduplication \
  --main-args="--srcTopic MySourceTopic --destTopic MySinkTopic --key user --ttl 3600000"

--key defines the JSON field on which the deduplication will be performed.

--ttl defines how long to keep the state used for deduplication.

  5. Ensure that the job is running via rok describe job my-dedup-job. Flink's job state should be Running.

  6. In the next step, you will add some data to your source topic and observe how the deduplicated stream ends up in the destination topic.

    Use the Confluent Cloud Web UI for both ingesting and visualizing the data. For this, open a browser tab for each topic.

  7. For ingesting, select the Messages tab of the MySourceTopic topic and press Produce a new message to this topic.

    You can use data from the previous example as the value and null as a key in the UI:


{
  "user": "Bob",
  "message": "Hello World!"
}

{
  "user": "Bob",
  "message": "Hello World!"
}

{
  "user": "Alice",
  "message": "¡Hola Mundo!"
}

  8. If you want to see how the data ends up in Kafka, open the MySinkTopic topic in Confluent Cloud's Web UI and click Messages.

    The destination topic will contain only a single message for Bob, showing that the deduplication worked:


{
  "user": "Bob",
  "message": "Hello World!"
}

{
  "user": "Alice",
  "message": "¡Hola Mundo!"
}

If you didn't keep the browser open, you can also view previously ingested data via Jump to offset 0.

  9. Run rok delete job my-dedup-job to clean up your project. Remember to also delete your topics afterward.

Summary & next steps

In this tutorial, you ran several jobs in your IDE and on Immerok Cloud that access Confluent Cloud.

For simplicity, the credentials were part of the submitted artifact. Ideally, however, they should be stored securely and passed to the application at runtime.