
Migrating state away from Kryo

Visit the kryo-migration recipe on GitHub.

Background

Data is one of the cornerstones of Flink applications. Almost all Apache Flink applications execute business logic that needs to know what happened in the past or needs access to intermediate results. That data is described by a data model, and as use cases and business requirements evolve over time, so must the data model.

To that end, Flink introduced schema evolution, which allows users of POJO or Avro types to change their data model.

Most users take great care to use POJOs everywhere; unfortunately, that alone isn't sufficient.

For example, have a look at this type (the structure of Pojo2 is irrelevant):

Pojo1.java

public class Pojo1 {
    public List<Pojo2> pojos;
}

Can Pojo1 evolve? Of course, it is a POJO after all!

Can Pojo2 evolve? No!

This is because the List<Pojo2> will be serialized with Kryo, which does not support schema evolution.

The problem manifests itself in many forms, be it Maps, Optionals, or Scala collections; the common theme is that they are all non-POJO types containing POJOs.

This issue tends to go unnoticed until schema evolution is attempted (in part because in certain cases Flink does not inform the user that Kryo is used!), and once it is found, users face the challenge of migrating their state away from Kryo.
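
One way to surface such cases early is Flink's ExecutionConfig#disableGenericTypes(), which makes Flink throw an exception whenever a type would be serialized with Kryo. As a rough standalone illustration of the pattern itself, the sketch below uses plain Java reflection to list fields whose declared type is a parameterized List, Map, or Optional. KryoFieldAudit is a hypothetical helper, not part of Flink, and it does not reproduce Flink's type extraction rules; it only flags candidates worth checking.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: flags fields whose type is a container that may hide POJOs from Flink. */
class KryoFieldAudit {

    // Simplifying assumption: these container types are not POJOs themselves.
    private static final List<Class<?>> CONTAINERS = List.of(List.class, Map.class, Optional.class);

    static List<String> suspiciousFields(Class<?> clazz) {
        List<String> result = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Static and transient fields are not part of the serialized object.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue;
            }
            Type type = field.getGenericType();
            if (type instanceof ParameterizedType) {
                Type raw = ((ParameterizedType) type).getRawType();
                for (Class<?> container : CONTAINERS) {
                    if (raw instanceof Class && container.isAssignableFrom((Class<?>) raw)) {
                        result.add(field.getName());
                        break;
                    }
                }
            }
        }
        return result;
    }

    // Mirrors the Pojo1/Pojo2 example above; Pojo2's fields are made up.
    public static class Pojo2 {
        public int value;
    }

    public static class Pojo1 {
        public long id;
        public List<Pojo2> pojos;
    }

    public static void main(String[] args) {
        System.out.println(suspiciousFields(Pojo1.class)); // prints [pojos]
    }
}
```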

In this recipe, you will use the State Processor API to migrate a value state containing a POJO that was partially serialized with Kryo to another serializer.

The application

For the purposes of this recipe, there is an application that stores Events in a value state. It generates a stream of Events, keys the stream by userId, and passes the data into a LatestEventFunction.

Job.java

package com.immerok.cookbook;

import com.immerok.cookbook.functions.LatestEventFunction;
import com.immerok.cookbook.records.Event;
import com.immerok.cookbook.records.SubEvent;
import java.util.ArrayList;
import java.util.List;

// ...

env.fromSequence(0, 1_000_000_000).map(new EventGenerator());

The LatestEventFunction stores the latest received element in state.

LatestEventFunction.java

package com.immerok.cookbook.functions;

import com.immerok.cookbook.records.Event;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * A {@link KeyedProcessFunction} that stores the last received {@link Event} in a {@link
 * ValueState}.
 *
 * <p>This class isn't useful for any practical applications; its only purpose is to be a stateful
 * operation.
 */

// ...

return new ValueStateDescriptor<>("latest event", TypeInformation.of(Event.class));

note

The application and operator have no practical use; they exist only for demonstration purposes.

The problem

Our state contains Events. Event is a POJO containing a long user ID and a List of SubEvents.

Event.java

public long userId;
public List<SubEvent> subEvents;

SubEvent is another POJO:

SubEvent.java

package com.immerok.cookbook.records;

/**
 * The POJO for which schema evolution is blocked by Kryo because it's contained in a list within
 * {@link Event}.
 */
public class SubEvent {

    /** A Flink POJO must have public fields, or getters and setters */
    // ...

Both of these classes are proper POJOs. They are serialized with the PojoSerializer and on their own would support schema evolution.

However, the List throws a wrench into the whole affair, because it and its contents will be serialized with Kryo. This happens because List itself is neither a POJO nor another type that Flink natively supports. Since Kryo does not support schema evolution, you end up in the strange situation of being unable to evolve a POJO.

info
Why "the PojoSerializer supports schema evolution" is misleading:

Serializers for structured types (like POJOs, Tuples, Collections) are composed of several serializers, one for each of the contained fields. Each of those serializers individually controls whether it supports schema evolution or further serializer nesting for the field it is responsible for.

For example, take the Event class. The POJO serializer for this class contains two serializers, one for the userId field and one for the subEvents field. The schema evolution support that the POJO serializer provides is limited to the top-level structure of the POJO: you can add or remove fields, but you aren't necessarily able to change SubEvent, because that is handled by another serializer.

When Kryo is used for the subEvents field, you can't evolve the SubEvent class: Kryo does not support schema evolution, and it serializes both the list and its contents itself, never deferring the serialization of SubEvent to another (POJO) serializer. The ListSerializer, by contrast, relies on an element serializer, and in this case uses the PojoSerializer for the SubEvents, which allows the type to evolve.
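
The difference between a delegating and a non-delegating serializer can be modeled without Flink. The sketch below uses a hypothetical SketchSerializer interface (not Flink's TypeSerializer) and a made-up SubEventSketch with a single field: ListSerializerSketch writes only the list size and hands every element to a nested element serializer, so changing how a SubEventSketch is encoded only requires swapping the element serializer. A Kryo-style serializer would instead encode the list and its elements in one opaque step.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal serializer abstraction used only for this sketch. */
interface SketchSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;

    T deserialize(DataInputStream in) throws IOException;
}

/** Made-up stand-in for SubEvent; the real class's fields are not shown in the recipe. */
class SubEventSketch {
    final long timestamp;

    SubEventSketch(long timestamp) {
        this.timestamp = timestamp;
    }
}

/** Leaf serializer: the only place that knows the SubEventSketch layout. */
class SubEventSerializerSketch implements SketchSerializer<SubEventSketch> {
    public void serialize(SubEventSketch value, DataOutputStream out) throws IOException {
        out.writeLong(value.timestamp);
    }

    public SubEventSketch deserialize(DataInputStream in) throws IOException {
        return new SubEventSketch(in.readLong());
    }
}

/** Composite serializer: writes the size, then defers each element to the nested serializer. */
class ListSerializerSketch<T> implements SketchSerializer<List<T>> {
    private final SketchSerializer<T> elementSerializer;

    ListSerializerSketch(SketchSerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    public void serialize(List<T> value, DataOutputStream out) throws IOException {
        out.writeInt(value.size());
        for (T element : value) {
            elementSerializer.serialize(element, out);
        }
    }

    public List<T> deserialize(DataInputStream in) throws IOException {
        int size = in.readInt();
        List<T> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            result.add(elementSerializer.deserialize(in));
        }
        return result;
    }
}

class NestedSerializerDemo {
    static List<SubEventSketch> roundTrip(List<SubEventSketch> input) throws IOException {
        SketchSerializer<List<SubEventSketch>> serializer =
                new ListSerializerSketch<>(new SubEventSerializerSketch());
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        serializer.serialize(input, out);
        out.flush();
        return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    }

    public static void main(String[] args) throws IOException {
        List<SubEventSketch> copy = roundTrip(List.of(new SubEventSketch(1L), new SubEventSketch(2L)));
        System.out.println(copy.size() + " " + copy.get(1).timestamp); // prints "2 2"
    }
}
```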

Migration

Taking control

The first thing you do is take control of which serializer is used for the List, using the @TypeInfo annotation.

Event.java

package com.immerok.cookbook.records;

// ...

@TypeInfo(SubEventListTypeInfoFactory.class)

This annotation allows you to supply a TypeInfoFactory, which Flink will call when it encounters the annotated field during type extraction. You can then return a TypeInformation of your choosing, which in the end provides the serializer for the annotated field.
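
To make the mechanism concrete, here is a plain-Java model of annotation-driven factory lookup. SketchTypeInfo, SketchFactory, and describeField are hypothetical stand-ins for Flink's @TypeInfo, TypeInfoFactory, and type extraction; the model only shows the general idea that the annotation names a factory class, which the framework instantiates reflectively and asks about the field. Flink's real factories return TypeInformation objects rather than strings.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.List;

/** Hypothetical stand-in for Flink's @TypeInfo annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SketchTypeInfo {
    Class<? extends SketchFactory> value();
}

/** Hypothetical stand-in for a TypeInfoFactory: returns a description instead of a TypeInformation. */
interface SketchFactory {
    String createTypeInfo(Field field);
}

class ListTypeFactorySketch implements SketchFactory {
    public ListTypeFactorySketch() {}

    public String createTypeInfo(Field field) {
        return "list-serializer for " + field.getName();
    }
}

class AnnotatedEvent {
    public long userId;

    @SketchTypeInfo(ListTypeFactorySketch.class)
    public List<String> subEvents;
}

class FactoryLookupDemo {
    /** Simplified "type extraction": use the annotated factory if present, else a default. */
    static String describeField(Field field) throws ReflectiveOperationException {
        SketchTypeInfo annotation = field.getAnnotation(SketchTypeInfo.class);
        if (annotation == null) {
            return "default for " + field.getName();
        }
        // The framework, not the user, instantiates the factory, so no configured instance can be passed in.
        SketchFactory factory = annotation.value().getDeclaredConstructor().newInstance();
        return factory.createTypeInfo(field);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // prints "list-serializer for subEvents"
        System.out.println(describeField(AnnotatedEvent.class.getField("subEvents")));
    }
}
```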

The factory needs two code paths so that, during the migration, the state is read using Kryo but written with the new serializer. For brevity, this recipe uses Flink's ListTypeInfo, but you could also implement a fully custom TypeInformation and TypeSerializer.

note

Flink does have a built-in serializer for lists, but it's not used by default. Changing this default would break existing state that used Kryo for lists.

SubEventListTypeInfoFactory.java

return new ListTypeInfo<>(TypeInformation.of(SubEvent.class));

To control which code path is used, you unfortunately have to resort to some static shenanigans: Flink instantiates the factory itself during type extraction, so you cannot directly provide an appropriately configured factory. You will see later how this is used.

SubEventListTypeInfoFactory.java

package com.immerok.cookbook.records;

import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

// ...

public static AutoCloseable temporarilyEnableKryoPath() {
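
The static toggle can be sketched in plain Java. The class below is a hypothetical simplification, not the recipe's actual SubEventListTypeInfoFactory: a static flag selects the code path, and temporarilyEnableKryoPath() returns an AutoCloseable that restores the previous value, so a try-with-resources statement resets the flag even if reading fails. A plain static field is only safe if type extraction happens on a single thread, which is an assumption of this sketch.

```java
/** Hypothetical simplification of a factory whose code path is chosen by static state. */
class KryoPathToggleSketch {

    // Which serializer the factory would hand out; false means the new list serializer.
    private static boolean kryoPathEnabled = false;

    /** Enables the Kryo path until the returned handle is closed, then restores the previous value. */
    static AutoCloseable temporarilyEnableKryoPath() {
        boolean previous = kryoPathEnabled;
        kryoPathEnabled = true;
        return () -> kryoPathEnabled = previous;
    }

    /** Stand-in for the factory's createTypeInfo: reports which path would be taken. */
    static String createTypeInfo() {
        return kryoPathEnabled ? "kryo" : "list";
    }

    public static void main(String[] args) throws Exception {
        String readPath;
        try (AutoCloseable ignored = temporarilyEnableKryoPath()) {
            // Types resolved inside the block (the reading side) see the Kryo path.
            readPath = createTypeInfo();
        }
        // Types resolved after the block (the writing side) see the list serializer.
        String writePath = createTypeInfo();
        System.out.println(readPath + " -> " + writePath); // prints "kryo -> list"
    }
}
```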

Rewriting state

The State Processor API is a powerful tool that allows you to create and modify savepoints.

The API allows you to treat existing state as a source or a sink: you write functions that extract data from state, and the extracted data is then passed to another set of functions that write it into state.

You will use it to read a particular state with the Kryo-infected POJO serializer, and create a new savepoint containing the same state but serialized with a POJO serializer that leverages the ListSerializer instead.
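
Conceptually, the rewrite keeps every key and value and changes only the bytes. The sketch below models a single keyed state as a Map from key to serialized bytes; migrate is a hypothetical helper that deserializes each value with the old format and serializes it with the new one. The two string encodings merely stand in for the Kryo-based and list-based serializers; on real savepoints, the State Processor API does this work.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class StateRewriteSketch {

    /** Reads every value with the old serializer and writes it back with the new one. */
    static <K, V> Map<K, byte[]> migrate(
            Map<K, byte[]> oldState, Function<byte[], V> readWithOld, Function<V, byte[]> writeWithNew) {
        Map<K, byte[]> newState = new HashMap<>();
        for (Map.Entry<K, byte[]> entry : oldState.entrySet()) {
            V value = readWithOld.apply(entry.getValue()); // same logical value...
            newState.put(entry.getKey(), writeWithNew.apply(value)); // ...in the new binary format
        }
        return newState;
    }

    public static void main(String[] args) {
        Map<Long, byte[]> oldState = new HashMap<>();
        oldState.put(42L, "latest event".getBytes(StandardCharsets.UTF_16BE));

        Map<Long, byte[]> newState =
                migrate(
                        oldState,
                        (byte[] bytes) -> new String(bytes, StandardCharsets.UTF_16BE),
                        (String value) -> value.getBytes(StandardCharsets.UTF_8));

        System.out.println(new String(newState.get(42L), StandardCharsets.UTF_8)); // prints "latest event"
    }
}
```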

Reading state

To extract the state you will use the SavepointReader API.

Given the path to the savepoint, you create a SavepointReader and use readKeyedState() to set up the extraction (because value state is a keyed state!), providing the UID of the operator whose state you want to read, a reader function that extracts the state, the type information of the key, and the type information of the state.

Migration.java

new SimpleValueStateReaderFunction<>(LatestEventFunction.createStateDescriptor()),

The reader function emits the value held in state:

SimpleValueStateReaderFunction.java

package com.immerok.cookbook.utils;

// ...

public class SimpleValueStateReaderFunction<K, T> extends KeyedStateReaderFunction<K, T> {

Writing state

To write the state you will use the SavepointWriter API.

You create a SavepointWriter using fromExistingSavepoint(), define a transformation that uses the previously extracted state as its input, key the stream using the key selector from the application, apply a bootstrap function to write the data into state, add the transformation to the SavepointWriter, and finalize the preparation by providing the path to which the savepoint should be written.
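
Keying the stream with the application's own key selector matters because each value must land under the same key that the running job will later look up. The sketch below is a hypothetical in-memory model of that step: bootstrap groups values by a key selector and, like a function that writes every received value into value state, keeps the last value seen per key. EventSketch is a simplified stand-in for Event, not the recipe's class.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class BootstrapSketch {

    /** Simplified stand-in for Event: only the key field and a payload. */
    static class EventSketch {
        final long userId;
        final String payload;

        EventSketch(long userId, String payload) {
            this.userId = userId;
            this.payload = payload;
        }
    }

    /** Writes each value into per-key "value state"; later values for a key overwrite earlier ones. */
    static <K, T> Map<K, T> bootstrap(List<T> values, Function<T, K> keySelector) {
        Map<K, T> state = new LinkedHashMap<>();
        for (T value : values) {
            state.put(keySelector.apply(value), value);
        }
        return state;
    }

    public static void main(String[] args) {
        List<EventSketch> extracted =
                List.of(new EventSketch(1L, "a"), new EventSketch(2L, "b"), new EventSketch(1L, "c"));
        // Same key selector as the job: key by userId.
        Map<Long, EventSketch> state = bootstrap(extracted, e -> e.userId);
        System.out.println(state.size() + " " + state.get(1L).payload); // prints "2 c"
    }
}
```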

Migration.java

package com.immerok.cookbook;

import com.immerok.cookbook.functions.LatestEventFunction;

// ...

LatestEventFunction.createStateDescriptor()));

The bootstrap function writes every received value into state:

SimpleValueStateBootstrapFunction.java

package com.immerok.cookbook.utils;

import org.apache.flink.api.common.state.ValueState;

// ...

public class SimpleValueStateBootstrapFunction<K, T> extends KeyedStateBootstrapFunction<K, T> {

info

SavepointWriter#fromExistingSavepoint() automatically determines the maxParallelism and state backend from the savepoint, while also forwarding the states of all operators that you don't explicitly process. This is perfect for this recipe, because you only want to change the serializer of a particular state without changing the state backend or maxParallelism.
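
The forwarding behavior described above can be pictured as a merge: operators you write new state for end up with that state, and all others are copied unchanged. The sketch below is a hypothetical model keyed by made-up operator UIDs; it does not reflect how Flink stores savepoints or applies the change internally.

```java
import java.util.Map;
import java.util.TreeMap;

class ForwardingSketch {

    /** Copies every operator's state, then replaces only the operators that were rewritten. */
    static Map<String, String> writeSavepoint(Map<String, String> existing, Map<String, String> rewritten) {
        Map<String, String> result = new TreeMap<>(existing); // untouched operators are forwarded
        result.putAll(rewritten); // processed operators get their new state
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> existing = Map.of("latest-event", "kryo bytes", "other-operator", "unchanged");
        Map<String, String> rewritten = Map.of("latest-event", "list bytes");
        System.out.println(writeSavepoint(existing, rewritten));
        // prints {latest-event=list bytes, other-operator=unchanged}
    }
}
```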

caution

If an operator contains multiple states, such as two value states, the reader and bootstrap functions for that operator must extract and write all of them, even if you only want to modify one.

tip

Wrapping all extracted state values in a Tuple is a good way to implement this.
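
A minimal sketch of this tip, assuming a hypothetical operator with two value states (the latest event and a counter) modeled as plain maps: the reader side emits both values for a key in one holder, and the bootstrap side writes both back, so neither state is dropped. In Flink code this holder would be a Tuple2; the plain-Java StatePair class is a stand-in so the example runs without Flink.

```java
import java.util.HashMap;
import java.util.Map;

class MultiStateSketch {

    /** Stand-in for a Tuple2 holding every state value of one key. */
    static final class StatePair {
        final String latestEvent;
        final Long counter;

        StatePair(String latestEvent, Long counter) {
            this.latestEvent = latestEvent;
            this.counter = counter;
        }
    }

    /** "Reader": extracts both states for a key, even though only one will change. */
    static StatePair read(String key, Map<String, String> latestEvents, Map<String, Long> counters) {
        return new StatePair(latestEvents.get(key), counters.get(key));
    }

    /** "Bootstrap": writes both states back for the key. */
    static void write(String key, StatePair pair, Map<String, String> latestEvents, Map<String, Long> counters) {
        latestEvents.put(key, pair.latestEvent);
        counters.put(key, pair.counter);
    }

    public static void main(String[] args) {
        Map<String, String> oldEvents = Map.of("user-1", "event");
        Map<String, Long> oldCounters = Map.of("user-1", 3L);

        Map<String, String> newEvents = new HashMap<>();
        Map<String, Long> newCounters = new HashMap<>();
        write("user-1", read("user-1", oldEvents, oldCounters), newEvents, newCounters);

        System.out.println(newEvents + " " + newCounters); // prints {user-1=event} {user-1=3}
    }
}
```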

Putting it together

Now that you can read and write state, you combine the two into a migration function.

To control which serializer is used for reading the state, you wrap the reading part in a try-with-resources statement that calls SubEventListTypeInfoFactory.temporarilyEnableKryoPath(), which enables the Kryo path. Outside of this try-with-resources statement, the list serializer is used instead.

Migration.java

static void migrateSavepoint(final String sourceSavepointPath, final String targetSavepointPath)

Post-migration

After the migration is complete, keep the @TypeInfo annotation on the Event class to ensure the list serializer continues to be used. The type info factory can, however, be cleaned up, and should look like this:

SubEventListTypeInfoFactory.java

public class PostMigrationSubEventListTypeInfoFactory extends TypeInfoFactory<List<SubEvent>> {

The full recipe

This recipe is self-contained. Follow the instructions in the MigrationTest javadocs to see the recipe in action. That test uses an embedded Apache Flink setup, so you can run it directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code.

See the comments and tests included in the code for more details.