Upgrading Flink (Table API)

Visit the compiled-plan recipe on GitHub.

Background

This recipe shows how you can upgrade the Flink version used by a Table API job without losing any state.

The standard technique for upgrading a Flink job to a new version of Flink involves restarting the job from a savepoint:

  • perform a "stop with savepoint" on the running job
  • upgrade to the new version of Flink
  • restart the job from that savepoint

This process assumes that each stateful operator in the restarted job will be able to find and load its state. Jobs written with the DataStream API provide enough low-level control to avoid or cope with potential problems (see the Flink documentation on savepoints for details). But the Table API operates at a higher level of abstraction, and new Flink versions may introduce changes to the SQL planner/optimizer that render the state in a checkpoint or savepoint unrestorable.

This recipe illustrates how to overcome this problem with the compiled plan feature, which was introduced in Flink 1.15 and is described in FLIP-190: Support Version Upgrades for Table API & SQL Programs.

caution

Note, however, that this compiled plan feature is considered experimental.

Using a compiled plan

The goal of FLIP-190 is to handle cases where you want to upgrade to a newer Flink version while continuing to execute the same SQL query. By compiling the query into a plan, you are guaranteed that exactly the same plan (using the same operators and state) will be executed, regardless of which version of Flink executes it. This means you should be able to upgrade to a newer version of Flink without losing any state.

The query used in this example is deduplicating a stream of transactions using the technique recommended in the Flink documentation:

CompiledPlanRecipe.java (excerpt)

  package com.immerok.cookbook;

  import java.nio.file.Path;
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.PlanReference;
  import org.apache.flink.table.api.TableEnvironment;
  import org.apache.flink.table.api.TableResult;

  public class CompiledPlanRecipe {
      // ...
      " ROW_NUMBER() OVER (PARTITION BY t_id ORDER BY t_time ASC) AS row_num",
      // ...

If you were about to upgrade Flink, you would use code like this to compile the SQL query into a JSON plan:

CompiledPlanRecipe.java (excerpt)

  static final String TRANSACTION_TOPIC = "transactions";

  static final String transactionsDDL =
      // ...

  EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
  // ...
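The compilation step itself is not visible in the excerpt. A minimal sketch of what it could look like, using the compilePlanSql and writeToFile APIs from FLIP-190; the method name, the print_sink table name, and the wrapping INSERT statement are assumptions here, while transactionsDDL, printSinkDDL, and streamingDeduplication are fields of the recipe class:

  // A sketch of compiling the SQL query into a JSON plan before the upgrade.
  // Assumed: the sink table is named print_sink and planLocation is where the
  // JSON plan should be written.
  static void compileThePlan(Path planLocation) {
      EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
      TableEnvironment tableEnv = TableEnvironment.create(settings);

      // register the source and sink tables the query refers to
      tableEnv.executeSql(transactionsDDL);
      tableEnv.executeSql(printSinkDDL);

      // compile the INSERT statement into a plan and persist it as JSON
      tableEnv.compilePlanSql("INSERT INTO print_sink\n" + streamingDeduplication)
              .writeToFile(planLocation);
  }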

Using the newer Flink version, you could then load and execute the compiled plan:

CompiledPlanRecipe.java (excerpt)

  EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
  // ...
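Again, only the environment setup survives in the excerpt. A minimal sketch of the load-and-execute step, assuming the method name; loadPlan, PlanReference.fromFile, and CompiledPlan.execute are the FLIP-190 APIs:

  // A sketch of restoring the pre-compiled plan on the newer Flink version.
  // Assumed: planLocation points at the JSON plan written before the upgrade.
  static TableResult executeThePlan(Path planLocation) {
      EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
      TableEnvironment tableEnv = TableEnvironment.create(settings);

      // load the plan written with the previous Flink version and execute it as-is
      return tableEnv.loadPlan(PlanReference.fromFile(planLocation)).execute();
  }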

If you want to see what the JSON plan looks like, one of the tests prints it out:

CompiledPlanRecipeTest.java (excerpt)

  package com.immerok.cookbook;

  import static com.immerok.cookbook.CompiledPlanRecipe.TRANSACTION_TOPIC;
  import static com.immerok.cookbook.CompiledPlanRecipe.printSinkDDL;
  import static com.immerok.cookbook.CompiledPlanRecipe.streamingDeduplication;
  import static com.immerok.cookbook.CompiledPlanRecipe.transactionsDDL;

  import com.immerok.cookbook.extensions.MiniClusterExtensionFactory;
  // ...

  tableEnv.executeSql(transactionsDDL);
  // ...
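The test body is elided above. A rough sketch of a test that prints the plan, assuming the test method and sink table names; CompiledPlan.asJsonString() renders the plan as JSON:

  // Additional imports assumed for this sketch:
  //   import org.apache.flink.table.api.CompiledPlan;
  //   import org.apache.flink.table.api.EnvironmentSettings;
  //   import org.apache.flink.table.api.TableEnvironment;
  //   import org.junit.jupiter.api.Test;

  // A sketch of a test that compiles the query and prints the resulting JSON plan.
  // Assumed: the sink table is named print_sink.
  @Test
  void printsTheCompiledPlan() {
      EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
      TableEnvironment tableEnv = TableEnvironment.create(settings);

      tableEnv.executeSql(transactionsDDL);
      tableEnv.executeSql(printSinkDDL);

      CompiledPlan plan = tableEnv.compilePlanSql("INSERT INTO print_sink\n" + streamingDeduplication);
      System.out.println(plan.asJsonString());
  }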

The full recipe

See the code and tests in the Compiled Plan recipe for more details.

You can run these tests directly via Maven or in your favorite editor such as IntelliJ IDEA or Visual Studio Code. There is no need to download or install Apache Flink or Apache Kafka.