Spark cache and persist
In Spark, you often need the same dataset many times during execution. Fetching it from the source on every use is time consuming and inefficient. To avoid this, Spark provides a caching mechanism: you store the data in cache once and read it back much faster on subsequent uses.
In Spark, whenever an action is called, the entire DAG leading up to it is re-executed, which is both time consuming and inefficient. Caching removes this inefficiency by reusing the cached RDD instead of recomputing it from scratch every time.
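As a minimal sketch of the difference (the class name, RDD contents, and variable names below are purely illustrative; it assumes a local Spark application and Java 8 lambdas):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RecomputeVsCache {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Recompute vs cache").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // An RDD with a transformation in its lineage.
        JavaRDD<Integer> squares = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
                .map(x -> x * x);

        // Without cache(): both actions re-run the whole DAG (parallelize + map).
        squares.count();
        squares.collect();

        // With cache(): the first action materializes the partitions in memory,
        // the second action reads them from the cache instead of recomputing.
        squares.cache();
        squares.count();    // computes and caches
        squares.collect();  // served from the cache

        jsc.stop();
    }
}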
There are two ways to cache an RDD:
- cache(): persists the data in memory in deserialized form. The cache is local to the node that processed each partition of the RDD; if that node is lost, the next action recomputes those partitions instead of reading them from the cache. Because the data is kept deserialized, memory consumption is on the higher side.
- persist(): overcomes the limitations of cache() by letting you choose the kind of storage you want, and it also supports replication of the cached data.
persist() supports the following storage levels:
- MEMORY_ONLY: stored in RAM as deserialized objects; memory consumption is high and there is no replication. This is the level cache() uses.
- MEMORY_ONLY_2: same as MEMORY_ONLY, but the cached data is replicated on two nodes.
- MEMORY_ONLY_SER: the serialized version of MEMORY_ONLY; it consumes less memory than MEMORY_ONLY.
- MEMORY_ONLY_SER_2: same as MEMORY_ONLY_SER, with replication as well.
- DISK_ONLY: the data is stored on disk rather than in RAM. Data persisted to disk is always serialized.
- DISK_ONLY_2: same as DISK_ONLY, with replication as well.
There are also combined MEMORY_AND_DISK levels, again with serialized and replicated variants. This range of options lets developers choose a level based on how critical the data is and how expensive it is to recompute.
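As a short sketch of how these levels are selected from Java (the StorageLevel constants are Spark's own API; the class name and RDD contents are placeholders):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class StorageLevelExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Storage levels").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.parallelize(Arrays.asList("a", "b", "c"));

        // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY()).
        lines.persist(StorageLevel.MEMORY_ONLY());

        // An RDD's storage level cannot be changed while it is persisted,
        // so unpersist first before assigning a different level.
        lines.unpersist();

        // Serialized in memory, spilling to disk when memory runs short,
        // and replicated on a second node.
        lines.persist(StorageLevel.MEMORY_AND_DISK_SER_2());
        lines.count();  // the first action materializes the cache at this level

        jsc.stop();
    }
}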
Spark uses an LRU (least recently used) eviction policy to manage the cached data in memory: when memory fills up, the partitions used least recently are dropped first. The following example brings cache(), persist(), and unpersist() together:
package com.example.client;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

public class SparkCacheExample {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("Caching example");
        sparkConf.setMaster("local");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        // cache() keeps the partitions of this RDD in memory (MEMORY_ONLY).
        JavaRDD<Integer> rdd = javaSparkContext
                .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3)
                .cache();

        // Derive a new RDD containing only the even numbers.
        JavaRDD<Integer> evenRDD = rdd.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer v1) throws Exception {
                return v1 % 2 == 0;
            }
        });

        // persist() with an explicit storage level: memory first, spilling to disk if needed.
        evenRDD.persist(StorageLevel.MEMORY_AND_DISK());

        evenRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer t) throws Exception {
                System.out.println("Value is :: " + t);
            }
        });

        // Release the cached data once it is no longer needed.
        evenRDD.unpersist();
        rdd.unpersist();

        javaSparkContext.stop();
    }
}
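Running this locally prints the even values 2, 4 and 6 (the order may vary across partitions). The unpersist() calls at the end remove the cached blocks from memory and disk, freeing that space for other data.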