Deploying a Java job in a standalone Spark deployment

Dhruv Saksena
5 min read · Dec 25, 2021

In this article, we will deploy a Spark job written in Java to a local standalone Spark deployment.

First, we will add the required libraries to the classpath using Maven. Following is the structure of pom.xml-

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example.spark</groupId>
  <artifactId>spark-example</artifactId>
  <version>1.0.0-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
  </dependencies>
</project>

Now, we will write the main program-

package com.example.client;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class App
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setAppName("Spark MultipleContext Test");
        conf.set("spark.driver.allowMultipleContexts", "true");
        conf.setMaster("local");

        SparkContext sc = new SparkContext(conf);
        JavaSparkContext context = JavaSparkContext.fromSparkContext(sc);

        List<Integer> numList = Arrays.asList(1, 4, 6, 8, 4, 7, 9, 10, 11, 15, 17, 19, 21, 5, 3, 9);

        // Distribute the list across 3 partitions and print each element.
        JavaRDD<Integer> javaRDD = context.parallelize(numList, 3);
        javaRDD.foreach(new VoidFunction<Integer>()
        {
            @Override
            public void call(Integer value) throws Exception
            {
                System.out.println("Java RDD: " + value);
            }
        });

        try
        {
            // Keep the application alive for a while (e.g. to inspect the Spark UI).
            Thread.sleep(100000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

        context.stop();
    }
}
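The class above can be run straight from the IDE, but to actually submit it to a Spark deployment the usual route is the spark-submit script that ships with Spark. The following is a sketch, assuming the jar produced by the pom above sits in target/ and that spark-submit is on your PATH; the cluster host name is hypothetical-

```shell
# Build the application jar
mvn clean package

# Run locally (equivalent to setMaster("local") in code)
spark-submit \
  --class com.example.client.App \
  --master local \
  target/spark-example-1.0.0-SNAPSHOT.jar

# Or submit to a standalone cluster's master (hypothetical host)
# spark-submit \
#   --class com.example.client.App \
#   --master spark://master-host:7077 \
#   target/spark-example-1.0.0-SNAPSHOT.jar
```

Note that a master hard-coded via setMaster() in the code takes precedence over the --master flag, so for a real cluster submission you would drop the setMaster("local") call from the program.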

Once you submit this job, following will be the output in console. As you can see below, the work is split into three tasks, one per partition (with master set to local they run on a single thread, one after another)-

21/12/25 19:33:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7900 bytes)
21/12/25 19:33:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
Java RDD: 1
Java RDD: 4
Java RDD: 6
Java RDD: 8
Java RDD: 4
21/12/25 19:33:03 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
21/12/25 19:33:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7905 bytes)
21/12/25 19:33:03 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/12/25 19:33:03 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 77 ms on localhost (executor driver) (1/3)
Java RDD: 7
Java RDD: 9
Java RDD: 10
Java RDD: 11
Java RDD: 15
21/12/25 19:33:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 751 bytes result sent to driver
21/12/25 19:33:03 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7915 bytes)
21/12/25 19:33:03 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/12/25 19:33:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 23 ms on localhost (executor driver) (2/3)
Java RDD: 17
Java RDD: 19
Java RDD: 21
Java RDD: 5
Java RDD: 3
Java RDD: 9
21/12/25 19:33:03 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 708 bytes result sent to driver
21/12/25 19:33:03 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 18 ms on localhost (executor driver) (3/3)
21/12/25 19:33:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/12/25 19:33:03 INFO DAGScheduler: ResultStage 0 (foreach at App.java:30) finished in 0.587 s
21/12/25 19:33:03 INFO DAGScheduler: Job 0 finished: foreach at App.java:30, took 0.626949 s

Let’s take one more example, a word count job where the words are read from a text file-

package com.example.client;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountJob
{
    public static void main(String[] args)
    {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local");
        sparkConf.setAppName("Word Count App");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> data = javaSparkContext.textFile("/Users/dhruv/Documents/Personal/Learning/Learning/Spark/abc.rtf");

        // Split each line into words and emit a (word, 1) pair per word.
        JavaPairRDD<String, Integer> flattenPairs =
                data.flatMapToPair(text -> Arrays.stream(text.split(" "))
                        .map(word -> new Tuple2<String, Integer>(word, 1))
                        .iterator());

        // Sum the 1s per word to get the final counts.
        JavaPairRDD<String, Integer> wordCountRDD = flattenPairs.reduceByKey((v1, v2) -> v1 + v2);

        wordCountRDD.saveAsTextFile("/Users/dhruv/Documents/Personal/Learning/Learning/Spark/abc-output.rtf");

        javaSparkContext.stop();
    }
}
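The split-and-count logic itself can be sanity-checked without a cluster. The sketch below mirrors the flatMapToPair + reduceByKey steps using plain java.util.stream operators (this is an analogy, not the Spark API; the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountLocal
{
    // flatMapToPair analogue: split each line into words;
    // reduceByKey analogue: group identical words and count them.
    public static Map<String, Long> countWords(String... lines)
    {
        return Arrays.stream(lines)
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args)
    {
        // Prints the counts, e.g. {Hello=2, ...}; map ordering is unspecified.
        System.out.println(countWords("Hello World", "Hello Spark"));
    }
}
```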

These are a few basic examples to help you get started with Spark.

Let’s understand the code line by line-

sparkConf.setMaster("local");

This specifies that the application will run inside a single JVM on the local machine. If you are using a Spark cluster, setMaster() should instead point to the master URL of that cluster (for a standalone cluster, a URL of the form spark://host:7077).

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaSparkContext is the Java-friendly wrapper around SparkContext, meant for Java-based Spark applications. Transformations on the RDDs it creates are lazy: each one only adds a node to the DAG, and the DAG is executed when an action (such as foreach or collect) is called.
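To make the laziness concrete, here is a minimal sketch (same local setup as the examples above; the class name is illustrative). The map call only records a transformation in the DAG; nothing runs until collect() is invoked:

```java
package com.example.client;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyDagDemo
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf().setAppName("Lazy DAG Demo").setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        // A transformation: this only adds a node to the DAG; no work happens yet.
        JavaRDD<Integer> doubled = context.parallelize(Arrays.asList(1, 2, 3)).map(v -> v * 2);

        // An action: this is the point where the DAG is shipped to the scheduler.
        List<Integer> result = doubled.collect();
        System.out.println(result); // [2, 4, 6]

        context.stop();
    }
}
```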

Now, we will look at some common transformations used in Spark.

Map Transformation

Here, we will increment each value in the list with a map transformation-

package com.example.client;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class MapTransformation
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setAppName("Spark MultipleContext Test");
        conf.set("spark.driver.allowMultipleContexts", "true");
        conf.setMaster("local");

        SparkContext sc = new SparkContext(conf);
        JavaSparkContext context = JavaSparkContext.fromSparkContext(sc);

        List<Integer> numList = Arrays.asList(1, 4, 6, 8, 4, 7, 9, 10, 11, 15, 17, 19, 21, 5, 3, 9);

        JavaRDD<Integer> javaRDD = context.parallelize(numList, 3);

        // map applies the function to every element, producing a new RDD.
        JavaRDD<Integer> incrementedRDD = javaRDD.map(value -> value + 1);

        incrementedRDD.foreach(new VoidFunction<Integer>()
        {
            @Override
            public void call(Integer value) throws Exception
            {
                System.out.println("Java RDD: " + value);
            }
        });

        context.stop();
    }
}

The output-

22/01/16 16:54:09 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
Java RDD: 2
Java RDD: 5
Java RDD: 7
Java RDD: 9
Java RDD: 5
22/01/16 16:54:09 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
22/01/16 16:54:09 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7905 bytes)
22/01/16 16:54:09 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
22/01/16 16:54:09 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 88 ms on localhost (executor driver) (1/3)
Java RDD: 8
Java RDD: 10
Java RDD: 11
Java RDD: 12
Java RDD: 16
22/01/16 16:54:09 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 751 bytes result sent to driver
22/01/16 16:54:09 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7915 bytes)
22/01/16 16:54:09 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 21 ms on localhost (executor driver) (2/3)
22/01/16 16:54:09 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
Java RDD: 18
Java RDD: 20
Java RDD: 22
Java RDD: 6
Java RDD: 4
Java RDD: 10

flatMap Transformation

This is the transformation to use when every element of the source RDD maps to zero or more elements in the target RDD-

package com.example.client;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class FlatMapTransformation
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setAppName("Spark MultipleContext Test");
        conf.set("spark.driver.allowMultipleContexts", "true");
        conf.setMaster("local");

        SparkContext sc = new SparkContext(conf);
        JavaSparkContext context = JavaSparkContext.fromSparkContext(sc);

        List<String> valueList = Arrays.asList("Hello World", "Hello Dhruv", "Hello Jaipur");
        JavaRDD<String> javaRDD = context.parallelize(valueList, 3);

        // flatMap turns each string into a sequence of words and flattens
        // the result into a single RDD of words.
        JavaRDD<String> flattenRDD = javaRDD.flatMap(value -> Arrays.asList(value.split(" ")).iterator());

        flattenRDD.foreach(new VoidFunction<String>()
        {
            @Override
            public void call(String value) throws Exception
            {
                System.out.println("Java RDD: " + value);
            }
        });

        context.stop();
    }
}

Following is the output-

22/01/16 17:13:29 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
Java RDD: Hello
Java RDD: World
22/01/16 17:13:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
22/01/16 17:13:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7869 bytes)
22/01/16 17:13:29 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
22/01/16 17:13:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 81 ms on localhost (executor driver) (1/3)
Java RDD: Hello
Java RDD: Dhruv
22/01/16 17:13:29 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 751 bytes result sent to driver
22/01/16 17:13:29 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7870 bytes)
22/01/16 17:13:29 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 26 ms on localhost (executor driver) (2/3)
22/01/16 17:13:29 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
Java RDD: Hello
Java RDD: Jaipur
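If the one-to-many shape of flatMap feels abstract, java.util.stream has the same operator, and the behaviour can be verified locally. This sketch mirrors the Spark example above (plain Java, not the Spark API; the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapLocal
{
    // Each input line becomes a stream of words; flatMap concatenates
    // those streams into one flat list, exactly like the RDD version.
    public static List<String> splitWords(List<String> lines)
    {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        List<String> words = splitWords(Arrays.asList("Hello World", "Hello Dhruv", "Hello Jaipur"));
        System.out.println(words); // [Hello, World, Hello, Dhruv, Hello, Jaipur]
    }
}
```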
