The most famous checkpoint is certainly Checkpoint Charlie. As you can see, even back in 1963, the Soviets were already teasing the United States with hyped-up technology. Source: Wikipedia.

Let’s look at what checkpoints can do for your Spark dataframes and go through a Java example of how to use them.

Checkpoint on Dataframe

In v2.1.0, Apache Spark introduced checkpoints on dataframes and datasets – I will continue to use the term dataframe for a Dataset<Row>. The Javadoc describes it as:

Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext#setCheckpointDir.

However, I think it requires a little more explanation.

Why Do I Want a Checkpoint?

Basically, I will use a checkpoint when I want to freeze the content of my dataframe before I do something else. It can be in the scenario of iterative algorithms, as mentioned in the Javadoc, but also in recursive algorithms, or simply when branching out a dataframe to run different kinds of analytics on each branch.
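The branching scenario can be sketched as follows. This is a minimal, hypothetical example (the class name and the in-memory data are mine, not from the article): the dataframe is checkpointed once, then two independent analyses branch off the frozen data.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BranchingCheckpoint {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("BranchingCheckpoint").master("local[*]").getOrCreate();
		// Checkpoints need a directory; /tmp works locally, use HDFS on a cluster.
		spark.sparkContext().setCheckpointDir("/tmp");

		// A small dataframe of the numbers 0..9, built in memory for the example.
		Dataset<Row> base = spark.range(10).toDF("n");

		// Eager checkpoint: the data is materialized to disk and the
		// lineage is truncated before we branch.
		Dataset<Row> frozen = base.checkpoint();

		// Two independent analyses branch off the same checkpointed data.
		long evens = frozen.filter("n % 2 = 0").count();
		long bigOnes = frozen.filter("n > 6").count();
		System.out.println("evens=" + evens + " bigOnes=" + bigOnes);
		spark.stop();
	}
}
```

Both branches read from the checkpoint files rather than re-deriving `base` from scratch.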

Spark has offered checkpoints on streaming since its early versions (at least v1.2.0), but a checkpoint on a dataframe is a different beast.

Types of Checkpoints

You can create 2 kinds of checkpoints.

An eager checkpoint will cut the lineage from previous dataframes and will let you start “fresh” from this point on. In plain terms, Spark will dump your dataframe to the directory specified by setCheckpointDir() and will start a fresh new dataframe from it. You will also need to wait for the operation to complete.

In contrast, a non-eager checkpoint will keep the lineage from previous operations in the dataframe.
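In the API, the difference is just the boolean flag of checkpoint(boolean eager). A minimal sketch (class name and sample data are mine):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class EagerVsLazyCheckpoint {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("EagerVsLazy").master("local[*]").getOrCreate();
		spark.sparkContext().setCheckpointDir("/tmp");

		Dataset<Row> df = spark.range(1000).toDF("id");

		// Eager (the default): blocks here until the data is written to
		// the checkpoint directory; the returned plan is truncated.
		Dataset<Row> eager = df.checkpoint(); // same as df.checkpoint(true)

		// Non-eager: returns immediately; the checkpoint is only
		// written when the first action materializes the dataframe.
		Dataset<Row> lazy = df.checkpoint(false);
		long n = lazy.count(); // this action triggers the actual write

		System.out.println(eager.count() + " " + n);
		spark.stop();
	}
}
```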

And the Code is…

Now that we understand what a checkpoint is and how it works, let’s see how we implement that in Java. The code is part of my Apache Spark Java Cookbook on GitHub.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataframeCheckpoint {
	public static void main(String[] args) {
		DataframeCheckpoint app = new DataframeCheckpoint();
		app.start();
	}

	private void start() {
		SparkSession spark = SparkSession.builder().appName("Checkpoint").master("local[*]").getOrCreate();
		// We need to specify where Spark will save the checkpoint file. It can be an HDFS location.
		spark.sparkContext().setCheckpointDir("/tmp");

		String filename = "data/tuple-data-file.csv";
		Dataset<Row> df1 = spark.read().format("csv").option("inferSchema", "true").option("header", "false")
				.load(filename);
		System.out.println("DF #1 - step #1: simple dump of the dataframe");
		df1.show();

		System.out.println("DF #2 - step #1: same as DF #1 - step #1");
		// Non-eager checkpoint: df2 keeps the lineage of df1.
		Dataset<Row> df2 = df1.checkpoint(false);
		df2.show();

		df1 = df1.withColumn("x", df1.col("_c0"));
		System.out.println("DF #1 - step #2: new column x, which is the same as _c0");
		df1.show();

		System.out.println("DF #2 - step #2: no operation was done on df2");
		df2.show();
	}
}

The execution will be, without much surprise:

DF #1 - step #1: simple dump of the dataframe
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

DF #2 - step #1: same as DF #1 - step #1
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

DF #1 - step #2: new column x, which is the same as _c0
+---+---+---+
|_c0|_c1|  x|
+---+---+---+
|  1|  5|  1|
|  2| 13|  2|
|  3| 27|  3|
|  4| 39|  4|
|  5| 41|  5|
|  6| 55|  6|
+---+---+---+

DF #2 - step #2: no operation was done on df2
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

Although this example is really basic, it shows how to use a checkpoint on a dataframe and how the original dataframe can keep evolving independently afterwards. Hopefully this will be useful to you too. A comment is always appreciated…

Thanks to Burak Yavuz at Databricks for his additional explanations.

6 thoughts on “What are Spark Checkpoints on Dataframes?”

  1. Hi. Thank you for your article!
    I actually can’t understand the difference between checkpointing and persist(StorageLevel.DISK_ONLY).
    And one more question: what does it mean, practically, to “cut the lineage” or “keep the lineage” from previous operations? How can this lineage be used? When should or shouldn’t I keep it?
    I will be very grateful for your response.

    1. Hey Jaylla,

      On the lineage part:
      With Spark, the operations are “stacked up” until you do a collect() or similar operation. Then, before doing anything, the optimizer, Catalyst, analyzes the best way to do it. A good example: you have a dataframe with column A, you copy this column to B (step 1), then you delete column B (step 2). If you checkpoint after step 1 and cut the lineage, then both operations will occur. If you checkpoint and keep the lineage, nothing will happen, as the two operations cancel one another.

    2. On persist(StorageLevel.DISK_ONLY):
      This is a very interesting question. The idea of a checkpoint is that you can continue the operations afterwards, and, with a non-eager checkpoint, it does not have to finish the write before you can go on. I will have to try with a larger data set how it impacts performance.
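      For completeness, one practical difference is that persist() keeps the lineage (Spark can recompute lost partitions from the source) while checkpoint() truncates it (the plan restarts from the saved files). A minimal sketch contrasting the two, with names and data of my own choosing:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

public class PersistVsCheckpoint {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("PersistVsCheckpoint").master("local[*]").getOrCreate();
		spark.sparkContext().setCheckpointDir("/tmp");

		Dataset<Row> df = spark.range(100).toDF("id").filter("id % 2 = 0");

		// persist(DISK_ONLY): caches the data on disk but KEEPS the
		// lineage, so lost partitions can be recomputed from source.
		Dataset<Row> persisted = df.persist(StorageLevel.DISK_ONLY());

		// checkpoint: writes the data to the checkpoint directory and
		// CUTS the lineage; the plan now starts from the saved files.
		Dataset<Row> checkpointed = df.checkpoint();

		System.out.println(persisted.count() + " " + checkpointed.count());
		spark.stop();
	}
}
```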
