Fisseha Berhane, PhD

Data Scientist


Introduction to Spark programming

Part 1: Quick overview

Spark can be up to 100 times faster than Hadoop MapReduce for certain applications, and it is well suited to machine learning algorithms. You can read more about Spark on the Spark website; Wikipedia also has general information about it.

Here, I am using the Python programming interface to Spark (pySpark), which provides an easy-to-use programming abstraction and a parallel runtime.

Spark uses a SparkContext to create RDDs.

There are two types of RDD operations: transformations and actions. Transformations, e.g., map and filter, are not computed immediately (they are lazy). A transformed RDD is only computed when an action, such as collect or count, runs on it. We can also use cache to keep an RDD in memory and speed up repeated operations on it.
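
As a minimal sketch of this behavior (assuming a SparkContext named sc, as in the examples below; the variable names are just for illustration):

nums = sc.parallelize(xrange(10), 2)   # transformation: builds an RDD, no job runs yet
doubled = nums.map(lambda x: x * 2)    # map is also a transformation, so still lazy
doubled.cache()                        # mark the RDD for caching; still nothing runs
print doubled.count()                  # count is an action, so the job executes now (prints 10)
print doubled.collect()                # this action reuses the cached partitions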

Let's see a first example that adds the squared factorials of the first n non-negative integers. We'll use the xrange() function to generate the integers. xrange() only generates values as they are needed. This is different from the behavior of range(), which generates the complete list upon execution. Because of this, xrange() is more memory efficient than range(), especially for large ranges.

In [126]:
fish = xrange(101) # considering 0 to 100 inclusive
In [127]:
# Parallelize data using 8 partitions
# This operation is a transformation of data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point

factRDD = sc.parallelize(fish, 8)

Now, let us write a simple Python function that computes the factorial of a number and squares it.

In [128]:
import math
In [129]:
# create a function that takes a number and calculates its factorial and squares it.
def myfunc(x):
    return (math.factorial(x)**2)
In [130]:
# Transform factRDD through a map transformation using the myfunc function
# Because map is a transformation and Spark uses lazy evaluation, no jobs, stages,
# or tasks will be launched when we run this code.
factRDD1 = factRDD.map(myfunc)
In [131]:
# Obtain Python's add function
from operator import add
In [132]:
# Let's add and get the data
print factRDD1.reduce(add)
8710653556213906593217910546751236826770075534889462661969685875552902356740023437770774407535725726293284584188324726120379196905024071726217538933934155513097657108629996497860433734881084395513260664638775792257178708682117770296229335023994961148002046177649327941362944395975863609110502747608704759517069851818

Simple enough!


We can use the function filter() to get values that fulfill certain criteria.

In [133]:
fish = xrange(101) # considering 0 to 100 inclusive
In [134]:
# Parallelize data using 8 partitions
# This operation is a transformation of data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point

myRDD = sc.parallelize(fish, 8)
In [135]:
# Let's get numbers divisible by 7
myRDDdiv7 = myRDD.filter(lambda x:x%7==0)
In [136]:
# Let's collect the data
print myRDDdiv7.collect()
[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98]

Let's add the first 10,000 non-negative integers

In [137]:
fish = xrange(10001) # considering 0 to 10000 inclusive
In [138]:
# Parallelize data using 8 partitions
# This operation is a transformation of data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point

myRDD = sc.parallelize(fish, 8)
In [139]:
# add the numbers
print myRDD.reduce(add)
50005000

Part 2: Spark Context

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.

In part 1, we saw that normal python code can be executed via cells. When using Databricks Cloud this code gets executed in the Spark driver's Java Virtual Machine (JVM) and not in an executor's JVM, and when using an IPython notebook it is executed within the kernel associated with the notebook. Since no Spark functionality is actually being used, no tasks are launched on the executors.

In order to use Spark and its API we will need to use a SparkContext. When running Spark, you start a new Spark application by creating a SparkContext. When the SparkContext is created, it asks the master for some cores to use to do work. The master sets these cores aside just for you; they won't be used for other applications. When using Databricks Cloud or the virtual machine provisioned for this class, the SparkContext is created for you automatically as sc.
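
If you are running Spark outside such an environment (for example, in a standalone pySpark script), you create the SparkContext yourself. A minimal sketch, where the application name and master URL are placeholders you would adjust:

from pyspark import SparkConf, SparkContext

# Hypothetical configuration: run locally using all available cores
conf = SparkConf().setAppName('MyFirstApp').setMaster('local[*]')
sc = SparkContext(conf=conf)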

(2a) Example Cluster

The diagram below shows an example cluster, where the cores allocated for an application are outlined in purple.

[Figure: executors]

At a high level, every Spark application consists of a driver program that launches various parallel operations on executor Java Virtual Machines (JVMs) running either in a cluster or locally on the same machine. In Databricks Cloud, "Databricks Shell" is the driver program. When running locally, "PySparkShell" is the driver program. In all cases, this driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations & actions) to those datasets.

Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. A Spark context object (sc) is the main entry point for Spark functionality. A Spark context can be used to create Resilient Distributed Datasets (RDDs) on a cluster.

Try printing out sc to see its type.

In [56]:
# Display the type of the Spark Context sc
type(sc)
Out[56]:
pyspark.context.SparkContext

(2b) SparkContext attributes

You can use Python's dir() function to get a list of all the attributes (including methods) accessible through the sc object.

In [2]:
# List sc's attributes
dir(sc)

(2c) Getting help

Alternatively, you can use Python's help() function to get an easier to read list of all the attributes, including examples, that the sc object has.

In [3]:
# Use help to obtain more detailed information
help(sc)
In [59]:
# After reading the help we've decided we want to use sc.version to see what version of Spark we are running
sc.version
Out[59]:
u'1.3.1'
In [60]:
# Help can be used on any Python object
help(map)
Help on built-in function map in module __builtin__:

map(...)
    map(function, sequence[, sequence, ...]) -> list
    
    Return a list of the results of applying the function to the items of
    the argument sequence(s).  If more than one sequence is given, the
    function is called with an argument list consisting of the corresponding
    item of each sequence, substituting None for missing values when not all
    sequences have the same length.  If the function is None, return a list of
    the items of the sequence (or a list of tuples if more than one sequence).

Part 3: Using RDDs and chaining together transformations and actions

Working with your first RDD

In Spark, we first create a base Resilient Distributed Dataset (RDD). We can then apply one or more transformations to that base RDD. An RDD is immutable, so once it is created, it cannot be changed. As a result, each transformation creates a new RDD. Finally, we can apply one or more actions to the RDDs. Note that Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.

We will perform several exercises to obtain a better understanding of RDDs:

  • Create a Python collection of 10,000 integers
  • Create a Spark base RDD from that collection
  • Subtract one from each value using map
  • Perform action collect to view results
  • Perform action count to view counts
  • Apply transformation filter and view results with collect
  • Learn about lambda functions
  • Explore how lazy evaluation works and the debugging challenges that it introduces

(3a) Create a Python collection of integers in the range of 1 .. 10000

We will use the xrange() function to generate the integers. xrange() only generates values as they are needed. This is different from the behavior of range(), which generates the complete list upon execution. Because of this, xrange() is more memory efficient than range(), especially for large ranges.

In [61]:
data = xrange(1, 10001)
In [62]:
type(data)
Out[62]:
xrange
In [63]:
# data supports indexing just like a normal Python list
# Obtain data's first element
data[0]
Out[63]:
1
In [64]:
data[100]
Out[64]:
101
In [65]:
# We can check the size of the list using the len() function
len(data)
Out[65]:
10000

(3b) Distributed data and using a collection to create an RDD

In Spark, datasets are represented as a list of entries, where the list is broken up into many different partitions that are each stored on a different machine. Each partition holds a unique subset of the entries in the list. Spark calls datasets that it stores "Resilient Distributed Datasets" (RDDs).

One of the defining features of Spark, compared to other data analytics frameworks (e.g., Hadoop), is that it stores data in memory rather than on disk. This allows Spark applications to run much more quickly, because they are not slowed down by needing to read data from disk.

The figure below illustrates how Spark breaks a list of data entries into partitions that are each stored in memory on a worker.

[Figure: partitions]

There are many different types of RDDs. The base class for RDDs is pyspark.RDD and other RDDs subclass pyspark.RDD. Since the other RDD types inherit from pyspark.RDD they have the same APIs and are functionally identical. We'll see that sc.parallelize() generates a pyspark.rdd.PipelinedRDD when its input is an xrange, and a pyspark.RDD when its input is a range.

After we generate RDDs, we can view them in the "Storage" tab of the web UI. You'll notice that new datasets are not listed until Spark needs to return a result due to an action being executed. This feature of Spark is called "lazy evaluation". This allows Spark to avoid performing unnecessary calculations.
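
To peek at how the entries are actually split across partitions, we can use the RDD's glom() method, which groups the elements of each partition into a list. A quick sketch, reusing the sc from above:

# Each inner list in the result is the contents of one partition
print sc.parallelize(xrange(1, 11), 4).glom().collect()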

In [66]:
# Parallelize data using 8 partitions
# This operation is a transformation of data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point
xrangeRDD = sc.parallelize(data, 8)
In [67]:
# Let's view help on parallelize
help(sc.parallelize)
Help on method parallelize in module pyspark.context:

parallelize(self, c, numSlices=None) method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using xrange
    is recommended if the input represents a range for performance.
    
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]

In [68]:
# Let's see what type sc.parallelize() returned
print 'type of xrangeRDD: {0}'.format(type(xrangeRDD))

# How about if we use a range
dataRange = range(1, 10001)
rangeRDD = sc.parallelize(dataRange, 8)
print 'type of dataRangeRDD: {0}'.format(type(rangeRDD))
type of xrangeRDD: <class 'pyspark.rdd.PipelinedRDD'>
type of dataRangeRDD: <class 'pyspark.rdd.RDD'>
In [69]:
# Each RDD gets a unique ID
print 'xrangeRDD id: {0}'.format(xrangeRDD.id())
print 'rangeRDD id: {0}'.format(rangeRDD.id())
xrangeRDD id: 76
rangeRDD id: 75
In [70]:
# We can name each newly created RDD using the setName() method
xrangeRDD.setName('My first RDD')
Out[70]:
My first RDD PythonRDD[76] at RDD at PythonRDD.scala:43
In [71]:
# Let's view the lineage (the set of transformations) of the RDD using toDebugString()
print xrangeRDD.toDebugString()
(8) My first RDD PythonRDD[76] at RDD at PythonRDD.scala:43 []
 |  ParallelCollectionRDD[74] at parallelize at PythonRDD.scala:392 []
In [4]:
# Let's use help to see what methods we can call on this RDD
help(xrangeRDD)
In [73]:
# Let's see how many partitions the RDD will be split into by using the getNumPartitions()
xrangeRDD.getNumPartitions()
Out[73]:
8

(3c) Subtract one from each value using map

So far, we've created a distributed dataset that is split into many partitions, where each partition is stored on a single machine in our cluster. Let's look at what happens when we do a basic operation on the dataset. Many useful data analysis operations can be specified as "do something to each item in the dataset". These data-parallel operations are convenient because each item in the dataset can be processed individually: the operation on one entry doesn't affect the operations on any of the other entries. Therefore, Spark can parallelize the operation.

map(f), the most common Spark transformation, is one such example: it applies a function f to each item in the dataset, and outputs the resulting dataset. When you run map() on a dataset, a single stage of tasks is launched. A stage is a group of tasks that all perform the same computation, but on different input data. One task is launched for each partition, as shown in the example below. A task is a unit of execution that runs on a single machine. When we run map(f) within a partition, a new task applies f to all of the entries in a particular partition, and outputs a new partition. In this example figure, the dataset is broken into four partitions, so four map() tasks are launched.

[Figure: tasks]

The figure below shows how this would work on the smaller data set from the earlier figures. Note that one task is launched for each partition.

[Figure]

When applying the map() transformation, each item in the parent RDD will map to one element in the new RDD. So, if the parent RDD has twenty elements, the new RDD will also have twenty items.

Now we will use map() to subtract one from each value in the base RDD we just created. First, we define a Python function called sub() that will subtract one from the input integer. Second, we will pass each item in the base RDD into a map() transformation that applies the sub() function to each element. And finally, we print out the RDD transformation hierarchy using toDebugString().

In [74]:
# Create sub function to subtract 1
def sub(value):
    """Subtracts one from `value`.

    Args:
        value (int): A number.

    Returns:
        int: `value` minus one.
    """
    return (value - 1)

# Transform xrangeRDD through map transformation using sub function
# Because map is a transformation and Spark uses lazy evaluation, no jobs, stages,
# or tasks will be launched when we run this code.
subRDD = xrangeRDD.map(sub)

# Let's see the RDD transformation hierarchy
print subRDD.toDebugString()
(8) PythonRDD[77] at RDD at PythonRDD.scala:43 []
 |  ParallelCollectionRDD[74] at parallelize at PythonRDD.scala:392 []

(3d) Perform action collect to view results

To see a list of elements decremented by one, we need to create a new list on the driver from the data distributed on the executor nodes. To do this we call the collect() method on our RDD. collect() is often used after a filter or other operation to ensure that we are only returning a small amount of data to the driver. This is done because the data returned to the driver must fit into the driver's available memory. If not, the driver will crash.

The collect() method is the first action operation that we have encountered. Action operations cause Spark to perform the (lazy) transformation operations that are required to compute the RDD returned by the action. In our example, this means that tasks will now be launched to perform the parallelize, map, and collect operations.

In this example, the dataset is broken into four partitions, so four collect() tasks are launched. Each task collects the entries in its partition and sends the result to the SparkContext, which creates a list of the values, as shown in the figure below.

[Figure: collect]

The above figures showed what would happen if we ran collect() on a small example dataset with just four partitions.

Now let's run collect() on subRDD.

In [5]:
# Let's collect the data
print subRDD.collect()

(3e) Perform action count to view counts

One of the most basic jobs we can run counts the number of elements in an RDD using the count() action. Since map() creates a new RDD with the same number of elements as the starting RDD, we expect that applying count() to each RDD will return the same result.

Note that because count() is an action operation, if we had not already performed an action with collect(), then Spark would now perform the transformation operations when we executed count().

Each task counts the entries in its partition and sends the result to your SparkContext, which adds up all of the counts. The figure below shows what would happen if we ran count() on a small example dataset with just four partitions.

[Figure: count]

In [76]:
print xrangeRDD.count()
print subRDD.count()
10000
10000

(3f) Apply transformation filter and view results with collect

Next, we'll create a new RDD that only contains the values less than ten by using the filter(f) data-parallel operation. The filter(f) method is a transformation operation that creates a new RDD from the input RDD by applying the filter function f to each item in the parent RDD and only passing those elements where the filter function returns True. Elements that do not return True will be dropped. Like map(), filter can be applied individually to each entry in the dataset, so it is easily parallelized using Spark.

The figure below shows how this would work on the small four-partition dataset.

[Figure: filter]

To filter this dataset, we'll define a function called ten(), which returns True if the input is less than 10 and False otherwise. This function will be passed to the filter() transformation as the filter function f.

To view the filtered list of elements less than ten, we need to create a new list on the driver from the distributed data on the executor nodes. We use the collect() method to return a list that contains all of the elements in this filtered RDD to the driver program.

In [77]:
# Define a function to filter a single value
def ten(value):
    """Return whether value is below ten.

    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than ten.
    """
    if (value < 10):
        return True
    else:
        return False
# The ten function could also be written concisely as: def ten(value): return value < 10

# Pass the function ten to the filter transformation
# Filter is a transformation so no tasks are run
filteredRDD = subRDD.filter(ten)

# View the results using collect()
# Collect is an action and triggers the filter transformation to run
print filteredRDD.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Part 4: Lambda Functions

(4a) Using Python lambda() functions

Python supports the use of small one-line anonymous functions that are not bound to a name at runtime. Borrowed from LISP, these lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression. Remember that lambda functions are a matter of style and using them is never required - semantically, they are just syntactic sugar for a normal function definition. You can always define a separate normal function instead, but using a lambda() function is an equivalent and more compact form of coding. Ideally you should consider using lambda functions where you want to encapsulate non-reusable code without littering your code with one-line functions.

Here, instead of defining a separate function for the filter() transformation, we will use an inline lambda() function.

In [78]:
lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()
Out[78]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [79]:
# Let's collect the even values less than 10
evenRDD = lambdaRDD.filter(lambda x: x % 2 == 0)
evenRDD.collect()
Out[79]:
[0, 2, 4, 6, 8]

Part 5: Additional RDD actions

(5a) Other common actions

Let's investigate the additional actions: first(), take(), top(), takeOrdered(), and reduce()

One useful thing to do when we have a new dataset is to look at the first few entries to obtain a rough idea of what information is available. In Spark, we can do that using the first(), take(), top(), and takeOrdered() actions. Note that for the first() and take() actions, the elements that are returned depend on how the RDD is partitioned.

Instead of using the collect() action, we can use the take(n) action to return the first n elements of the RDD. The first() action returns the first element of an RDD, and is equivalent to take(1).

The takeOrdered() action returns the first n elements of the RDD, using either their natural order or a custom comparator. The key advantage of using takeOrdered() instead of first() or take() is that takeOrdered() returns a deterministic result, while the other two actions may return differing results, depending on the number of partitions or the execution environment. takeOrdered() returns the list sorted in ascending order. The top() action is similar to takeOrdered() except that it returns the list in descending order.

The reduce() action reduces the elements of a RDD to a single value by applying a function that takes two parameters and returns a single value. The function should be commutative and associative, as reduce() is applied at the partition level and then again to aggregate results from partitions. If these rules don't hold, the results from reduce() will be inconsistent. Reducing locally at partitions makes reduce() very efficient.

In [80]:
# Let's get the first element
print filteredRDD.first()
# The first 4
print filteredRDD.take(4)
# Note that it is ok to take more elements than the RDD has
print filteredRDD.take(12)
0
[0, 1, 2, 3]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [81]:
# Retrieve the three smallest elements
print filteredRDD.takeOrdered(3)
# Retrieve the five largest elements
print filteredRDD.top(5)
[0, 1, 2]
[9, 8, 7, 6, 5]
In [82]:
# Pass a lambda function to takeOrdered to reverse the order
filteredRDD.takeOrdered(4, lambda s: -s)
Out[82]:
[9, 8, 7, 6]
In [83]:
# Obtain Python's add function
from operator import add
# Efficiently sum the RDD using reduce
print filteredRDD.reduce(add)
# Sum using reduce with a lambda function
print filteredRDD.reduce(lambda a, b: a + b)
# Note that subtraction is neither associative nor commutative,
# so the result of reduce depends on how the data is partitioned
print filteredRDD.reduce(lambda a, b: a - b)
print filteredRDD.repartition(4).reduce(lambda a, b: a - b)
# Addition is both, so the result is the same regardless of partitioning
print filteredRDD.repartition(4).reduce(lambda a, b: a + b)
45
45
-45
21
45

(5b) Advanced actions

Here are two additional actions that are useful for retrieving information from an RDD: takeSample() and countByValue()

The takeSample() action returns an array with a random sample of elements from the dataset. It takes in a withReplacement argument, which specifies whether it is okay to randomly pick the same item multiple times from the parent RDD (so when withReplacement=True, you can get the same item back multiple times). It also takes an optional seed parameter that allows you to specify a seed value for the random number generator, so that reproducible results can be obtained.

The countByValue() action returns the count of each unique value in the RDD as a dictionary that maps values to counts.

In [84]:
# takeSample reusing elements
print filteredRDD.takeSample(withReplacement=True, num=6)
# takeSample without reuse
print filteredRDD.takeSample(withReplacement=False, num=6)
[2, 9, 7, 5, 9, 0]
[6, 9, 8, 2, 5, 4]
In [85]:
# Set seed for predictability
print filteredRDD.takeSample(withReplacement=False, num=6, seed=500)
# Try rerunning this cell and the cell above -- the results from this cell will remain constant
# Use ctrl-enter to run without moving to the next cell
[0, 2, 5, 3, 6, 9]
In [86]:
# Create new base RDD to show countByValue
repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])
print repetitiveRDD.countByValue()
defaultdict(<type 'int'>, {1: 4, 2: 4, 3: 5, 4: 2, 5: 1, 6: 1})

Part 6: Additional RDD transformations

(6a) flatMap

When performing a map() transformation using a function, sometimes the function will return more (or fewer) than one element. We would like the newly created RDD to consist of the elements output by the function. Simply applying a map() transformation would yield a new RDD made up of iterators. Each iterator could have zero or more elements. Instead, we often want an RDD consisting of the values contained in those iterators. The solution is to use a flatMap() transformation: flatMap() is similar to map(), except that with flatMap() each input item can be mapped to zero or more output elements.

To demonstrate flatMap(), we will first emit a word along with its plural, and then a range that grows in length with each subsequent operation.

In [87]:
# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))

# View the results
print singularAndPluralWordsRDDMap.collect()
print singularAndPluralWordsRDD.collect()
# View the number of elements in the RDD
print singularAndPluralWordsRDDMap.count()
print singularAndPluralWordsRDD.count()
[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
5
10
In [88]:
simpleRDD = sc.parallelize([2, 3, 4])
print simpleRDD.map(lambda x: range(1, x)).collect()
print simpleRDD.flatMap(lambda x: range(1, x)).collect()
[[1], [1, 2], [1, 2, 3]]
[1, 1, 2, 1, 2, 3]

(6b) groupByKey and reduceByKey

Let's investigate the additional transformations: groupByKey() and reduceByKey().

Both of these transformations operate on pair RDDs. A pair RDD is an RDD where each element is a pair tuple (key, value). For example, sc.parallelize([('a', 1), ('a', 2), ('b', 1)]) would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.

The reduceByKey() transformation gathers together pairs that have the same key and applies a function to two associated values at a time. reduceByKey() operates by applying the function first within each partition on a per-key basis and then across the partitions.

While both the groupByKey() and reduceByKey() transformations can often be used to solve the same problem and will produce the same answer, the reduceByKey() transformation works much better for large distributed datasets. This is because Spark knows it can combine output with a common key on each partition before shuffling (redistributing) the data across nodes. Only use groupByKey() if the operation would not benefit from reducing the data before the shuffle occurs.

Look at the diagram below to understand how reduceByKey works. Notice how pairs on the same machine with the same key are combined (by using the lambda function passed into reduceByKey) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result.

[Figure: reduceByKey()]

On the other hand, when using the groupByKey() transformation, all the key-value pairs are shuffled around, causing a lot of unnecessary data to be transferred over the network.

To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time, so if a single key has more key-value pairs than can fit in memory an out of memory exception occurs. This will be more gracefully handled in a later release of Spark so that the job can still proceed, but should still be avoided. When Spark needs to spill to disk, performance is severely impacted.

[Figure: groupByKey()]

As your dataset grows, the difference in the amount of data that needs to be shuffled, between the reduceByKey() and groupByKey() transformations, becomes increasingly exaggerated.

Here are more transformations to prefer over groupByKey():

  • combineByKey() can be used when you are combining elements but your return type differs from your input value type.
  • foldByKey() merges the values for each key using an associative function and a neutral "zero value" (a short sketch using both appears after the example below).

Now let's go through a simple groupByKey() and reduceByKey() example.

In [89]:
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
# mapValues only used to improve format for printing
print pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()

# Different ways to sum by key
print pairRDD.groupByKey().map(lambda (k, v): (k, sum(v))).collect()
# Using mapValues, which is recommended when the key doesn't change
print pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()
# reduceByKey is more efficient / scalable
print pairRDD.reduceByKey(add).collect()
[('a', [1, 2]), ('b', [1])]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]
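
For comparison, here is a small sketch of how the same per-key sum could be written with the combineByKey() and foldByKey() transformations mentioned above, reusing the pairRDD and add from this example:

# combineByKey: turn the first value seen for a key into a combiner, merge further
# values into that combiner, and finally merge combiners across partitions
print pairRDD.combineByKey(lambda v: v,
                           lambda acc, v: acc + v,
                           lambda acc1, acc2: acc1 + acc2).collect()
# foldByKey: like reduceByKey, but with an explicit neutral "zero value" (0 for addition)
print pairRDD.foldByKey(0, add).collect()
# Both should produce the same per-key sums as reduceByKey above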

(6c) Advanced transformations [Optional]

Let's investigate the advanced transformations: mapPartitions() and mapPartitionsWithIndex()

The mapPartitions() transformation uses a function that takes in an iterator (to the items in that specific partition) and returns an iterator. The function is applied on a partition by partition basis.

The mapPartitionsWithIndex() transformation uses a function that takes in a partition index (think of this like the partition number) and an iterator (to the items in that specific partition). For every partition (index, iterator) pair, the function returns a tuple of the same partition index number and an iterator of the transformed items in that partition.

In [90]:
# mapPartitions takes a function that takes an iterator and returns an iterator
print wordsRDD.collect()
itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])
print itemsRDD.collect()
['cat', 'elephant', 'rat', 'rat', 'cat']
['cat', 'elephant', 'rat', 'rat,cat']
In [91]:
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))])
# We can see that three of the partitions (workers) have one element and the fourth has two
# elements, although things may not bode well for the rat...
print itemsByPartRDD.collect()
# Rerun without returning a list (acts more like flatMap)
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))
print itemsByPartRDD.collect()
[(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]
[0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]

Part 7: Caching RDDs and storage options

(7a) Caching RDDs

For efficiency Spark keeps your RDDs in memory. By keeping the contents in memory, Spark can quickly access the data. However, memory is limited, so if you try to keep too many RDDs in memory, Spark will automatically delete RDDs from memory to make space for new RDDs. If you later refer to one of the RDDs, Spark will automatically recreate the RDD for you, but that takes time.

So, if you plan to use an RDD more than once, then you should tell Spark to cache that RDD. You can use the cache() operation to keep the RDD in memory. However, if you cache too many RDDs and Spark runs out of memory, it will delete the least recently used (LRU) RDD first. Again, the RDD will be automatically recreated when accessed.

You can check if an RDD is cached by using the is_cached attribute, and you can see your cached RDD in the "Storage" section of the Spark web UI. If you click on the RDD's name, you can see more information about where the RDD is stored.

In [92]:
# Name the RDD
filteredRDD.setName('My Filtered RDD')
# Cache the RDD
filteredRDD.cache()
# Is it cached
print filteredRDD.is_cached
True

(7b) Unpersist and storage options

Spark automatically manages the RDDs cached in memory and will save them to disk if it runs out of memory. Once you are finished using an RDD, you can optionally call the RDD's unpersist() method to tell Spark that you no longer need it kept in memory.

You can see the set of transformations that were applied to create an RDD by using the toDebugString() method, which will provide storage information, and you can directly query the current storage information for an RDD using the getStorageLevel() operation.

Advanced: Spark provides many more options for managing how RDDs are stored in memory or even saved to disk. You can explore the API for RDD's persist() operation using Python's help() command. The persist() operation, optionally, takes a pySpark StorageLevel object.

In [93]:
# Note that toDebugString also provides storage information
print filteredRDD.toDebugString()
(8) My Filtered RDD PythonRDD[80] at collect at <ipython-input-77-2e6525e1a0c2>:23 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[74] at parallelize at PythonRDD.scala:392 [Memory Serialized 1x Replicated]
In [94]:
# If we are done with the RDD we can unpersist it so that its memory can be reclaimed
filteredRDD.unpersist()
# Storage level for a non cached RDD
print filteredRDD.getStorageLevel()
filteredRDD.cache()
# Storage level for a cached RDD
print filteredRDD.getStorageLevel()
Serialized 1x Replicated
Memory Serialized 1x Replicated
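
As a small sketch of the persist() operation with an explicit storage level, reusing filteredRDD (the level chosen here is just an example):

from pyspark import StorageLevel

# Switch from the default cache() level to memory-and-disk storage
filteredRDD.unpersist()
filteredRDD.persist(StorageLevel.MEMORY_AND_DISK)
print filteredRDD.getStorageLevel()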

Part 8: Debugging Spark applications and lazy evaluation

How Python is Executed in Spark

Internally, Spark executes using a Java Virtual Machine (JVM). pySpark runs Python code in a JVM using Py4J. Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine. Methods are called as if the Java objects resided in the Python interpreter and Java collections can be accessed through standard Python collection methods. Py4J also enables Java programs to call back Python objects.

Because pySpark uses Py4J, coding errors often result in a complicated, confusing stack trace that can be difficult to understand. In the following section, we'll explore how to understand stack traces.

(8a) Challenges with lazy evaluation using transformations and actions

Spark's use of lazy evaluation can make debugging more difficult because code is not always executed immediately. To see an example of how this can happen, let's first define a broken filter function.

Next we perform a filter() operation using the broken filtering function. No error will occur at this point due to Spark's use of lazy evaluation.

The filter() method will not be executed until an action operation is invoked on the RDD. We will perform an action by using the collect() method to return a list that contains all of the elements in this RDD.

In [95]:
def brokenTen(value):
    """Incorrect implementation of the ten function.

    Note:
        The `if` statement checks an undefined variable `val` instead of `value`.

    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than ten.

    Raises:
        NameError: The function references `val`, which is not available in the local or global
            namespace, so a `NameError` is raised.
    """
    if (val < 10):
        return True
    else:
        return False

brokenRDD = subRDD.filter(brokenTen)
In [6]:
# Now we'll see the error
brokenRDD.collect()

(8b) Finding the bug

When the filter() method is executed, Spark evaluates the RDD by executing the parallelize() and filter() methods. Since our filter() method has an error in the filtering function brokenTen(), an error occurs.

Scroll through the output "Py4JJavaError Traceback (most recent call last)" part of the cell and first you will see that the line that generated the error is the collect() method line. There is nothing wrong with this line. However, it is an action and that caused other methods to be executed. Continue scrolling through the Traceback and you will see the following error line:

NameError: global name 'val' is not defined

Looking at this error line, we can see that we used the wrong variable name in our filtering function brokenTen().

(8c) Moving toward expert style

As you are learning Spark, I recommend that you write your code in the form:

RDD.transformation1()
RDD.action1()
RDD.transformation2()
RDD.action2()

Using this style will make debugging your code much easier as it makes errors easier to localize - errors in your transformations will occur when the next action is executed.

Once you become more experienced with Spark, you can write your code with the form:

RDD.transformation1().transformation2().action()

We can also use lambda() functions instead of separately defined functions when their use improves readability and conciseness.

In [97]:
# Cleaner code through lambda use
subRDD.filter(lambda x: x < 10).collect()
Out[97]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [98]:
# Even better by moving our chain of operators into a single line.
sc.parallelize(data).map(lambda y: y - 1).filter(lambda x: x < 10).collect()
Out[98]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

(8d) Readability and code style

To make the expert coding style more readable, enclose the statement in parentheses and put each method, transformation, or action on a separate line.

In [99]:
# Final version
(sc
 .parallelize(data)
 .map(lambda y: y - 1)
 .filter(lambda x: x < 10)
 .collect())
Out[99]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]