Working with RDDs in the Apache Spark Shell
The core data structure in Spark is the RDD (Resilient Distributed Dataset).
Let's work through a few quick examples on RDDs-
1. Start the Spark shell
To start the Spark shell, run the spark-shell command.
Whenever you open a Spark shell, you get access to a SparkContext through the variable "sc".
$ spark-shell
2. Filtering RDDs using Spark
First, we create an array of city-name strings and then apply a filter on it. Note that at this stage no execution has happened on the cluster; Spark is just building the DAG for the execution. As soon as we call filteredRdd.collect, the job actually runs and we get back the filtered list containing Jaipur and Jabalpur.
scala> var stringRdd = sc.parallelize(Array("Delhi","Jaipur","Ahmedabad","Calcutta","Jabalpur","Indore","Mussorie"))
stringRdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> var filteredRdd = stringRdd.filter(value => value.startsWith("Ja"))
filteredRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at filter at <console>:23

scala> var valueList = filteredRdd.collect
valueList: Array[String] = Array(Jaipur, Jabalpur)

scala> valueList
res0: Array[String] = Array(Jaipur, Jabalpur)
As you can see in the example above, every RDD has a number: stringRdd is the first RDD, denoted by ParallelCollectionRDD[0], and filteredRdd is the second, denoted by MapPartitionsRDD[1].
3. reduceByKey() operation
reduceByKey is a transformation and can only be applied to a pair RDD, i.e. an RDD whose elements are key-value pairs. It takes an associative reduce function and applies it to the values that share the same key, thereby reducing by key-
scala> var pairArray = sc.parallelize(Array(("a",1),("b",1),("a",2),("c",1),("b",5)))
pairArray: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala> var reducedValue = pairArray.reduceByKey((accum, value) => (accum + value))
reducedValue: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:23

scala> reducedValue.collect
res1: Array[(String, Int)] = Array((a,3), (b,6), (c,1))
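The reduce function only needs to be associative, so the same pattern works for other aggregations too. As a quick illustration (not part of the original session), we could reuse pairArray to keep the maximum value per key instead of the sum:

// Illustrative sketch: reduceByKey with a different associative function,
// keeping the maximum value per key instead of summing.
val maxPerKey = pairArray.reduceByKey((accum, value) => math.max(accum, value))
maxPerKey.collect   // expected: Array((a,2), (b,5), (c,1)), order may vary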
4. Convert values into key-value pairs using map()
Let's reuse the stringRdd we created in the example above and use map() to turn each city name into a (city, 1) key-value pair-
scala> var stringMap = stringRdd.map(value => (value,1))
stringMap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:23

scala> stringMap.collect
res2: Array[(String, Int)] = Array((Delhi,1), (Jaipur,1), (Ahmedabad,1), (Calcutta,1), (Jabalpur,1), (Indore,1), (Mussorie,1))
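To see why mapping each value to (value, 1) is useful, here is a small sketch that combines this map() with the reduceByKey() from the previous step to count occurrences. The citiesRdd below is a hypothetical RDD with duplicate entries, not one created earlier in this walkthrough:

// Illustrative word-count style example; citiesRdd is hypothetical.
val citiesRdd = sc.parallelize(Array("Delhi", "Jaipur", "Delhi", "Indore", "Jaipur", "Delhi"))
val cityCounts = citiesRdd.map(city => (city, 1)).reduceByKey(_ + _)
cityCounts.collect   // expected: Array((Delhi,3), (Jaipur,2), (Indore,1)), order may vary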
5. Filter operations
Let’s say we want to keep only the numbers which are even-
scala> var numbersRdd = sc.parallelize(Array(1,4,2,5,6,2,4,8,9))
numbersRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> var evenNumbersRDD = numbersRdd.filter(num => num % 2 == 0)
evenNumbersRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:23

scala> evenNumbersRDD.collect
res0: Array[Int] = Array(4, 2, 6, 2, 4, 8)
6. sum() operation
scala> evenNumbersRDD.sum
res1: Double = 26.0
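A few other numeric actions can be called on the same RDD in exactly the same way; the lines below are an illustrative sketch rather than part of the original session:

// Illustrative sketch: other actions on the same numeric RDD.
evenNumbersRDD.count   // expected: 6 (number of elements)
evenNumbersRDD.max     // expected: 8
evenNumbersRDD.min     // expected: 2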
7. Check hierarchy of RDDs
Let’s say we want to check the lineage of an RDD-
scala> evenNumbersRDD.toDebugString
res2: String =
(8) MapPartitionsRDD[1] at filter at <console>:23 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:23 []
From this we can see that first a parallelized collection was created and then a filter was applied on it. toDebugString gives you the entire dependency tree for an RDD.
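As a further illustration (not from the original session), chaining one more transformation adds another level to the lineage:

// Illustrative sketch: each extra transformation adds a parent to the lineage.
val doubledEvens = evenNumbersRDD.map(num => num * 2)
doubledEvens.toDebugString
// Expected to show a MapPartitionsRDD (map) on top of the MapPartitionsRDD (filter),
// which in turn depends on the original ParallelCollectionRDD.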
8. Check for Spark Context
To check for the SparkContext-
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@41a7445f
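A few other properties of the SparkContext can be inspected the same way; the values shown in the comments below are only examples of what you might see in a local shell:

// Illustrative sketch: other SparkContext properties.
sc.appName   // e.g. "Spark shell"
sc.master    // e.g. "local[*]" when the shell runs in local mode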
9. Check the Spark version
scala> sc.version
res0: String = 3.2.0
10. Default parallelism in Spark
To check the default number of tasks Spark can run in parallel (which is also the default number of partitions when none is specified)-
scala> sc.defaultParallelism
res1: Int = 8
11. Customising parallelism
To customize the number of partitions (and hence the number of parallel tasks), simply pass the desired value as the second argument to the parallelize() method call-
val sampleRDD = sc.parallelize(1 to 10000, 5)
Here, the RDD is split into 5 partitions, so Spark can run up to 5 tasks in parallel on it.
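To verify this, the number of partitions of an RDD can be checked with getNumPartitions; the sketch below is illustrative, and the fallback value depends on your setup:

// Illustrative check of partition counts.
sampleRDD.getNumPartitions                    // expected: 5
sc.parallelize(1 to 10000).getNumPartitions   // falls back to sc.defaultParallelism (8 in the session above)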
12. collect() action
Whenever we execute the .collect() action on an RDD, it brings all the data in that RDD from all the worker nodes back to the driver, so it should be used with care on large datasets.
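On large datasets it is usually safer to fetch only a sample or an aggregate instead of everything. The sketch below is illustrative and not part of the original walkthrough:

// Illustrative sketch: prefer a small sample or an aggregate over collect on big RDDs.
val bigRdd = sc.parallelize(1 to 1000000)
bigRdd.take(5)   // brings only the first 5 elements to the driver: Array(1, 2, 3, 4, 5)
bigRdd.count     // runs on the cluster and returns just one number to the driver
// bigRdd.collect would pull all one million elements into driver memory at once.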