
Alerting when problems persist

Visit the pattern-matching-cep recipe on GitHub.

Use case: Alert when a sensor remains hot for too long

In this recipe, we work with the following data model for the SensorReading class:

SensorReading
  • long deviceId
  • long temperature
  • Instant timestamp
  • sensorIsHot()
  • sensorIsCool()
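If you want something concrete to experiment with outside the recipe, here is a minimal plain-Java stand-in for SensorReading. The field names and the two methods come from the data model above. Three things are assumptions made for this sketch, not the recipe's actual definitions: the constructor, the hypothetical `HOT_THRESHOLD` value, and the rule that "cool" simply means "not hot".

```java
import java.time.Instant;

// Hypothetical plain-Java stand-in for the recipe's SensorReading class.
final class SensorReading {
    // Hypothetical threshold; the recipe defines its own notion of "hot".
    static final long HOT_THRESHOLD = 50;

    public final long deviceId;
    public final long temperature;
    public final Instant timestamp;

    SensorReading(long deviceId, long temperature, Instant timestamp) {
        this.deviceId = deviceId;
        this.temperature = temperature;
        this.timestamp = timestamp;
    }

    // Hot: strictly above the hypothetical threshold.
    boolean sensorIsHot() {
        return temperature > HOT_THRESHOLD;
    }

    // Cool: in this sketch, simply "not hot" (an assumption).
    boolean sensorIsCool() {
        return !sensorIsHot();
    }
}
```

Fields are public because the condition code later in this recipe reads `timestamp` directly.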

We focus on detecting when a temperature sensor continuously reports a temperature above some threshold for a predetermined period of time.

The same pattern-matching logic also applies to other use cases in which streaming analytics trigger alerts. For example, you might count login attempts per minute and raise an alert when that count exceeds some threshold for 3 minutes in a row. Or you might want an alert whenever a geo-fenced asset has reported itself outside its authorized region for more than one minute.

Flink includes a Complex Event Processing library (CEP, for short), which has extensive documentation.

The basic idea is that you define a pattern and apply that pattern to a stream. Every time CEP succeeds in matching that pattern to the stream, CEP calls a callback you've registered for processing the matches.
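In Flink, a pattern is applied with `CEP.pattern(stream, pattern)`, and each match is handed to a `PatternProcessFunction`. The toy below is not Flink. It is a plain-Java sketch of the same define/apply/callback shape, under the assumption that a "pattern" is just a list of predicates that must hold for consecutive events. `ToyCep` and `apply` are hypothetical names, and the sketch ignores time, state, and skip strategies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy illustration (not Flink CEP): a pattern is a list of predicates that must
// match consecutive events; the callback receives the events of each match.
final class ToyCep {
    static <T> int apply(List<T> stream, List<Predicate<T>> pattern, Consumer<List<T>> onMatch) {
        int matches = 0;
        // Try to start a match at every position in the (finite) stream.
        for (int start = 0; start + pattern.size() <= stream.size(); start++) {
            boolean ok = true;
            for (int k = 0; k < pattern.size() && ok; k++) {
                ok = pattern.get(k).test(stream.get(start + k));
            }
            if (ok) {
                // Hand a copy of the matched events to the registered callback.
                onMatch.accept(new ArrayList<>(stream.subList(start, start + pattern.size())));
                matches++;
            }
        }
        return matches;
    }
}
```

For example, the pattern "hot, then hot again" (with hot meaning above 50) applied to the temperatures 40, 60, 70, 30, 80 invokes the callback once, with [60, 70].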

In this recipe, we want to define a pattern that matches as soon as a sensor has been continuously hot for some period of time.

Here's an initial attempt to define such a pattern:

MatcherV1.java

```java
package com.immerok.cookbook.patterns;

import com.immerok.cookbook.conditions.StillHotLater;
import com.immerok.cookbook.records.SensorReading;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.util.Collector;

// ... (most of the file is elided in this excerpt; see MatcherV1.java in the recipe repository)

                // still-hot-later: a hot reading at least limitOfHeatTolerance after the first one
                .where(new StillHotLater("first-hot-reading", limitOfHeatTolerance));

// ... (remainder of the file elided)
```

MatcherV1 defines a pattern sequence composed of two singleton patterns (each matching exactly one event), joined into a sequence with followedBy.

The first singleton pattern is named first-hot-reading, and it matches any sensor reading satisfying the condition reading.sensorIsHot().

The second singleton pattern (still-hot-later) is more complex. It uses an IterativeCondition to match sensor readings that are hot and occur at least limitOfHeatTolerance (a Duration) after the first-hot-reading:

StillHotLater.java

```java
package com.immerok.cookbook.conditions;

import com.immerok.cookbook.records.SensorReading;
import java.time.Duration;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;

public class StillHotLater extends IterativeCondition<SensorReading> {

    private final String nameOfInitialHotPattern;
    private final Duration limitOfHeatTolerance;

    public StillHotLater(String nameOfInitialHotPattern, Duration limitOfHeatTolerance) {
        this.nameOfInitialHotPattern = nameOfInitialHotPattern;
        this.limitOfHeatTolerance = limitOfHeatTolerance;
    }

    @Override
    public boolean filter(SensorReading thisReading, Context<SensorReading> context)
            throws Exception {

        boolean sensorIsHotNow = thisReading.sensorIsHot();

        // Look back at the event accepted by the initial hot pattern.
        // (The tail of this method was truncated in the excerpt and is completed here
        // from the description above; the recipe's exact code may differ.)
        SensorReading firstHotReading =
                context.getEventsForPattern(nameOfInitialHotPattern).iterator().next();

        Duration interval = Duration.between(firstHotReading.timestamp, thisReading.timestamp);

        // Match only if the sensor is still hot and enough time has elapsed.
        return sensorIsHotNow && interval.compareTo(limitOfHeatTolerance) >= 0;
    }
}
```

Because StillHotLater is an IterativeCondition, it can look back at the sensor reading that matched the first-hot-reading part of the pattern sequence. It then measures the time elapsed between that initial reading and the reading currently being considered as a possible match for still-hot-later.

The preliminary solution shown above isn't good enough. It does find situations where two hot sensor readings are far enough apart, but it doesn't correctly handle cases where the sensor has cooled off in between those two readings.

Behavior of MatcherV1
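The flaw can be reproduced outside Flink. The sketch below is a plain-Java approximation of MatcherV1's semantics, not Flink CEP. It looks for a hot reading followed by a hot reading at least `limit` later, skipping any readings in between as followedBy allows. `V1Sketch`, `Reading`, and `firstMatch` are hypothetical names, and skip strategies are ignored.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

// Plain-Java approximation (not Flink CEP) of MatcherV1: a hot reading, then any later
// hot reading at least `limit` after it. Nothing checks the readings in between.
final class V1Sketch {
    // Hypothetical minimal reading type for this sketch.
    record Reading(Instant timestamp, boolean hot) {}

    // Returns {firstHotIndex, stillHotLaterIndex} for the earliest-starting match, if any.
    static Optional<int[]> firstMatch(List<Reading> readings, Duration limit) {
        for (int i = 0; i < readings.size(); i++) {
            if (!readings.get(i).hot()) continue; // first-hot-reading must be hot
            for (int j = i + 1; j < readings.size(); j++) {
                Reading later = readings.get(j);
                Duration interval = Duration.between(readings.get(i).timestamp(), later.timestamp());
                // still-hot-later: hot and at least `limit` after the first hot reading
                if (later.hot() && interval.compareTo(limit) >= 0) {
                    return Optional.of(new int[] {i, j});
                }
            }
        }
        return Optional.empty();
    }
}
```

Consider a hot reading at 0 s, a cool one at 5 s, and another hot one at 10 s. With a 10-second limit this yields a match, even though the sensor cooled off in between.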

Doing a better job

The pattern defined by MatcherV2 overcomes this problem by inserting an additional condition requiring that the sensor not become cool between the first-hot-reading and the still-hot-later reading.

MatcherV2.java

```java
package com.immerok.cookbook.patterns;

import com.immerok.cookbook.conditions.StillHotLater;
import com.immerok.cookbook.records.SensorReading;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.util.Collector;

public class MatcherV2 implements PatternMatcher<SensorReading, SensorReading> {

    public Pattern<SensorReading, ?> pattern(Duration limitOfHeatTolerance) {
        // After each match, drop partial matches that began before the match's last event.
        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();

        return Pattern.<SensorReading>begin("first-hot-reading", skipStrategy)
                .where(
                        // ... (lines elided in this excerpt; see MatcherV2.java in the recipe repository)
                .where(new StillHotLater("first-hot-reading", limitOfHeatTolerance));

// ... (remainder of the file elided)
```

However, if your goal is to generate an alert when the pattern matches, MatcherV2 may not satisfy you. It doesn't match just once; it matches again and again until the temperature drops below the threshold:

Behavior of MatcherV2
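Both properties of MatcherV2 show up in a plain-Java approximation, which again is not Flink CEP. The sketch assumes MatcherV2 behaves like this: a hot reading, no cool reading in between (notFollowedBy), then a hot reading at least `limit` later. It models skipPastLastEvent by starting the search for the next match just after the last event of the previous match. `V2Sketch`, `Reading`, and `matches` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation (not Flink CEP) of MatcherV2 with skipPastLastEvent.
final class V2Sketch {
    // Hypothetical minimal reading type for this sketch.
    record Reading(Instant timestamp, boolean hot) {}

    // All matches, in order, as {firstHotIndex, stillHotLaterIndex} pairs.
    static List<int[]> matches(List<Reading> readings, Duration limit) {
        List<int[]> result = new ArrayList<>();
        int from = 0;
        int[] m;
        while ((m = matchFrom(readings, from, limit)) != null) {
            result.add(m);
            from = m[1] + 1; // skip past the last event of this match
        }
        return result;
    }

    private static int[] matchFrom(List<Reading> readings, int from, Duration limit) {
        for (int i = from; i < readings.size(); i++) {
            Reading first = readings.get(i);
            if (!first.hot()) continue; // first-hot-reading must be hot
            for (int j = i + 1; j < readings.size(); j++) {
                Reading later = readings.get(j);
                if (!later.hot()) break; // cooled off in between: abandon this start
                Duration interval = Duration.between(first.timestamp(), later.timestamp());
                if (interval.compareTo(limit) >= 0) return new int[] {i, j}; // still-hot-later
            }
        }
        return null;
    }
}
```

Take ten consecutive hot readings one second apart and a three-second limit. The sketch reports two matches, {0, 3} and {4, 7}: one new match every few seconds while the streak lasts. With a cool reading between two hot ones, no match is reported.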

Doing an even better job

If you only want one alert per hot streak, the solution above isn't good enough. You could extend the still-hot-later part of the pattern to keep matching until the temperature is no longer hot. But then the single alert would not be emitted until the hot streak was over, which is unacceptable if you want real-time alerting.

MatcherV3 overcomes these problems, and also illustrates a somewhat different approach:

MatcherV3.java

```java
package com.immerok.cookbook.patterns;

import com.immerok.cookbook.conditions.StillHotLater;
import com.immerok.cookbook.records.SensorReading;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.util.Collector;

public class MatcherV3 implements PatternMatcher<SensorReading, SensorReading> {

    public Pattern<SensorReading, ?> pattern(Duration limitOfHeatTolerance) {
        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();

        // Every match must begin with a cool reading, so a hot streak can match only once.
        return Pattern.<SensorReading>begin("starts-cool", skipStrategy)
                .where(
                        new SimpleCondition<>() {
                            @Override
                            public boolean filter(SensorReading reading) {
                                return reading.sensorIsCool();
                            }
                        })
                // ... (remainder elided in this excerpt; see MatcherV3.java in the recipe repository)
```

With this solution, we have:

  • prevented a continuously hot sensor from matching more than once by requiring that each match begin when the sensor is cool
  • used a looping pattern (rather than a singleton pattern in combination with notFollowedBy) to capture the sequence of continuously hot readings
  • used next rather than followedBy to connect the individual patterns

The difference between next and followedBy is the following. next (strict contiguity) only matches if no other events occur between the events matched by the two connected patterns. followedBy (relaxed contiguity) can skip over intervening events that don't match.
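The two contiguity modes can be contrasted in a few lines of plain Java. This is an illustration of the idea, not Flink's implementation. `ContiguitySketch`, `strictMatch`, and `relaxedMatch` are hypothetical names, and each method only answers whether a two-step "first, then second" pattern matches anywhere in a finite list.

```java
import java.util.List;
import java.util.function.Predicate;

// Plain-Java illustration (not Flink CEP) of strict vs relaxed contiguity.
final class ContiguitySketch {
    // Like `next`: the second event must come immediately after the first.
    static <T> boolean strictMatch(List<T> events, Predicate<T> first, Predicate<T> second) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (first.test(events.get(i)) && second.test(events.get(i + 1))) return true;
        }
        return false;
    }

    // Like `followedBy`: non-matching events between the two may be skipped.
    static <T> boolean relaxedMatch(List<T> events, Predicate<T> first, Predicate<T> second) {
        for (int i = 0; i < events.size(); i++) {
            if (!first.test(events.get(i))) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (second.test(events.get(j))) return true;
            }
        }
        return false;
    }
}
```

For the events "cool", "noise", "hot", the pattern "cool, then hot" matches only with relaxed contiguity. Without the "noise" event in between, both modes match.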

This is the behavior of MatcherV3:

Behavior of MatcherV3
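To make the behavior described for MatcherV3 concrete, here is a plain-Java state machine that approximates it. It is not the Flink CEP pattern itself. The approximation requires a cool reading before a streak can count, and it emits one alert as soon as a hot reading arrives at least `limit` after the streak's first hot reading. It then ignores the rest of the streak until the sensor cools again. `V3Sketch`, `Reading`, and `alerts` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation (not Flink CEP) of the behavior described for MatcherV3.
final class V3Sketch {
    // Hypothetical minimal reading type for this sketch.
    record Reading(Instant timestamp, boolean hot) {}

    // Returns the index of each reading at which an alert would be emitted.
    static List<Integer> alerts(List<Reading> readings, Duration limit) {
        List<Integer> result = new ArrayList<>();
        boolean armed = false;      // a cool reading has been seen since the last alert
        Instant streakStart = null; // first hot reading directly after a cool one
        for (int i = 0; i < readings.size(); i++) {
            Reading r = readings.get(i);
            if (!r.hot()) {
                // starts-cool: a cool reading (re)arms the matcher and ends any streak
                armed = true;
                streakStart = null;
            } else if (armed) {
                if (streakStart == null) {
                    streakStart = r.timestamp();
                } else if (Duration.between(streakStart, r.timestamp()).compareTo(limit) >= 0) {
                    // Alert as soon as the streak is long enough, not when it ends,
                    result.add(i);
                    // then ignore the rest of this streak until the sensor cools again.
                    armed = false;
                    streakStart = null;
                }
            }
        }
        return result;
    }
}
```

Consider readings one second apart: cool at 0 s, hot from 1 s to 10 s, cool at 11 s, hot from 12 s to 15 s. With a three-second limit, the sketch alerts once per streak, at indexes 4 and 15. A stream that is hot from the start never alerts, because no match can begin without a cool reading.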

The full recipe

This recipe is self-contained. You can run the ProductionJobTests to see the full recipe in action with any of these matchers. Those tests use an embedded Apache Kafka and Apache Flink setup, so you can run them directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.

See the code and tests in the GitHub repository for more details.