Here are a few quick recipes to solve some common issues with Apache Spark. All examples are based on Java 8 (although I do not consciously use any of the version 8 features) and on Spark v1.6.2 and Spark v2.0.0. The examples (and more) can be cloned from GitHub: https://github.com/jgperrin/net.jgp.labs.spark.git.

Build a DataFrame from a Text File

Spark v2.0.0

package net.jgp.labs.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TextFileToDataFrame {

	public static void main(String[] args) {
		System.out.println("Working directory = " + System.getProperty("user.dir"));
		TextFileToDataFrame app = new TextFileToDataFrame();
		app.start();
	}

	private void start() {
		SparkSession spark = SparkSession.builder()
				.appName("DataFrame from Text File")
				.master("local[*]")
				.getOrCreate();

		String filename = "data/simple-data-file.txt";
		Dataset<Row> df = spark.read().text(filename);
		df.show();
	}
}

Spark v1.6.2

package net.jgp.labs.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class FirstTextFile {

	public static void main(String[] args) {
		System.out.println("Working directory = " + System.getProperty("user.dir"));
		FirstTextFile app = new FirstTextFile();
		app.start();
	}

	private void start() {
		SparkConf conf = new SparkConf().setAppName("DataFrame from Text File").setMaster("local");
		SparkContext sc = new SparkContext(conf);
		SQLContext sqlContext = new SQLContext(sc);

		String filename = "data/simple-data-file.txt";
		DataFrame df = sqlContext.read().text(filename);
		df.show();
	}
}

The first statement in main() dumps the working directory, which is always useful if you lose your files (it never happens to me).

Output

And the output is:

Working directory = /Users/jgp/git/net.jgp.labs.spark
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
+-----+

The file data/simple-data-file.txt simply contains the integers 1 through 7, one per line.
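Note that spark.read().text() always produces a single string column named value, whatever the file contains. If you need the values as integers (to sum them, for example), you can cast the column; this is a minimal sketch, not part of the original example, and it assumes an extra import of org.apache.spark.sql.types.DataTypes:

		// Cast the string "value" column to integers
		Dataset<Row> intDf = df.withColumn("value",
				df.col("value").cast(DataTypes.IntegerType));
		intDf.show();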

Connect Locally

Spark v2.0.0

package net.jgp.labs.spark;

import org.apache.spark.sql.SparkSession;

public class ConnectLocally {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Hello Spark").master("local").getOrCreate();
		System.out.println("Hello, Spark v." + spark.version());
	}

}

The output is:

Hello, Spark v.2.0.0

Spark v1.6.2

package net.jgp.labs.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class HelloSpark {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("Hello Spark").setMaster("local");
		SparkContext sc = new SparkContext(conf); 
		System.out.println("Hello, Spark v." + sc.version());
	}

}
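Both versions leave the connection open until the JVM exits. In a longer-lived application you would normally close it explicitly once you are done; a minimal sketch, not part of the examples above:

		// Spark v2.0.0: stop the session (and its underlying context)
		spark.stop();

		// Spark v1.6.2: stop the SparkContext
		sc.stop();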

Connect Remotely

An example of how to connect to a remote Spark server/cluster and check that it is live.

Spark v2.0.0

package net.jgp.labs.spark;

import org.apache.spark.sql.SparkSession;

public class ConnectRemotely {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("myApp").master("spark://10.0.100.120:7077").getOrCreate();
		System.out.println("Hello, Spark v." + spark.version());
	}
}

Spark v1.6.2

package net.jgp.labs.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class ConnectRemotely {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("myApp").setMaster("spark://10.0.100.120:7077");
		SparkContext sc = new SparkContext(conf);
		System.out.println("Hello, Remote Spark v." + sc.version());
	}
}

Read a CSV File into a Dataset

Read a CSV file composed of tuples and load it into a Dataset. This example uses the CSV reader that is built into Apache Spark as of v2.0.0.

Spark v2.0.0

package net.jgp.labs.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CsvToDataset {

	public static void main(String[] args) {
		System.out.println("Working directory = " + System.getProperty("user.dir"));
		CsvToDataset app = new CsvToDataset();
		app.start();
	}

	private void start() {
		SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();

		String filename = "data/tuple-data-file.csv";
		Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
				.option("header", "false").load(filename);
		df.show();
	}
}

Output

+---+---+
|_c0|_c1|
+---+---+
|  1| 15|
|  2| 25|
|  3| 35|
|  4| 45|
|  5| 55|
|  6| 65|
|  7| 75|
+---+---+
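Because inferSchema is set to true, Spark makes an extra pass over the file to guess the column types. If you already know them, you can declare the schema yourself; this is a sketch, not part of the original example, and it assumes extra imports of org.apache.spark.sql.types.StructType and DataTypes (the column names are my own choice):

		// Declare the two columns up front instead of inferring them
		StructType schema = new StructType()
				.add("label", DataTypes.IntegerType)
				.add("value", DataTypes.IntegerType);
		Dataset<Row> typedDf = spark.read().format("csv")
				.option("header", "false")
				.schema(schema)
				.load(filename);
		typedDf.show();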

Migration from Databricks CSV parser

Note: with the Databricks version of the CSV parser (used with Spark v1.6.2), columns are named C0, C1, and so on. With the standard CSV parser in v2.0.0, they are called _c0, _c1…

You can add the following code before df.show() to make your v2.0.0 code produce the column names your v1.6.2 code expects.

		int count = df.columns().length;
		for (int i = 0; i < count; i++) {
			String oldColName = "_c" + i;
			String newColName = "C" + i;
			df = df.withColumn(newColName, df.col(oldColName)).drop(oldColName);
		}

Output:

+---+---+
| C0| C1|
+---+---+
|  1| 15|
|  2| 25|
|  3| 35|
|  4| 45|
|  5| 55|
|  6| 65|
|  7| 75|
+---+---+
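An alternative to the withColumn()/drop() pair is withColumnRenamed(), which renames a column in a single call; a minimal sketch:

		int count = df.columns().length;
		for (int i = 0; i < count; i++) {
			// Rename _ci (standard parser naming) to Ci (Databricks parser naming)
			df = df.withColumnRenamed("_c" + i, "C" + i);
		}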

Register and Call an Internal UDF (User Defined Function)

This basic UDF multiplies the value of an integer column by 2.

Spark v2.0.0

package net.jgp.labs.spark;

import static org.apache.spark.sql.functions.callUDF;

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class BasicUdfFromTextFile implements Serializable {
	private static final long serialVersionUID = 3492970200940899011L;

	public static void main(String[] args) {
		System.out.println("Working directory = " + System.getProperty("user.dir"));
		BasicUdfFromTextFile app = new BasicUdfFromTextFile();
		app.start();
	}

	private void start() {
		SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();

		spark.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
			private static final long serialVersionUID = -5372447039252716846L;

			@Override
			public Integer call(Integer x) {
				return x * 2;
			}
		}, DataTypes.IntegerType);

		String filename = "data/tuple-data-file.csv";
		Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
				.option("header", "false").load(filename);
		df = df.withColumn("label", df.col("_c0")).drop("_c0");
		df = df.withColumn("value", df.col("_c1")).drop("_c1");
		df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
		df.show();
	}
}
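Since the examples are compiled with Java 8, the anonymous UDF1 class above can also be written as a lambda. This is a sketch of the registration call only, with the same imports as the full example:

		// The same UDF registered as a Java 8 lambda instead of an anonymous class
		spark.udf().register("x2Multiplier",
				(UDF1<Integer, Integer>) x -> x * 2,
				DataTypes.IntegerType);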

Spark v1.6.2

package net.jgp.labs.spark;

import static org.apache.spark.sql.functions.callUDF;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class BasicUdfFromTextFile implements Serializable {
	private static final long serialVersionUID = 3492970200940899011L;

	public static void main(String[] args) {
		BasicUdfFromTextFile app = new BasicUdfFromTextFile();
		app.start();
	}

	private void start() {
		SparkConf conf = new SparkConf().setAppName("Basic UDF from Text File").setMaster("local");
		SparkContext sc = new SparkContext(conf);
		SQLContext sqlContext = new SQLContext(sc);

		sqlContext.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
			private static final long serialVersionUID = -5372447039252716846L;

			@Override
			public Integer call(Integer x) {
				return x * 2;
			}
		}, DataTypes.IntegerType);

		String filename = "data/tuple-data-file.csv";
		DataFrame df = sqlContext.read().format("com.databricks.spark.csv").option("inferSchema", "true")
				.option("header", "false").load(filename);
		df = df.withColumn("label", df.col("C0")).drop("C0");
		df = df.withColumn("value", df.col("C1")).drop("C1");
		df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
		df.show();
	}
}

Output

+-----+-----+---+
|label|value| x2|
+-----+-----+---+
|    1|   15| 30|
|    2|   25| 50|
|    3|   35| 70|
|    4|   45| 90|
|    5|   55|110|
|    6|   65|130|
|    7|   75|150|
+-----+-----+---+

Register and Call an External UDF (User Defined Function)

This basic UDF multiplies the value of an integer column by 2. This example uses two files: the application and the UDF class itself (shown after both versions).

Spark v2.0.0

package net.jgp.labs.spark;

import static org.apache.spark.sql.functions.callUDF;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

import net.jgp.labs.spark.udf.Multiplier2;

public class BasicExternalUdfFromTextFile {

	public static void main(String[] args) {
		System.out.println("Working directory = " + System.getProperty("user.dir"));
		BasicExternalUdfFromTextFile app = new BasicExternalUdfFromTextFile();
		app.start();
	}

	private void start() {
		SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();

		spark.udf().register("x2Multiplier", new Multiplier2(), DataTypes.IntegerType);

		String filename = "data/tuple-data-file.csv";
		Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
				.option("header", "false").load(filename);
		df = df.withColumn("label", df.col("_c0")).drop("_c0");
		df = df.withColumn("value", df.col("_c1")).drop("_c1");
		df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
		df.show();
	}
}

Spark v1.6.2

package net.jgp.labs.spark;

import static org.apache.spark.sql.functions.callUDF;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

import net.jgp.labs.spark.udf.Multiplier2;

public class BasicExternalUdfFromTextFile {

	public static void main(String[] args) {
		System.out.println("Working directory = " + System.getProperty("user.dir"));
		BasicExternalUdfFromTextFile app = new BasicExternalUdfFromTextFile();
		app.start();
	}

	private void start() {
		SparkConf conf = new SparkConf().setAppName("Basic UDF from Text File").setMaster("local");
		SparkContext sc = new SparkContext(conf);
		SQLContext sqlContext = new SQLContext(sc);

		sqlContext.udf().register("x2Multiplier", new Multiplier2(), DataTypes.IntegerType);

		String filename = "data/tuple-data-file.csv";
		DataFrame df = sqlContext.read().format("com.databricks.spark.csv").option("inferSchema", "true")
				.option("header", "false").load(filename);
		df = df.withColumn("label", df.col("C0")).drop("C0");
		df = df.withColumn("value", df.col("C1")).drop("C1");
		df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
		df.show();
	}
}

UDF Code

The UDF (User Defined Function) code is the same for both Apache Spark v1.6.2 and v2.0.0.

package net.jgp.labs.spark.udf;

import org.apache.spark.sql.api.java.UDF1;

public class Multiplier2 implements UDF1<Integer, Integer> {

	private static final long serialVersionUID = -4519338105113996424L;

	@Override
	public Integer call(Integer t1) throws Exception {
		return t1 * 2;
	}
}

NC Schools by School District

This example downloads a JSON file from the City of Durham's Open Data portal and runs some basic analysis on it.

You will need:

  • The net.jgp.commons.download.DownloadManager class, which you can get from GitHub at https://github.com/jgperrin/net.jgp.commons.download.git (or replace with plain Java, as sketched after this list).
  • A /Pool directory, which you can easily replace by /tmp if you want.
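If you would rather not pull in the DownloadManager dependency, the download step can be done with plain Java. This is a minimal sketch, assuming /tmp as the target directory and a file name of my own choosing:

package net.jgp.labs.spark;

import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class SimpleJsonDownload {

	public static void main(String[] args) throws Exception {
		// Fetch the JSON export from the Open Data portal into /tmp
		String url = "https://opendurham.nc.gov/explore/dataset/north-carolina-school-performance-data/download/?format=json&timezone=America/New_York";
		Path target = Paths.get("/tmp/north-carolina-school-performance-data.json");
		try (InputStream in = new URL(url).openStream()) {
			Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
		}
		System.out.println("File downloaded to " + target);
	}
}

The full example, using DownloadManager and the /Pool directory, follows.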
/**
 * NC schools by school district analysis.
 */
package net.jgp.labs.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import net.jgp.commons.download.DownloadManager;

public class ListNCSchoolDistricts {

	public static void main(String[] args) {
		String filename = DownloadManager.getFilename(
				"https://opendurham.nc.gov/explore/dataset/north-carolina-school-performance-data/download/?format=json&timezone=America/New_York");
		System.out.println("File " + filename + " downloaded");

		SparkSession spark = SparkSession.builder().appName("NC Schools").master("local").getOrCreate();

		String fileToAnalyze = "/Pool/" + filename;
		System.out.println("File to analyze: " + fileToAnalyze);

		Dataset<Row> df;
		df = spark.read().option("dateFormat", "yyyy-MM-dd").json(fileToAnalyze);
		df = df.withColumn("district", df.col("fields.district"));
		df = df.groupBy("district").count().orderBy(df.col("district"));
		df.show(150, false);
	}
}

Partial output:

File 644ae7209488a5a34f6ecf6c6aa27178.dl downloaded
File to analyze: /Pool/644ae7209488a5a34f6ecf6c6aa27178.dl
+------------------------------+-----+
|district                      |count|
+------------------------------+-----+
|Alamance-Burlington Schools   |34   |
|Alexander County Schools      |10   |
|Alleghany County Schools      |4    |
|Anson County Schools          |10   |
|Ashe County Schools           |5    |
|Asheboro City Schools         |8    |
|Asheville City Schools        |8    |
|Avery County Schools          |10   |
|Beaufort County Schools       |12   |
|Bertie County Schools         |8    |
|Bladen County Schools         |13   |
|Brunswick County Schools      |18   |
|Buncombe County Schools       |39   |
|Burke County Schools          |25   |
|Cabarrus County Schools       |36   |
|Caldwell County Schools       |24   |
|Camden County Schools         |5    |
|Carteret County Public Schools|16   |
|Caswell County Schools        |6    |
|Catawba County Schools        |27   |
|Chapel Hill-Carrboro Schools  |18   |
...

More to Come

More recipes are on their way; use the comments if you have a specific one in mind and I'll see if I can add it.

Updates

  • 2016-07-22: First version.
  • 2016-07-25: Added 2 UDF examples.
  • 2016-07-29: Migration to Spark v2.0.0 started.
  • 2016-07-29: NC School example added, parsing JSON into a Dataset.
