Wednesday, August 16, 2017

Spark Programming

What is an RDD:
The main abstraction Spark provides is a Resilient Distributed Dataset (RDD): a fault-tolerant collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

RDD Types:

Parallelized collections, which take an existing Scala collection and run functions on it in parallel.
Parallelized collections are created by calling SparkContext’s parallelize method on an existing Scala collection (a Seq object). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
One important parameter for parallel collections is the number of slices to cut the dataset into. Spark will run one task for each slice of the cluster. Typically, you want 2-4 slices for each CPU in your cluster. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
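For example, a minimal sketch in the Spark shell (which provides a SparkContext named sc):

val data = Array(1, 2, 3, 4, 5)
// Copy the local collection into a distributed dataset cut into 10 slices
val distData = sc.parallelize(data, 10)
// The RDD can now be operated on in parallel, e.g. summing its elements
val sum = distData.reduce(_ + _)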

Hadoop datasets, which run functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop.
Spark can create distributed datasets from any file stored in the Hadoop distributed file system (HDFS) or other storage systems supported by Hadoop (including your local file system, Amazon S3, Hypertable, HBase). Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source.
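As a sketch (again assuming a SparkContext named sc; the HDFS paths are hypothetical):

// Read a text file, one record per line
val lines = sc.textFile("hdfs://namenode:9000/data/input.txt")
// Read a SequenceFile of (Text, IntWritable) pairs as (String, Int)
val counts = sc.sequenceFile[String, Int]("hdfs://namenode:9000/data/counts.seq")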

RDD Operations
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

For example, map is a transformation that passes each dataset element through a function and returns a new distributed dataset representing the results. On the other hand, reduce is an action that aggregates all the elements of the dataset using some function and returns the final result to the driver program.
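A small illustration of both (assuming a SparkContext named sc and a hypothetical file data.txt):

val lines = sc.textFile("data.txt")
// map is a transformation: it defines a new RDD of line lengths
val lineLengths = lines.map(_.length)
// reduce is an action: it aggregates the lengths and returns the total to the driver
val totalLength = lineLengths.reduce(_ + _)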

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset. The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD is recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting datasets on disk.
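Continuing the sketch above, the mapped RDD can be cached before running more than one action on it:

lineLengths.persist()          // or lineLengths.cache() for the default memory-only level
lineLengths.reduce(_ + _)      // first action computes the RDD and caches its partitions
lineLengths.count()            // later actions reuse the cached partitions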

How to share state between nodes:
A second abstraction in Spark is shared variables, which can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, however, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables:

Broadcast variables, which can be used to cache a value in memory on all nodes.
Accumulators, which are variables that are only “added” to, such as counters and sums.
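A brief sketch of both, using the Spark 2.0 accumulator API (assuming a SparkContext named sc):

// Broadcast: ship a read-only value to each node once instead of with every task
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value                     // Array(1, 2, 3), readable on every executor

// Accumulator: tasks may only add to it; the driver reads the final value
val accum = sc.longAccumulator("my counter")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value                            // 10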

Dataset (New Abstraction of Spark)
For a long time, RDD was the standard abstraction of Spark. But from Spark 2.0, Dataset becomes the new abstraction layer. Though the RDD API remains available, it becomes a low-level API, used mostly for runtime and library development. All userland code will be written against the Dataset abstraction and its subset, the DataFrame API.

Dataset is a superset of the DataFrame API, which was released in Spark 1.3. Dataset, together with the DataFrame API, brings better performance and flexibility to the platform compared to the RDD API. Dataset will also replace RDD as the abstraction for streaming in future releases.
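A minimal sketch of the Dataset API (assuming a SparkSession named sparkSession, as created in the next section, and a hypothetical Sale case class):

case class Sale(id: Int, amount: Double)

import sparkSession.implicits._
// A strongly typed Dataset built from a local collection
val ds = Seq(Sale(1, 10.5), Sale(2, 42.0)).toDS()
// Operations are expressed against the Sale type and checked at compile time
ds.filter(_.amount > 20.0).show()
// A DataFrame is just the untyped view: Dataset[Row]
val df = ds.toDF()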

SparkSession (New entry point of Spark)
In earlier versions of Spark, SparkContext was the entry point. As RDD was the main API, it was created and manipulated using the context’s APIs. For every other API, we needed a different context: for streaming, StreamingContext; for SQL, SQLContext; and for Hive, HiveContext. But as the Dataset and DataFrame APIs become the new standard, we need an entry point built for them. So in Spark 2.0, we have a new entry point for the Dataset and DataFrame APIs called SparkSession.

SparkSession is essentially a combination of SQLContext, HiveContext and StreamingContext. All the APIs available on those contexts are available on SparkSession as well. Internally, a SparkSession holds a SparkContext for the actual computation.
Creating SparkSession
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
  .master("local")
  .appName("spark session example")
  .getOrCreate()
The above is similar to creating a SparkContext with master set to local and then creating a SQLContext that wraps it.
The SparkSession encapsulates the existing SparkContext, so existing functionality is not affected and developers may continue using the SparkContext where desired. However, the new SparkSession abstraction is preferred by the Spark community in Spark 2.0.0 and beyond.
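For example, both the underlying SparkContext and the SQL functionality that previously required a SQLContext or HiveContext are reachable through the session (a sketch, assuming the sparkSession created above):

// The session wraps a SparkContext that performs the actual computation
val sc = sparkSession.sparkContext
val rdd = sc.parallelize(1 to 100)

// SQL queries go directly through the session
sparkSession.sql("SELECT 1 AS id").show()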

Read data using Spark Session

The code below reads data from a CSV file using the SparkSession.

// Read the CSV file into a DataFrame, treating the first line as the column header
val df = sparkSession.read
  .option("header", "true")
  .csv("src/main/resources/sales.csv")