
Continuously reading CSV files

Visit the continuous-file-reading recipe on GitHub.

To get started with your first event processing application, you need to read data from one or more sources.

In this recipe, you are going to continuously read CSV-formatted files from a folder and transform this data into your data model.

This is a self-contained Apache Flink recipe that you can copy and run directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The CSV input data

The recipe generates one or more comma-separated values (CSV) files in a temporary directory. The files are UTF-8 encoded, have no header row, and contain one record per line with the columns id, character, and location:


1,Shadowfax,Sauerfort
2,Quickbeam,Tremblayside
3,Galadriel,Latriciamouth
4,Denethor,Koeppshire
5,Théoden,Larsonland
6,Tom Bombadil,Hegmannton
7,Barliman Butterbur,Madelinechester
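The recipe's test creates these files for you. If you want to produce similar input yourself, here is a minimal JDK-only sketch that writes the sample records into a new temporary directory as a single UTF-8 encoded CSV file. The class name SampleCsvWriter and the file name events-1.csv are hypothetical and not part of the recipe.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper, not part of the recipe: writes the sample records to a temporary directory.
class SampleCsvWriter {

    // The accented e is written as a Unicode escape so that the source file stays ASCII-only.
    static final List<String> RECORDS = List.of(
            "1,Shadowfax,Sauerfort",
            "2,Quickbeam,Tremblayside",
            "3,Galadriel,Latriciamouth",
            "4,Denethor,Koeppshire",
            "5,Th\u00e9oden,Larsonland",
            "6,Tom Bombadil,Hegmannton",
            "7,Barliman Butterbur,Madelinechester");

    /** Creates a new temporary directory and writes all records into one UTF-8 encoded CSV file. */
    static Path writeSampleFile() throws IOException {
        Path dir = Files.createTempDirectory("csv-input");
        Path file = dir.resolve("events-1.csv"); // hypothetical file name
        // Writes each record followed by the platform line separator, encoded as UTF-8.
        Files.write(file, RECORDS, StandardCharsets.UTF_8);
        return dir;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Wrote sample CSV data to " + writeSampleFile());
    }
}
```

Writing the file with an explicit charset matters: relying on the platform default could produce a non-UTF-8 file on some systems.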

The data model

You want to consume these records in your Apache Flink application and make them available in the data model. The data model is defined in the following POJO:

Event.java

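package com.immerok.cookbook.events;

import java.util.Objects;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;

/** A Flink POJO must have public fields, or getters and setters */
@JsonPropertyOrder({"id", "character", "location"})
public class Event {
    // One field per CSV column; the long type for id is assumed from the sample data.
    public long id;
    public String character;
    public String location;

    /** A Flink POJO must have a public no-argument constructor */
    public Event() {}

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Event)) return false;
        Event other = (Event) o;
        return id == other.id && Objects.equals(character, other.character) && Objects.equals(location, other.location);
    }
    @Override
    public int hashCode() { return Objects.hash(id, character, location); }
}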

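You have to define the order of the fields explicitly with the @JsonPropertyOrder annotation. The CSV files have no header row, so this order must exactly match the order of the columns in the files.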

Event.java

@JsonPropertyOrder({"id", "character", "location"})
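Why the order matters: a CSV line carries only values, not column names, so a parser can only assign them by position. The following JDK-only sketch illustrates that idea. It is not how Flink or Jackson implement CSV parsing, and the class name PositionalCsv is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of positional CSV mapping; not the Jackson or Flink implementation.
class PositionalCsv {

    /** Assigns the i-th value of a CSV line to the i-th name in columnOrder. */
    static Map<String, String> map(String line, List<String> columnOrder) {
        // Naive split: does not handle quoted values that contain commas.
        // The limit of -1 keeps trailing empty values instead of dropping them.
        String[] values = line.split(",", -1);
        if (values.length != columnOrder.size()) {
            throw new IllegalArgumentException(
                    "Expected " + columnOrder.size() + " columns but got " + values.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            fields.put(columnOrder.get(i), values[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "1,Shadowfax,Sauerfort";
        // Same order as @JsonPropertyOrder({"id", "character", "location"}).
        System.out.println(map(line, List.of("id", "character", "location")));
        // Any other order, here an alphabetical one, silently puts values into the wrong fields.
        System.out.println(map(line, List.of("character", "id", "location")));
    }
}
```

Running main prints {id=1, character=Shadowfax, location=Sauerfort} for the declared order and {character=1, id=Shadowfax, location=Sauerfort} for the alphabetical one. With a numeric id field, the second mapping would fail as soon as the parser tried to convert "Shadowfax" to a number.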

Set up a FileSource

The application uses the Apache Flink FileSource connector to read from your local file system. Because Flink's file systems are pluggable, the same connector can also read from other file systems, such as S3.

You specify the folder to read from with the --inputFolder argument.

ContinuousFileReading.java

Path inputFolder = new Path(parameters.getRequired("inputFolder"));
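The snippet reads the folder from parameters, a ParameterTool, whose getRequired call fails when the key is missing. To make that contract concrete without a Flink dependency, here is a hypothetical JDK-only stand-in that looks up the value following --inputFolder and throws if the argument is absent. RequiredArgs is an illustrative name, not a Flink class.

```java
import java.util.Arrays;

// Hypothetical JDK-only stand-in for ParameterTool.fromArgs(args).getRequired(key).
class RequiredArgs {

    /** Returns the value that follows "--key" in args, or fails if the argument is missing. */
    static String getRequired(String[] args, String key) {
        String flag = "--" + key;
        // Stop one element early: a flag in last position has no value after it.
        for (int i = 0; i < args.length - 1; i++) {
            if (args[i].equals(flag)) {
                return args[i + 1];
            }
        }
        throw new IllegalArgumentException(
                "Missing required argument " + flag + " in " + Arrays.toString(args));
    }

    public static void main(String[] args) {
        String inputFolder = getRequired(new String[] {"--inputFolder", "/tmp/csv-input"}, "inputFolder");
        System.out.println(inputFolder); // prints /tmp/csv-input
    }
}
```

Failing fast on a missing folder surfaces a configuration mistake when the job starts, rather than leaving a source with nothing to monitor.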

Then you configure a CsvReaderFormat that uses the Event POJO to parse the CSV files.

ContinuousFileReading.java

CsvReaderFormat<Event> csvFormat = CsvReaderFormat.forPojo(Event.class);
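Conceptually, the format decodes a file as a stream of records, one per line, and fills a fresh POJO for each. The JDK-only sketch below mimics that behavior for this data set. It is not CsvReaderFormat itself. The nested Event class only mirrors the recipe's POJO, and the long type for id is an assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only sketch of record-by-record CSV decoding; not Flink's CsvReaderFormat.
class CsvToPojo {

    /** Mirrors the Event POJO: public fields plus a public no-argument constructor. */
    static class Event {
        public long id; // assumed type
        public String character;
        public String location;

        public Event() {}

        @Override
        public String toString() {
            return "Event{id=" + id + ", character=" + character + ", location=" + location + "}";
        }
    }

    /** Reads one record per line and fills the fields in the order id, character, location. */
    static List<Event> readAll(BufferedReader reader) throws IOException {
        List<Event> events = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            String[] values = line.split(",", -1);
            if (values.length != 3) {
                throw new IllegalArgumentException("Expected 3 columns: " + line);
            }
            Event event = new Event();
            event.id = Long.parseLong(values[0]);
            event.character = values[1];
            event.location = values[2];
            events.add(event);
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        String csv = "1,Shadowfax,Sauerfort\n6,Tom Bombadil,Hegmannton\n";
        try (BufferedReader reader = new BufferedReader(new StringReader(csv))) {
            readAll(reader).forEach(System.out::println);
        }
    }
}
```

To read a real file instead of an in-memory string, pass Files.newBufferedReader(path, StandardCharsets.UTF_8), which decodes the bytes as UTF-8 to match the input files.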

To complete the setup, you configure the FileSource connector with the csvFormat and the folder that you want to monitor. The connector checks this folder for new files every 5 seconds. Because the folder is monitored continuously, the source runs in streaming (unbounded) mode.

ContinuousFileReading.java

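FileSource<Event> source =
        FileSource.forRecordStreamFormat(csvFormat, inputFolder)
                // Check the folder for new files every 5 seconds; this makes the source unbounded.
                .monitorContinuously(Duration.ofSeconds(5))
                .build();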
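Continuous monitoring roughly means: list the folder periodically and pick up only files that were not seen in an earlier scan. The JDK-only sketch below illustrates that loop. It is not Flink's file enumeration logic, and the class DirectoryPoller with its methods is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical JDK-only sketch of periodic file discovery; not Flink's FileSource enumerator.
class DirectoryPoller {

    private final Path directory;
    private final Set<Path> seen = new HashSet<>();

    DirectoryPoller(Path directory) {
        this.directory = directory;
    }

    /** Lists regular files in the directory and returns those not reported by an earlier scan. */
    synchronized List<Path> discoverNewFiles() throws IOException {
        List<Path> current;
        try (Stream<Path> files = Files.list(directory)) {
            current = files.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<Path> fresh = new ArrayList<>();
        for (Path file : current) {
            if (seen.add(file)) { // add() returns false for files seen before
                fresh.add(file);
            }
        }
        return fresh;
    }

    /** Scans every intervalSeconds (for example 5) until the returned scheduler is shut down. */
    ScheduledExecutorService startPolling(long intervalSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                discoverNewFiles().forEach(f -> System.out.println("New file: " + f));
            } catch (IOException e) {
                System.err.println("Scan failed: " + e); // keep polling after an I/O error
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("csv-input");
        DirectoryPoller poller = new DirectoryPoller(dir);
        Files.writeString(dir.resolve("events-1.csv"), "1,Shadowfax,Sauerfort\n");
        System.out.println(poller.discoverNewFiles().size()); // 1: the file is new
        System.out.println(poller.discoverNewFiles().size()); // 0: already seen
        Files.writeString(dir.resolve("events-2.csv"), "2,Quickbeam,Tremblayside\n");
        System.out.println(poller.discoverNewFiles().size()); // 1: only events-2.csv is new
    }
}
```

Like an unbounded source, startPolling never finishes on its own. The caller decides when to stop by calling shutdown() on the returned scheduler.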

The full recipe

This recipe is self-contained. You can run the ContinuousFileReadingTest#testProductionJob test to see the full recipe in action. The test generates CSV files in a temporary directory and prints the data to the console. You can run it directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.

See the comments included in the code for more details.