Apache Flink is one of the most versatile data streaming open-source solution that exists. It supports all the primary functions of a typical batch processing system such as SQL, Connectors to Hive, Group By, etc. while providing fault-tolerance and exactly-once semantics. Hence, you can create a multitude of push-based applications using it.

However, one of the significant drawbacks of the Apache Flink has been the inability to modify the checkpointed state of the program. Let’s first see what I mean by that.

Checkpointing

Flink provides fault-tolerance by using a mechanism called checkpointing. It periodically takes a snapshot of all the stateful operators/functions of your programs and stores them in a highly durable store such as HDFS.

Checkpointing allows the Flink program to resume from this snapshot. This is helpful in the cases of failures due to some error such as a simple exception not handled or a loss of data node in your YARN/Mesos/k8s cluster.

This snapshot is stored in a binary format only understood by Flink, which makes it difficult to modify the state before restart.

Why would you need to modify the data?

There can be multiple cases where you might need only partial data from the checkpoint and may want to update the other. An example job being

  • Reading numerical data from one Kafka topic
  • Aggregate over a window of 1 hour
  • Classify using some statistical thresholds provided by config stored in your operator’s state.

Suppose, your job gets killed, and now you want to restart it using a checkpoint, but you need to modify the config as well. Earlier, there was no way to do so other than to wait for the job to start with old config and than overwrite it using a stream from Kafka or filesystem.

Now, however, you can easily do that using the new API. Let’s modify the above example to do so.

Bootstrapping the state

Following are the necessary steps required to bootstrap your state

Add Dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.9.0/version>
</dependency>

This is not included in the default Flink dependency and needs to be added separately in pom.xml file.

Create a Bootstrap function

This function tells Flink what state to update when it receives the data. In this example, we are updating the threshold state using the TestConfig data we collect.

Flow the config data

Now, you need to flow the config data. The Flink state processor API works seamlessly with Dataset API. It doesn’t imply you can’t use bootstrapping in a Stream environment. It’s just that the data for bootstrapping can only be loaded using Batch API. You can create both Batch and Stream environment in a single job.

Here, I have just created a single config object and then created a Dataset on top of it. We then create a transformation. It specifies what dataset to use with Bootstrap Function.

Update the save point

Next, we load the savepoint from the old directory and then update the states of the operators. To update the state, we need to specify the UID of the operator in the streaming job and the transformation created in the step.

Once that is done, we can rewrite this modified savepoint in a new path. Do note that the new path contains the shallow copies of pointers from the old path. It means, deleting the old savepoint path will corrupt the new one, and hence you should refrain from doing so.

Now, you can resume your Flink job using this new savepoint path.

bin/flink run -s newSavepointPath test-checkpoint.jar

You can even create a new Savepoint instead of updating the old one. For that, you need to do Savepoint.create() instead of Savepoint.load()

Flink’s State Processor API was one of the most requested features and now it’s finally here. The API is available only in 1.9.0 and above versions.

You can explore the whole API in the official documentation.