Fisseha Berhane, PhD

Data Scientist

443-970-2353 fisseha@jhu.edu CV Resume LinkedIn GitHub Twitter

Big Data Analytics with Spark - Part 1

In this big data era, Spark, a fast and general engine for large-scale data processing, is among the hottest big data tools. Spark is a cluster computing framework for scalable and efficient analysis of big data. Other data analysis tools such as R and Pandas run on a single machine, but with Spark we can use many machines that divide the tasks among themselves and perform fault-tolerant computations by distributing the data over a cluster.

Spark programs run up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. Among the capabilities that made Spark popular is its support for various programming languages through APIs: we can write Spark operations in Java, Scala, Python, or R. Spark runs on Hadoop, Mesos, standalone, or in the cloud, and it can access diverse data sources including HDFS, Cassandra, HBase, and S3.

Spark's components include Spark Core; Spark SQL; MLlib and ML for machine learning; and GraphX for graph analytics. In this blog post, we will focus on Spark SQL.

Here, I am using the flights data from the nycflights13 R package. I exported the data from R and uploaded it to Databricks, which offers a free Community Edition that you can use to learn Spark.


In this notebook, the outputs of the Spark actions are not included because they make the page slow to load, but if you want to see the results, I have them here. If you open that link with Chrome you can see the whole notebook; it may not render in Firefox or other browsers.

Read data

In order to use Spark and its DataFrame API, we need a SQLContext. When running Spark, we start a Spark application by creating a SparkContext; we can then create a SQLContext from the SparkContext.

In [5]:
spark_DF = (sqlContext.read.format("csv")
            .options(header='true', inferSchema='true')
            .load("FileStore/tables/jvo1773d1466128441021/flights.csv"))

Let's display the first 1000 rows.

In [7]:
display(spark_DF)

But how many rows do we have in the data frame?

In [9]:
print(spark_DF.count())

Before doing any analysis, let's see the schema. We can also see the type of the DataFrame.

In [11]:
spark_DF.printSchema()
In [12]:
print(type(spark_DF))
Spark Operations

Now, let's perform various Spark DataFrame operations.

select helps us to select one or more columns; "*" means all columns.

In [15]:
m=spark_DF.select("*")

display(m)
In [16]:
spark_DF.select('year','month','day').show(10)

drop helps us to drop column(s).

In [18]:
display(m.drop("month"))

We can also change the column names.

In [20]:
m=spark_DF.select(spark_DF.year.alias('Year'), spark_DF.month.alias('Month'), spark_DF.day.alias('Day'))
display(m)
In [21]:
from pyspark.sql.functions import length

m=spark_DF.select(spark_DF.origin,length(spark_DF.origin).alias('Length'))
display(m)

We can use filter or where to filter certain rows.

In [23]:
m=spark_DF.filter("month =1") # selecting January only
display(m)
In [24]:
m=spark_DF.filter(spark_DF.month > 9)
display(m)
In [25]:
m=spark_DF.filter(spark_DF.month < 9).filter('month>=5')
display(m)

distinct() filters out duplicate rows, and it considers all columns.

In [27]:
m=spark_DF.select('month').filter(spark_DF.month < 9).filter('month>=5').distinct()
display(m)

where() is an alias for filter().

In [29]:
m=spark_DF.where("month <= 4")
display(m)
In [30]:
m=spark_DF.where(spark_DF.month==12)
display(m)

We can also use user-defined functions (UDFs).

In [32]:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
even_months = udf(lambda s: s%2==0, BooleanType())  # select even months only
m = spark_DF.filter(even_months(spark_DF.month))
display(m.select('month').distinct())    # want to see the distinct months
In [33]:
selected_months = udf(lambda s: s in (6,7,8,12), BooleanType())  # select June-August and December
m = spark_DF.filter(selected_months(spark_DF.month))
display(m.select('month').distinct())
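The callables wrapped by udf() above are ordinary Python functions. As a quick sanity check (the function names here are my own, for illustration), applying the same predicates to plain month numbers 1-12 selects these months:

```python
# The same predicates as the UDFs above, applied to plain integers.
is_even_month = lambda m: m % 2 == 0
in_summer_or_december = lambda m: m in (6, 7, 8, 12)

print([m for m in range(1, 13) if is_even_month(m)])          # [2, 4, 6, 8, 10, 12]
print([m for m in range(1, 13) if in_summer_or_december(m)])  # [6, 7, 8, 12]
```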

groupBy() is one of the most powerful transformations. It allows you to perform aggregations on a DataFrame.

Unlike other DataFrame transformations, groupBy() does not return a DataFrame. Instead, it returns a special GroupedData object that contains various aggregation functions.

The most commonly used aggregation function is count(), but there are others (such as sum(), max(), and avg()).

These aggregation functions typically create a new column and return a new DataFrame.
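Conceptually, groupBy('carrier').avg('distance') computes a per-group mean. A plain-Python sketch of the same aggregation over a few made-up (carrier, distance) rows (illustrative data, not from the flights table):

```python
# Group rows by carrier and average the distances, the way groupBy().avg() does.
rows = [("UA", 1400.0), ("UA", 1600.0), ("AA", 1000.0)]

sums, counts = {}, {}
for carrier, distance in rows:
    sums[carrier] = sums.get(carrier, 0.0) + distance
    counts[carrier] = counts.get(carrier, 0) + 1

avg_distance = {c: sums[c] / counts[c] for c in sums}
print(avg_distance)  # {'UA': 1500.0, 'AA': 1000.0}
```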

In [35]:
m=spark_DF.groupBy(spark_DF.carrier).avg('distance')
display(m)

Above, we computed averages of groups using DataFrames. orderBy() allows you to sort a DataFrame by one or more columns, producing a new DataFrame.

In [37]:
m=spark_DF.groupBy(spark_DF.carrier).avg('distance').orderBy('carrier')
display(m)

In orderBy(), the default sort order is ascending, but we can change it to descending by setting ascending=False.

In [39]:
m=spark_DF.groupBy(spark_DF.carrier).avg('distance').orderBy('carrier',ascending=False)
display(m)
In [40]:
m=spark_DF.groupBy(spark_DF.carrier).avg('distance')
display(m.sort('carrier',ascending=False))

We can use the count() function to find the number of times each carrier occurs.

In [42]:
m=spark_DF.groupBy(spark_DF.carrier).count()
display(m)

We can also calculate the maximum, minimum, and sum of groups using DataFrames.

In [44]:
m=spark_DF.groupBy(spark_DF.carrier).max('distance','hour')
display(m)
In [45]:
m=spark_DF.groupBy(spark_DF.carrier).min('distance','hour')
display(m)
In [46]:
m=spark_DF.groupBy(spark_DF.carrier).sum('distance')
display(m)

SQL statements in Spark

In [48]:
# SQL statements can be run using the sql method provided by sqlContext.

# First, register this DataFrame as a temporary table.
spark_DF.registerTempTable("spark_DF")
In [49]:
m = sqlContext.sql("SELECT * FROM spark_DF WHERE month >= 5 AND month <= 9")
display(m)
In [51]:
m = sqlContext.sql("SELECT carrier, max(distance) AS Maximum_Distance FROM spark_DF GROUP BY carrier ORDER BY Maximum_Distance DESC")
display(m)
In [52]:
m = sqlContext.sql("SELECT DISTINCT(carrier) FROM spark_DF")
display(m)
In [53]:
m = sqlContext.sql("SELECT DISTINCT(month) FROM spark_DF")
display(m)
In [54]:
m = sqlContext.sql("SELECT DISTINCT month FROM spark_DF WHERE month > 3 AND month < 10")
display(m)
In [55]:
m = sqlContext.sql("SELECT DISTINCT month FROM spark_DF WHERE month BETWEEN 4 AND 9")
display(m)
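Note that BETWEEN is inclusive on both ends, so for integer months the query above matches exactly the same rows as the earlier "month > 3 AND month < 10" form. A small plain-Python check of that equivalence (the variable names are mine, for illustration):

```python
# BETWEEN 4 AND 9 (inclusive) vs. the strict-comparison form, over months 1-12.
months = list(range(1, 13))
via_between = [m for m in months if 4 <= m <= 9]
via_compare = [m for m in months if m > 3 and m < 10]
print(via_between == via_compare)  # True
```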
In [56]:
m = sqlContext.sql("SELECT carrier, max(distance) AS Maximum_Distance FROM spark_DF GROUP BY carrier ORDER BY Maximum_Distance DESC LIMIT 5")
display(m)
In [57]:
m = sqlContext.sql("SELECT carrier, max(distance) AS Maximum_Distance FROM spark_DF GROUP BY carrier HAVING Maximum_Distance > 2000 ORDER BY Maximum_Distance DESC")

display(m)
In [58]:
m = sqlContext.sql("SELECT DISTINCT month FROM spark_DF WHERE month !=2")
display(m)
In [59]:
m = sqlContext.sql("SELECT DISTINCT carrier FROM spark_DF WHERE month =1 OR month = 12")
display(m)
In [60]:
m = sqlContext.sql("SELECT DISTINCT carrier FROM spark_DF WHERE carrier LIKE ('A%')")
display(m)