
Writing an application in Kotlin™

Visit the Kotlin recipe on GitHub.

Use the Kotlin programming language

In this recipe, you will use the Kotlin programming language to create an Apache Flink application that counts, every 5 seconds, the number of words posted on an Apache Kafka topic.

Flink natively supports Java and Scala, but you can use any JVM language to create your application.

This is a self-contained recipe that you can copy and run directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

Kotlin project setup

This project setup is based on the official Apache Flink Maven quickstart for the Java API.

To add support for Kotlin, you add dependencies and configure them correctly in the Maven project. The full documentation can be found at the Kotlin Maven build tool documentation.

Add dependency on Kotlin standard library

The kotlin-stdlib-jdk8 artifact can be used with JDK 8 and JDK 11 (or higher).

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.immerok</groupId>
    <artifactId>cookbook-kotlin</artifactId>

    <!-- The Kotlin standard library + extensions to work with Java 7/8 features. -->
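The snippet above is cut off by the page's code viewer. Based on the kotlin-stdlib-jdk8 artifact named earlier, the dependency presumably looks roughly like this (the `kotlin.version` property is an assumption, not taken from the recipe):

```xml
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-stdlib-jdk8</artifactId>
    <!-- Assumed property; pin this to your Kotlin release. -->
    <version>${kotlin.version}</version>
</dependency>
```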

Configure source/test directories

You need to explicitly define your Kotlin source- and test directories in Maven, so you can reference them in the necessary configuration for the Kotlin Maven plugin.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>org.apache.maven.plugins</groupId>
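The snippet above is truncated. Following the Kotlin Maven build tool documentation, the source and test directory configuration typically looks like this (the paths are the conventional Kotlin layout, not confirmed from the recipe):

```xml
<build>
    <!-- Conventional Kotlin source layout; adjust if your project differs. -->
    <sourceDirectory>${project.basedir}/src/main/kotlin</sourceDirectory>
    <testSourceDirectory>${project.basedir}/src/test/kotlin</testSourceDirectory>
</build>
```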

Set up the Kotlin Maven plugin

The Kotlin Maven plugin uses the previously configured values and connects them to the correct Maven lifecycles.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.immerok</groupId>
    <artifactId>cookbook-kotlin</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Cookbook Recipe - Writing an Application in Kotlin</name>

    <properties>
        <flink.version>1.16.0</flink.version>
        <jackson.databind.version>2.14.0</jackson.databind.version>
        <jackson.version>2.14.0</jackson.version>
        <junit.jupiter.version>5.9.1</junit.jupiter.version>
        <kafka.version>3.2.2</kafka.version>
        <log4j.version>2.19.0</log4j.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>11</target.java.version>
    </properties>

    <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
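The Kotlin plugin wiring itself is truncated in the snippet above. Per the Kotlin Maven build tool documentation, binding the Kotlin compiler to the compile and test-compile lifecycles typically looks like this (a sketch, not the recipe's exact configuration):

```xml
<plugin>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-maven-plugin</artifactId>
    <!-- Assumed property; pin this to your Kotlin release. -->
    <version>${kotlin.version}</version>
    <executions>
        <execution>
            <id>compile</id>
            <goals>
                <goal>compile</goal>
            </goals>
        </execution>
        <execution>
            <id>test-compile</id>
            <goals>
                <goal>test-compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```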

Configure the main class in the Shade plugin

By configuring your main class, Maven can properly create a fat JAR when you build the recipe.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    </configuration>
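The configuration that is cut off above normally includes a ManifestResourceTransformer naming the main class. A sketch, assuming the recipe's top-level `main()` lives in WordCount.kt (Kotlin compiles top-level functions in that file into a `WordCountKt` class, so the class name below is an assumption):

```xml
<transformers>
    <!-- `WordCountKt` is Kotlin's generated class name for a
         top-level main() in WordCount.kt; adjust if yours differs. -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>com.immerok.cookbook.WordCountKt</mainClass>
    </transformer>
</transformers>
```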

The record input data

The recipe uses Kafka topic input, containing String records.


"Frodo Baggins"
"Meriadoc Brandybuck"
"Boromir"
"Gollum"
"Sauron"
...

Count the number of words

The recipe uses the Apache Flink KafkaSource connector in the application to connect to your Apache Kafka broker.

The recipe uses a Kotlin data class. The class has public, mutable properties and a no-arg constructor, so Flink treats it as a POJO. This has performance benefits and, more importantly, allows schema evolution (i.e., you can add or remove fields) when using this class for state. An immutable data class would be serialized via Kryo, because Flink has no built-in support for such classes.
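As a minimal sketch of those POJO rules: default parameter values are one way to give a Kotlin data class the no-arg constructor Flink needs. The `Event` shape below mirrors the recipe's class, but the default values are an assumption:

```kotlin
// A data class Flink can treat as a POJO: public mutable `var` properties,
// plus default values so Kotlin generates a no-arg constructor.
data class Event(var word: String = "", var count: Int = 0)
```

With `val` properties, or without the defaults, Flink would fall back to Kryo serialization for this type.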

Alternatively, you could also use Avro types or implement custom TypeSerializers for your data types.

WordCount.kt

package com.immerok.cookbook

data class Event(var word: String, var count: Int) {

Each appearing word and the number of its appearances are stored in this data class. Every time 5 seconds have passed, the application outputs each word and how many times it appeared in that timeframe. Since the data is unbounded, the output to the console will never stop.
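The per-window aggregation can be sketched in plain Kotlin; `countWindow` below is a hypothetical helper, not part of the recipe, that mimics what a `ReduceFunction` summing counts per key produces for one 5-second window:

```kotlin
// For one window's worth of words, count how often each word appears.
// Flink computes the same result incrementally as records arrive.
fun countWindow(words: List<String>): Map<String, Int> =
    words.groupingBy { it }.eachCount()
```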

WordCount.kt

package com.immerok.cookbook

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

The full recipe

This recipe is self-contained. You can run the WordCountTest#testProductionJob test to see the full recipe in action. That test uses an embedded Kafka and Flink instance, so you can run it directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.

See the comments and tests included in the code for more details.