One of the significant features of Apache Flink is its ability to do stateful processing. The API to store and retrieve state data is simple, which makes it a joy to use.

However, behind that API lies a system that manages your data while providing persistence guarantees, and that is what we’ll explore in this article.

We’ll be looking at three parts of state management:

  1. State Backend
  2. Data Format
  3. Persistence and Failure Recovery

Let’s first look at where the state is actually stored.

Where is my data stored?

Flink provides three state backends out of the box:

  1. Memory state backend
  2. File System (FS) state backend
  3. RocksDB state backend

Memory State Backend

This backend keeps the working state on the JVM heap of each task manager, which makes access extremely fast. Despite this performance, it should never be used in production jobs. That’s because the backend stores its backup of the data (also known as a checkpoint) in the job manager’s memory, which puts unnecessary pressure on the job manager’s operational stability.

Another limitation of this backend is that the size of each individual state is limited to 5 MB by default. The limit can be raised, but this is not advised for performance reasons, and regardless of the configured value the state cannot be larger than the Akka frame size (10 MB by default).

This is the default backend used by Flink in case nothing is configured.

File System Backend

This backend is similar to the memory state backend, except that it stores the backup on a filesystem rather than in the job manager’s memory. The filesystem can be the task manager’s local filesystem or a durable store such as HDFS or S3.

Its working state is also limited by heap memory, so it should be used when you have a small amount of state and require high performance.

RocksDB Backend

This backend uses RocksDB, developed by Facebook, to store the data. If you are not familiar with RocksDB, it’s an embeddable key-value store that offers ACID guarantees. It is based on LevelDB by Google but offers much better write performance.

Flink chose RocksDB over other popular embeddable stores such as SQLite because of its high write performance, which comes from its log-structured merge-tree (LSM) design.

Since RocksDB also maintains an in-memory table (known as the memtable) along with bloom filters, reading recent data is also extremely fast.

Each task manager maintains its own RocksDB files, and the backup of this state is checkpointed to a durable store such as HDFS.

This is the only backend that supports incremental checkpointing, i.e. backing up only the data modified since the last checkpoint rather than the complete state.

If your application needs to store a large state, this should be your choice. However, since it requires disk access and serialization/deserialization, it is slower than the other backends.
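To make the idea of incremental checkpointing more concrete, here is a minimal conceptual sketch in plain Java. It assumes, as a simplification, that a checkpoint is just a set of immutable file names and that only files missing from the previous checkpoint need to be uploaded. The class and method names are hypothetical, the file name 000018.sst is made up for illustration, and this is not Flink’s actual implementation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Conceptual sketch only: NOT Flink's implementation of incremental checkpoints.
final class IncrementalCheckpointSketch {

    // Returns the files that exist now but were not part of the previous checkpoint.
    static Set<String> filesToUpload(Set<String> previouslyUploaded, Set<String> currentFiles) {
        Set<String> delta = new TreeSet<>(currentFiles); // sorted copy, inputs stay untouched
        delta.removeAll(previouslyUploaded);
        return delta;
    }

    public static void main(String[] args) {
        Set<String> previous = new HashSet<>(Arrays.asList("000011.sst", "000012.sst"));
        // 000018.sst is a hypothetical file written after the previous checkpoint.
        Set<String> current = new HashSet<>(Arrays.asList("000011.sst", "000012.sst", "000018.sst"));

        System.out.println(filesToUpload(previous, current)); // prints [000018.sst]
    }
}
```

A full checkpoint would upload all three files every time; in this simplified model the incremental one uploads only the new file.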

What does the state actually look like?

Let’s look at how the data is actually stored once you create a state in your application.

The storage format differs according to the backend. However, what is common is that both the key and the value of the state are stored as byte arrays created using Flink’s own type serializers.
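As a rough illustration of what "key and value as byte arrays" means, the following plain Java sketch turns a string key and an integer value into bytes and back. It deliberately uses java.nio rather than Flink’s type serializers, so the byte layout shown here is an assumption for illustration and not the format Flink actually writes. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: uses plain JDK classes, not Flink's TypeSerializer,
// so the byte layout is NOT the one Flink stores in its state backend.
final class ByteArrayStateSketch {

    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeKey(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static byte[] serializeValue(int value) {
        // 4 bytes, big-endian (ByteBuffer's default byte order)
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static int deserializeValue(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] key = serializeKey("testing123");
        byte[] value = serializeValue(1423);

        System.out.println(key.length + " key bytes, " + value.length + " value bytes");
        // prints: 10 key bytes, 4 value bytes
        System.out.println(deserializeKey(key) + " -> " + deserializeValue(value));
        // prints: testing123 -> 1423
    }
}
```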

We’ll use the RocksDB backend for demonstration.

Each task manager has multiple RocksDB folders, each of which is a database in itself. Each database contains multiple column families, named after the state descriptors.

Each column family contains key-value pairs, where the key is the operator’s key and the value is the state’s data.

As an example, let’s look at the state of an example job.

This job contains two stateful functions, StatefulProcess and StatefulMapTest.

If you run this job with RocksDB set as the state backend in the flink-conf.yaml file, the following directories get generated on every task manager.

drwxr-xr-x   4 abc  74715970   128B Sep 23 03:19 job_127b2b84f80b368b8edfe02b2762d10d_op_KeyedProcessOperator_0d49016af99997646695a030f69aa7ee__1_1__uuid_65b50444-5857-4940-9f8c-77326cc79279/db

drwxr-xr-x   4 abc  74715970   128B Sep 23 03:20 job_127b2b84f80b368b8edfe02b2762d10d_op_StreamFlatMap_11f49afc24b1cce91c7169b1e5140284__1_1__uuid_19b333d3-3278-4e51-93c8-ac6c3608507c/db

The directory names are composed of 3 parts:

  1. JOB_ID: The random id assigned to your job when the job graph is created.
  2. OPERATOR_ID: A combination of the operator’s base class, the Murmur3 hash of the operator uid, the index of the task and the overall parallelism of the task. For example, for our StatefulMapTest function these 4 parts are:
  • StreamFlatMap
  • Murmur3_128(“stateful_map_test”) -> 11f49afc24b1cce91c7169b1e5140284
  • 1, since a job with a parallelism of 1 has only a single task, and hence the task index is 1
  • 1, since I set a parallelism of 1 while executing the job
  3. UUID: A random UUID generated while creating the directories.
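The naming scheme above can be checked with a small parser. This is a sketch in plain Java whose regular expression is inferred only from the two directory names listed earlier; it is an assumption about the layout, not a Flink API, and the class name and map keys are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: splits a RocksDB state directory name into its parts.
// The pattern is inferred from the example listing and may not cover every case.
final class StateDirNameSketch {

    private static final Pattern DIR_NAME = Pattern.compile(
            "job_([0-9a-f]{32})_op_(.+)_([0-9a-f]{32})__(\\d+)_(\\d+)__uuid_([0-9a-f-]{36})");

    static Map<String, String> parse(String dirName) {
        Matcher m = DIR_NAME.matcher(dirName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected directory name: " + dirName);
        }
        Map<String, String> parts = new LinkedHashMap<>(); // keeps insertion order for printing
        parts.put("jobId", m.group(1));
        parts.put("operatorClass", m.group(2));
        parts.put("operatorHash", m.group(3));
        parts.put("taskIndex", m.group(4));
        parts.put("parallelism", m.group(5));
        parts.put("uuid", m.group(6));
        return parts;
    }

    public static void main(String[] args) {
        String dir = "job_127b2b84f80b368b8edfe02b2762d10d_op_StreamFlatMap_"
                + "11f49afc24b1cce91c7169b1e5140284__1_1__uuid_19b333d3-3278-4e51-93c8-ac6c3608507c";
        // Print one part per line, e.g. "operatorClass = StreamFlatMap"
        parse(dir).forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```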

Each of these directories contains an instance of RocksDB. The file structure of each instance looks like this:

-rw-r--r--  1 abc  74715970    21K Sep 23 03:20 000011.sst
-rw-r--r--  1 abc  74715970    21K Sep 23 03:20 000012.sst
-rw-r--r--  1 abc  74715970     0B Sep 23 03:36 000015.log
-rw-r--r--  1 abc  74715970    16B Sep 23 03:36 CURRENT
-rw-r--r--  1 abc  74715970    33B Sep 23 03:18 IDENTITY
-rw-r--r--  1 abc  74715970     0B Sep 23 03:33 LOCK
-rw-r--r--  1 abc  74715970    34K Sep 23 03:36 LOG
-rw-r--r--  1 abc  74715970   339B Sep 23 03:36 MANIFEST-000014
-rw-r--r--  1 abc  74715970    10K Sep 23 03:36 OPTIONS-000017

The .sst files are RocksDB’s SSTable files, which contain the actual data.

The LOG file contains RocksDB’s informational (diagnostic) log; the numbered .log file is the write-ahead log.

MANIFEST contains metadata such as column families.

OPTIONS contains the configuration used to create the RocksDB instance.

Let’s open this DB using the RocksDB Java API. We’ll take a look at the StatefulMapTest function’s directory.

Printing all the column family names present in the DB gives the following output:

default
previousInt
nextInt

We can also print all the key-value pairs inside each column family.

In our case, this prints key-value pairs such as

(testing123, 1423), (testing456, 1212) etc.

TestInputView is a Flink-specific construct used to read byte-array data streams.

Do I need to take a backup of data?

Flink provides persistence for your application state using a mechanism called checkpointing. It takes a snapshot of the state at periodic intervals and stores it in a durable store such as HDFS or S3. This allows the Flink application to resume from this backup in case of failure.

Checkpointing is disabled by default for a Flink job. To enable it, call enableCheckpointing on the job’s StreamExecutionEnvironment, passing the checkpoint interval in milliseconds.

With an interval of 60000, your application takes a snapshot of its state every 60 seconds and stores it in the job manager, HDFS or S3 (depending on the state backend) for future recovery. For HDFS/S3, the directory used to store the checkpoints can be configured with state.checkpoints.dir in flink-conf.yaml.
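For reference, the configuration mentioned in this article could look roughly like the fragment below. It is a sketch that assumes the option names of the Flink 1.9 line used here; the checkpoint directory is the placeholder path from this article, not a real location, and the incremental option applies only to the RocksDB backend.

```yaml
# flink-conf.yaml (fragment, values are placeholders)
state.backend: rocksdb
state.checkpoints.dir: hdfs:///path/to/state.checkpoints.dir
state.backend.incremental: true
```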

The final directory structure of a checkpoint looks like

hdfs:///path/to/state.checkpoints.dir/{JOB_ID}/chk-{CHECKPOINT_ID}/

JOB_ID is your application’s unique ID and CHECKPOINT_ID is an auto-incrementing numeric ID.

To restore the state from a checkpoint when starting the application, simply run

flink-1.9.0/bin/flink run -s hdfs:///path/to/state.checkpoints.dir/{JOB_ID}/chk-{CHECKPOINT_ID}/ path/to/your/jar
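When scripting a restore, you usually need to pick the most recent chk-{CHECKPOINT_ID} directory and build the path passed to -s. The sketch below shows one way to do that in plain Java, under the assumption that you already have the entry names of the job’s checkpoint directory. It compares the IDs numerically, because a plain string sort would place chk-9 after chk-10. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Sketch: builds a checkpoint path and finds the latest checkpoint ID
// from a list of directory entry names. Not part of Flink's API.
final class CheckpointPathSketch {

    static String checkpointPath(String baseDir, String jobId, long checkpointId) {
        String base = baseDir.endsWith("/") ? baseDir : baseDir + "/";
        return base + jobId + "/chk-" + checkpointId + "/";
    }

    static OptionalLong latestCheckpointId(List<String> entryNames) {
        return entryNames.stream()
                .filter(name -> name.matches("chk-\\d+")) // ignore any other entries
                .mapToLong(name -> Long.parseLong(name.substring("chk-".length())))
                .max();
    }

    public static void main(String[] args) {
        // Hypothetical listing of a job's checkpoint directory.
        List<String> entries = Arrays.asList("chk-9", "chk-10", "other-entry");
        long latest = latestCheckpointId(entries).orElseThrow(
                () -> new IllegalStateException("no checkpoint found"));

        System.out.println(checkpointPath(
                "hdfs:///path/to/state.checkpoints.dir", "127b2b84f80b368b8edfe02b2762d10d", latest));
        // prints: hdfs:///path/to/state.checkpoints.dir/127b2b84f80b368b8edfe02b2762d10d/chk-10/
    }
}
```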

You can have your stateful functions implement the CheckpointedFunction interface, which provides hooks to modify the state upon initialization and when a snapshot is taken. The previous StatefulProcess function can be extended to use this interface.
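The snapshot-and-restore lifecycle behind these hooks can be sketched without Flink on the classpath. The interface below is a hypothetical stand-in: Flink’s real CheckpointedFunction methods receive context objects rather than the state itself, so treat this only as an illustration of when each hook runs.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of the snapshot/restore lifecycle; NOT Flink's interface.
final class SnapshotLifecycleSketch {

    // Hypothetical stand-in for the two hooks described above.
    interface Snapshottable<S> {
        S snapshotState();                // called when a checkpoint is taken
        void initializeState(S restored); // called on start, with restored state if any
    }

    static final class BufferingFunction implements Snapshottable<List<String>> {
        private final List<String> buffer = new ArrayList<>();

        void process(String element) {
            buffer.add(element);
        }

        @Override
        public List<String> snapshotState() {
            return new ArrayList<>(buffer); // copy, so later elements do not leak into the snapshot
        }

        @Override
        public void initializeState(List<String> restored) {
            buffer.clear();
            if (restored != null) {
                buffer.addAll(restored);
            }
        }

        List<String> contents() {
            return new ArrayList<>(buffer);
        }
    }

    public static void main(String[] args) {
        BufferingFunction original = new BufferingFunction();
        original.process("a");
        original.process("b");
        List<String> snapshot = original.snapshotState();
        original.process("c"); // arrives after the snapshot, so it is not part of it

        BufferingFunction recovered = new BufferingFunction();
        recovered.initializeState(snapshot); // simulate a restart from the checkpoint
        System.out.println(recovered.contents()); // prints [a, b]
    }
}
```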

This completes our deep dive into Flink state, and now you can be sure that your state is well preserved by the application.

If you want to get started with stateful processing in Apache Flink, these are some useful links to the official docs:

  1. Working with State
  2. State Backends
  3. State Processor API