Working on RDDs in Apache Spark shell

Dhruv Saksena
Nov 1, 2021

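The core data structure in Spark is the RDD (Resilient Distributed Dataset): an immutable collection of elements, split into partitions that can be processed in parallel.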

Let’s work through a few quick examples on RDDs:

1. Start Shell

To start the Spark shell, run the spark-shell command.

Whenever you open a Spark shell, a SparkContext is already created for you and is available as the variable “sc”.

 $ spark-shell

2. Filtering RDDs using Spark

First, we create an RDD from an array of city names, then we apply a filter to it. Note that at this stage nothing has executed on the cluster; Spark is only building the DAG for the execution. As soon as we call filteredRdd.collect, the job actually runs and we get back a filtered list containing Jaipur and Jabalpur.

scala> val stringRdd = sc.parallelize(Array("Delhi","Jaipur","Ahmedabad","Calcutta","Jabalpur","Indore","Mussorie"))
stringRdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val filteredRdd = stringRdd.filter(value => value.startsWith("Ja"))
filteredRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at filter at <console>:23

scala> val valueList = filteredRdd.collect
valueList: Array[String] = Array(Jaipur, Jabalpur)

scala> valueList
res0: Array[String] = Array(Jaipur, Jabalpur)

As the example above shows, every RDD gets a numeric id: stringRdd is the first RDD, shown as ParallelCollectionRDD[0], and filteredRdd is the second, shown as MapPartitionsRDD[1].
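The lazy behaviour described in this section can be observed directly. The following is a minimal sketch, assuming it is pasted into spark-shell (so sc already exists); the names filterCalls, cities and jaCities are illustrative and not part of the original example. It counts how often the filter predicate runs, using an accumulator.

```scala
// Assumes spark-shell, where `sc` (the SparkContext) is already defined.
// `filterCalls`, `cities` and `jaCities` are illustrative names.
val filterCalls = sc.longAccumulator("filterCalls")
val cities = sc.parallelize(Seq("Delhi", "Jaipur", "Jabalpur"))

// Transformation only: Spark records the step but does not run the predicate yet.
val jaCities = cities.filter { city =>
  filterCalls.add(1)
  city.startsWith("Ja")
}
val callsBeforeAction = filterCalls.value.longValue // 0: nothing has executed

// Action: triggers the job, so the predicate now runs once per element.
val collected = jaCities.collect()
val callsAfterAction = filterCalls.value.longValue // 3 here, barring task retries
```

The counter stays at zero until collect() is called, which is the point at which the DAG is actually executed.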

3. reduceByKey() operation

reduceByKey() is a transformation and can only be applied to a pair RDD, that is, an RDD whose elements are key-value pairs. It takes an associative and commutative reduce function and applies it to the values that share the same key, reducing them to a single value per key:

scala> val pairArray = sc.parallelize(Array(("a",1),("b",1),("a",2),("c",1),("b",5)))
pairArray: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala> val reducedValue = pairArray.reduceByKey((accum, value) => accum + value)
reducedValue: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:23

scala> reducedValue.collect
res1: Array[(String, Int)] = Array((a,3), (b,6), (c,1))
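To make the semantics of reduceByKey() concrete, here is a small sketch that computes the same result twice: once with Spark and once with plain Scala collections on the driver. It assumes spark-shell (sc is predefined); the names pairs, viaSpark and viaLocal are illustrative. The local version only mirrors the result, not how Spark distributes the work.

```scala
// Assumes spark-shell, where `sc` is already defined. Names are illustrative.
val pairs = Seq(("a", 1), ("b", 1), ("a", 2), ("c", 1), ("b", 5))

// Spark: values with the same key are merged pairwise with the given function.
val viaSpark = sc.parallelize(pairs).reduceByKey(_ + _).collect().toMap

// Plain Scala equivalent: group by key, then reduce the values of each group.
val viaLocal = pairs.groupBy(_._1).map { case (key, kvs) => key -> kvs.map(_._2).reduce(_ + _) }

// Both are Map(a -> 3, b -> 6, c -> 1)
val sameResult = viaSpark == viaLocal
```

Converting to a Map before comparing avoids depending on the order in which collect() returns the keys.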

4. Convert list to a map

Let’s reuse the stringRdd we created in the example above and map each city to a (city, 1) pair. Strictly speaking the result is a pair RDD rather than a Scala Map:

scala> val stringMap = stringRdd.map(value => (value, 1))
stringMap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:23

scala> stringMap.collect
res2: Array[(String, Int)] = Array((Delhi,1), (Jaipur,1), (Ahmedabad,1), (Calcutta,1), (Jabalpur,1), (Indore,1), (Mussorie,1))
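Pairing each element with 1 is mostly useful as a first step before reduceByKey(). As a hedged illustration, the sketch below counts the cities by their first letter. It assumes spark-shell (sc is predefined), and cityRdd, initialPairs and countsByInitial are illustrative names, not part of the original example.

```scala
// Assumes spark-shell, where `sc` is already defined. Names are illustrative.
val cityRdd = sc.parallelize(Seq("Delhi", "Jaipur", "Ahmedabad", "Calcutta", "Jabalpur", "Indore", "Mussorie"))

// Step 1: map every city to a (firstLetter, 1) pair.
val initialPairs = cityRdd.map(city => (city.head, 1))

// Step 2: add up the 1s per key, then bring the small result to the driver.
val countsByInitial = initialPairs.reduceByKey(_ + _).collect().toMap
// 'J' -> 2 (Jaipur, Jabalpur); every other initial -> 1
```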

5. Filter operations

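Let’s say we want to keep only the even numbers. (The RDD ids and res counters in this transcript start again from 0, which suggests it was captured in a fresh shell session.)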

scala> val numbersRdd = sc.parallelize(Array(1,4,2,5,6,2,4,8,9))
numbersRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val evenNumbersRDD = numbersRdd.filter(num => num % 2 == 0)
evenNumbersRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:23

scala> evenNumbersRDD.collect
res0: Array[Int] = Array(4, 2, 6, 2, 4, 8)

6. SUM operations

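sum is an action that adds up all elements of a numeric RDD. Note that it returns a Double even though the elements are Int:

scala> evenNumbersRDD.sum
res1: Double = 26.0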
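If an Int total is preferred over the Double that sum returns, reduce() is one alternative. This is a small sketch assuming spark-shell (sc is predefined); evens, totalAsInt and totalAsDouble are illustrative names.

```scala
// Assumes spark-shell, where `sc` is already defined. Names are illustrative.
val evens = sc.parallelize(Seq(4, 2, 6, 2, 4, 8))

// reduce keeps the element type, so the result is an Int.
val totalAsInt: Int = evens.reduce(_ + _) // 26

// sum() converts the elements to Double first.
val totalAsDouble: Double = evens.sum() // 26.0
```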

7. Check hierarchy of RDDs

Let’s say we want to check the lineage of an RDD:

scala> evenNumbersRDD.toDebugString
res2: String =
(8) MapPartitionsRDD[1] at filter at <console>:23 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:23 []

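Reading the output from the bottom up, a parallelized collection was created first and then a filter was applied to it. The (8) at the start is the number of partitions. toDebugString gives you the entire dependency tree of an RDD.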

8. Check for Spark Context

To inspect the SparkContext:

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@41a7445f

9. Check version of Spark context

scala> sc.version
res0: String = 3.2.0

10. Default parallelism in Spark

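To check the default level of parallelism, that is, the number of partitions Spark uses when you don't specify one (and so how many tasks can run in parallel on that data):

scala> sc.defaultParallelism
res1: Int = 8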

11. Customising parallelism

To customize the number of partitions, pass it as the second argument (numSlices) to the parallelize() call:

val sampleRDD = sc.parallelize(1 to 10000, 5)

Here the data is split into 5 partitions, so Spark can process it with up to 5 tasks in parallel, provided enough cores are available.
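The effect of the second argument can be checked from the shell. The sketch below assumes spark-shell (sc is predefined); sampleRdd, partitionCount and partitionSizes are illustrative names. It uses getNumPartitions and glom(), which turns each partition into an array so its size can be inspected.

```scala
// Assumes spark-shell, where `sc` is already defined. Names are illustrative.
val sampleRdd = sc.parallelize(1 to 10000, 5)

// Number of partitions the RDD was split into.
val partitionCount = sampleRdd.getNumPartitions // 5

// glom() groups the elements of each partition into one array;
// mapping to length gives the number of elements per partition.
val partitionSizes = sampleRdd.glom().map(_.length).collect() // Array(2000, 2000, 2000, 2000, 2000)
```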

12. collect() action

Whenever we execute the collect() action on an RDD, it brings all the data in that RDD from all the nodes to the driver node. Use it only when the result is small enough to fit in the driver's memory.
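When only a peek at the data is needed, actions such as take() and count() avoid pulling the whole RDD to the driver. A minimal sketch, assuming spark-shell (sc is predefined); bigRdd, firstThree and totalCount are illustrative names.

```scala
// Assumes spark-shell, where `sc` is already defined. Names are illustrative.
val bigRdd = sc.parallelize(1 to 10000, 5)

// take(n) returns only the first n elements to the driver.
val firstThree = bigRdd.take(3) // Array(1, 2, 3)

// count() returns just the number of elements, not the elements themselves.
val totalCount = bigRdd.count() // 10000
```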
