Fisseha Berhane, PhD

Data Scientist




Big Data Analytics with Spark - Part 2

Most of the time, data analysis involves more than one table of data. Therefore, it is important to know techniques that enable us to combine data from various tables. In this blog post, let's see how we can work with joins in Spark. This is part two of my data analytics with Spark article series. You can get part one here.

Here, we will see inner, left outer, right outer, full outer, and other joins.

We will use the FDA adverse events data, which are publicly available. The FDA adverse events datasets can be downloaded in .csv format from the National Bureau of Economic Research. The datasets are released quarterly, and each quarter's data include demographic information, drug/biologic information, adverse events, outcomes and diagnoses, etc. I downloaded the data for the fourth quarter of 2015 and uploaded it to Databricks. Databricks has a free Community Edition, so you can use it to learn Spark.

In this notebook, the outputs from the Spark actions are not included because they make the page slow to load. If you want to see the code results, I have them here. If you open that page with Chrome, you can see the whole notebook; it may not display fully in Firefox or other browsers.

Read demography data
In [4]:
demography = sqlContext.read.format("csv").options(header='true', inferSchema='true').load("/FileStore/tables/lv21dnqe1467649455111/demo15q4.csv")

print('Demography data of the fourth quarter of 2015 has {0} observations'.format(demography.count()))
In [5]:
print("Let's see the schema of the demography data")

demography.printSchema()

We can also display some of the rows of the demography data.

In [7]:
display(demography)
Read indications data
In [9]:
indications = sqlContext.read.format("csv").options(header='true', inferSchema='true').load("/FileStore/tables/extjbvpg1467623985565/indi2015q4.csv")

print('Indications data of the fourth quarter of 2015 has {0} rows'.format(indications.count()))
In [10]:
print('This is the schema of the indications data')

indications.printSchema()

Let's display some of the indications

In [12]:
display(indications)
Read drugs data
In [14]:
drugs = sqlContext.read.format("csv").options(header='true', inferSchema='true').load("/FileStore/tables/lv21dnqe1467649455111/drug2015Q4.csv")

print('Drugs data of the fourth quarter of 2015 has {0} rows'.format(drugs.count()))
In [15]:
print('Below is the schema of the drugs data')

drugs.printSchema()

Now, let's display some observations from the drugs data to see some of its contents.

In [17]:
display(drugs)
Read reactions data
In [19]:
reactions = sqlContext.read.format("csv").options(header='true', inferSchema='true').load("/FileStore/tables/extjbvpg1467623985565/reac2015q4.csv")

print('Reactions data of the fourth quarter of 2015 has {0} observations'.format(reactions.count()))
In [20]:
print("Let's see the schema of the reactions data")
reactions.printSchema()

Displaying some observations from the reactions data.

In [22]:
display(reactions)
Read outcomes data
In [24]:
outcomes = sqlContext.read.format("csv").options(header='true', inferSchema='true').load("/FileStore/tables/extjbvpg1467623985565/outc2015q4.csv")

print('Outcomes data of the fourth quarter of 2015 has {0} observations'.format(outcomes.count()))
In [25]:
print('This is the schema of the outcomes data')
outcomes.printSchema()

JOINS

We will use the join method of PySpark DataFrames (pyspark.sql.DataFrame.join) to perform various joins. We will also use plain SQL queries to achieve the same tasks.

Inner Join

Let's start by joining the demography and drugs data. Here, we are interested in observations that are common to both tables. So, we are using inner join. An inner join requires each row in the two joined tables to have matching rows, and is a commonly used join operation in applications. Inner join creates a new result table by combining column values of two tables (A and B) based upon the join-predicate. The query compares each row of A with each row of B to find all pairs of rows which satisfy the join-predicate. When the join-predicate is satisfied by matching non-NULL values, column values for each matched pair of rows of A and B are combined into a result row.
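To make the row-by-row comparison described above concrete, here is a minimal plain-Python sketch of an inner join. It is not Spark code and not how Spark executes joins (Spark uses strategies such as broadcast hash and sort-merge joins); it only models the semantics. The toy rows and the helper name inner_join are hypothetical.

```python
# Hypothetical toy rows standing in for the demography and drugs tables.
demo = [
    {"primaryid": 1, "sex": "F"},
    {"primaryid": 2, "sex": "M"},
    {"primaryid": None, "sex": "F"},  # row with a missing (NULL) key
]
drugs = [
    {"primaryid": 1, "drugname": "drug_a"},
    {"primaryid": 1, "drugname": "drug_b"},
    {"primaryid": 3, "drugname": "drug_c"},
    {"primaryid": None, "drugname": "drug_d"},  # row with a missing (NULL) key
]


def inner_join(left, right, key):
    """Nested-loop inner join: compare every left row with every right row."""
    result = []
    for a in left:
        for b in right:
            # As in SQL, a NULL (None) key never satisfies the equality predicate.
            if a[key] is not None and a[key] == b[key]:
                result.append({**a, **b})  # combine the matched pair into one row
    return result


rows = inner_join(demo, drugs, "primaryid")
for row in rows:
    print(row)
```

Only primaryid 1 appears in both tables, so the result has two rows, one per matching drug; the rows with keys 2, 3 and None are dropped.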

The three statements below return the same rows; they are all inner joins between the demography and drugs data on the 'primaryid' column. The first two join on a column expression and keep a primaryid column from each table, while the third joins on the shared column name and keeps a single primaryid column.

In [29]:
demo_drug_inner_A = demography.join(drugs, demography['primaryid'] == drugs['primaryid'], how='inner')
demo_drug_inner_B = demography.join(drugs, demography.primaryid == drugs.primaryid, how='inner')
demo_drug_inner_C = demography.join(drugs, 'primaryid', how='inner')  # primaryid is a common column name in both tables
SQL

We can also use SQL. Let's use inner join to join the demography and drugs tables.

We can execute SQL commands within a Python notebook by starting a cell with %sql or by calling sqlContext.sql(...).

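To run SQL queries on our DataFrames, we first have to register them as temporary tables using registerTempTable. Temporary tables are not persistent, i.e. they will not survive cluster restarts. (In Spark 2.0 and later, registerTempTable is deprecated in favor of createOrReplaceTempView.)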

In [32]:
demography.registerTempTable("demographyTempTable")
drugs.registerTempTable("drugsTempTable")
indications.registerTempTable("indicationsTempTable")
outcomes.registerTempTable("outcomesTempTable")
reactions.registerTempTable("reactionsTempTable")

Using SQL, let's look at the demography column names and their types through the temporary table we registered.

In [34]:
%sql describe demographyTempTable  

Now, the inner join in SQL:

In [36]:
demo_drug_inner_SQL = sqlContext.sql("SELECT * FROM demographyTempTable demo INNER JOIN drugsTempTable drugs on demo.primaryid=drugs.primaryid") 

Left Outer Join

What if we want to keep all the demography data and add columns from the drugs data based on the primaryid column? In this case, we use a left outer join. A left outer join returns all rows from the left table and the matching rows from the right table; where a left row has no match, the columns coming from the right table are filled with NULL.
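A minimal plain-Python sketch of the left outer join semantics, using hypothetical toy rows and a hypothetical helper (left_outer_join). It models what the result should contain, not how Spark computes it.

```python
# Hypothetical toy rows standing in for the demography and drugs tables.
demo = [{"primaryid": 1, "sex": "F"}, {"primaryid": 2, "sex": "M"}]
drugs = [{"primaryid": 1, "drugname": "drug_a"}, {"primaryid": 3, "drugname": "drug_c"}]


def left_outer_join(left, right, key, right_cols):
    """Keep every left row; attach matching right rows or NULL-filled columns."""
    result = []
    for a in left:
        matches = [b for b in right if a[key] is not None and a[key] == b[key]]
        if matches:
            result.extend({**a, **b} for b in matches)
        else:
            # No partner in the right table: right-hand columns become None (NULL).
            result.append({**a, **{c: None for c in right_cols}})
    return result


rows = left_outer_join(demo, drugs, "primaryid", ["drugname"])
for row in rows:
    print(row)
```

Both demography rows survive; primaryid 2 gets None for drugname, and the drugs row with primaryid 3 does not appear at all.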

The three statements below return the same rows; they are all left outer joins between the demography and drugs data on the 'primaryid' column.

In [39]:
demo_drug_left_A = demography.join(drugs, demography['primaryid'] == drugs['primaryid'], how='left_outer')
demo_drug_left_B = demography.join(drugs, demography.primaryid == drugs.primaryid, how='left_outer')
demo_drug_left_C = demography.join(drugs, 'primaryid', how='left_outer')  # primaryid is a common column name in both tables

Now, using SQL:

In [41]:
demo_drug_left_SQL = sqlContext.sql("SELECT * FROM demographyTempTable demo LEFT OUTER JOIN drugsTempTable drugs on demo.primaryid=drugs.primaryid")

Right Outer Join

Now, let's say we want all the data from the drugs table, but from the demography table we want only the rows that have a matching primaryid in the drugs table. To achieve this, we use a right outer join. A right outer join returns all rows from the right table and the matching rows from the left table, with NULLs in the left table's columns where there is no match.
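Conceptually, a right outer join is a left outer join with the two tables swapped. The plain-Python sketch below makes that explicit; the toy rows and the helper names (left_outer_join, right_outer_join) are hypothetical, and the code models only the semantics, not Spark's execution.

```python
# Hypothetical toy rows standing in for the demography and drugs tables.
demo = [{"primaryid": 1, "sex": "F"}, {"primaryid": 2, "sex": "M"}]
drugs = [{"primaryid": 1, "drugname": "drug_a"}, {"primaryid": 3, "drugname": "drug_c"}]


def left_outer_join(left, right, key, right_cols):
    """Keep every left row; pad unmatched ones with None for the right-hand columns."""
    result = []
    for a in left:
        matches = [b for b in right if a[key] is not None and a[key] == b[key]]
        if matches:
            result.extend({**a, **b} for b in matches)
        else:
            result.append({**a, **{c: None for c in right_cols}})
    return result


def right_outer_join(left, right, key, left_cols):
    """A right outer join is a left outer join with the two tables swapped."""
    return left_outer_join(right, left, key, left_cols)


rows = right_outer_join(demo, drugs, "primaryid", ["sex"])
for row in rows:
    print(row)
```

Every drugs row survives: primaryid 3 gets None for sex, and the demography row with primaryid 2 is dropped. Apart from column order, this is the same as a left outer join of drugs with demography.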

The three statements below return the same rows; they are all right outer joins between the demography and drugs data on the 'primaryid' column.

In [44]:
demo_drug_right_A = demography.join(drugs, demography['primaryid'] == drugs['primaryid'], how='right_outer')
demo_drug_right_B = demography.join(drugs, demography.primaryid == drugs.primaryid, how='right_outer')
demo_drug_right_C = demography.join(drugs, 'primaryid', how='right_outer')  # primaryid is a common column name in both tables

Now, using SQL:

In [46]:
demo_drug_right_SQL = sqlContext.sql("SELECT * FROM demographyTempTable demo RIGHT OUTER JOIN drugsTempTable drugs on demo.primaryid=drugs.primaryid")

Full Outer Join

If we want to keep all the rows from both tables, we use a full outer join. A full outer join returns all rows from both tables: matching rows are combined, and rows without a match in the other table are kept with NULLs in that table's columns.
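Here is a minimal plain-Python sketch of the full outer join semantics, with hypothetical toy rows and a hypothetical helper (full_outer_join). It keeps a single key column, roughly like the column-name form of the join, and it models the result only, not Spark's execution.

```python
# Hypothetical toy rows standing in for the demography and drugs tables.
demo = [{"primaryid": 1, "sex": "F"}, {"primaryid": 2, "sex": "M"}]
drugs = [{"primaryid": 1, "drugname": "drug_a"}, {"primaryid": 3, "drugname": "drug_c"}]


def full_outer_join(left, right, key, left_cols, right_cols):
    """Keep matched pairs plus unmatched rows from both sides, padded with None."""
    result = []
    matched_right = set()  # indices of right rows that found at least one partner
    for a in left:
        found = False
        for j, b in enumerate(right):
            if a[key] is not None and a[key] == b[key]:
                result.append({**a, **b})
                matched_right.add(j)
                found = True
        if not found:
            # Left-only row: right-hand columns become None.
            result.append({**a, **{c: None for c in right_cols}})
    for j, b in enumerate(right):
        if j not in matched_right:
            # Right-only row: left-hand columns become None.
            result.append({**{c: None for c in left_cols}, **b})
    return result


rows = full_outer_join(demo, drugs, "primaryid", ["sex"], ["drugname"])
for row in rows:
    print(row)
```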

The three statements below return the same rows; they are all full outer joins between the demography and drugs data on the 'primaryid' column.

In [49]:
demo_drug_outer_A = demography.join(drugs, demography['primaryid'] == drugs['primaryid'], how='outer')
demo_drug_outer_B = demography.join(drugs, demography.primaryid == drugs.primaryid, how='outer')
demo_drug_outer_C = demography.join(drugs, 'primaryid', how='outer')  # primaryid is a common column name in both tables

Equivalent SQL code for full outer join:

In [51]:
demo_drug_outer_SQL = sqlContext.sql("SELECT * FROM demographyTempTable demo FULL OUTER JOIN drugsTempTable drugs on demo.primaryid=drugs.primaryid")

Using more than one join field

We can also join on more than one field. To demonstrate this, let's join the drugs table with the indications table. We use the primaryid and drug_seq fields from the drugs table, and the primaryid and indi_drug_seq fields from the indications table.
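A multi-field join predicate can be modeled in plain Python as a hash join on a composite (tuple) key. The sketch below is hypothetical (toy rows, helper name hash_inner_join) and shows why the second condition matters: joining on primaryid alone pairs every drug in a report with every indication in that report.

```python
from collections import defaultdict

# Hypothetical toy rows standing in for the drugs and indications tables.
drugs = [
    {"primaryid": 1, "drug_seq": 1, "drugname": "drug_a"},
    {"primaryid": 1, "drug_seq": 2, "drugname": "drug_b"},
    {"primaryid": 2, "drug_seq": 1, "drugname": "drug_c"},
]
indications = [
    {"primaryid": 1, "indi_drug_seq": 1, "indication": "indication_x"},
    {"primaryid": 1, "indi_drug_seq": 2, "indication": "indication_y"},
    {"primaryid": 2, "indi_drug_seq": 3, "indication": "indication_z"},
]


def hash_inner_join(left, right, left_keys, right_keys):
    """Inner join where ALL key pairs must match (conditions combined with AND)."""
    index = defaultdict(list)
    for b in right:
        k = tuple(b[c] for c in right_keys)
        if None not in k:  # NULL keys never match
            index[k].append(b)
    result = []
    for a in left:
        k = tuple(a[c] for c in left_keys)
        for b in index.get(k, []):  # .get does not add entries to the defaultdict
            result.append({**a, **b})
    return result


both = hash_inner_join(drugs, indications, ["primaryid", "drug_seq"], ["primaryid", "indi_drug_seq"])
id_only = hash_inner_join(drugs, indications, ["primaryid"], ["primaryid"])
print(len(both), len(id_only))
```

With both fields, each drug is paired only with its own indication (2 rows); with primaryid alone, the same data yield 5 rows.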

In [54]:
cond = [drugs.primaryid == indications.primaryid, drugs.drug_seq == indications.indi_drug_seq]  # join predicate: a list of conditions that are combined with AND
In [55]:
drug_indi_inner = drugs.join(indications,cond, how='inner')
drug_indi_left = drugs.join(indications,cond, how='left_outer')
drug_indi_right = drugs.join(indications,cond, how='right_outer')
drug_indi_outer = drugs.join(indications,cond, how='outer')

Now, let's look at the equivalent SQL queries for the same tasks.

SQL joins with multiple joining fields
In [58]:
drug_indi_inner_SQL = sqlContext.sql("SELECT * FROM drugsTempTable drugs INNER JOIN indicationsTempTable indi on drugs.primaryid=indi.primaryid AND drugs.drug_seq=indi.indi_drug_seq ")


drug_indi_left_SQL = sqlContext.sql("SELECT * FROM drugsTempTable drugs LEFT OUTER JOIN indicationsTempTable indi on drugs.primaryid=indi.primaryid AND drugs.drug_seq=indi.indi_drug_seq ")

drug_indi_right_SQL = sqlContext.sql("SELECT * FROM drugsTempTable drugs RIGHT OUTER JOIN indicationsTempTable indi on drugs.primaryid=indi.primaryid AND drugs.drug_seq=indi.indi_drug_seq ")

drug_indi_outer_SQL = sqlContext.sql("SELECT * FROM drugsTempTable drugs FULL OUTER JOIN indicationsTempTable indi on drugs.primaryid=indi.primaryid AND drugs.drug_seq=indi.indi_drug_seq ")

Left Outer Join Excluding Inner Join

Let's assume we want the rows in the demography table that do not match any records in the drugs table. In this scenario, we use a left outer join excluding the inner join: we perform a left outer join and keep only the rows where the drugs key is NULL.
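The result of a left outer join excluding the inner join is often called an anti join. Here is a minimal plain-Python sketch of it, with hypothetical toy rows and a hypothetical helper (left_anti_join). Spark 2.0 and later also accept how='left_anti' in DataFrame.join, for example demography.join(drugs, 'primaryid', 'left_anti'), which returns only the left table's columns; check that your Spark version supports it before relying on it.

```python
# Hypothetical toy rows standing in for the demography and drugs tables.
demo = [
    {"primaryid": 1, "sex": "F"},
    {"primaryid": 2, "sex": "M"},
    {"primaryid": None, "sex": "F"},  # NULL key: can never match
]
drugs = [{"primaryid": 1, "drugname": "drug_a"}, {"primaryid": 3, "drugname": "drug_c"}]


def left_anti_join(left, right, key):
    """Return the left rows that have no matching key in the right table."""
    # NULL keys never match in SQL, so they are excluded from the lookup set.
    right_keys = {b[key] for b in right if b[key] is not None}
    return [a for a in left if a[key] is None or a[key] not in right_keys]


rows = left_anti_join(demo, drugs, "primaryid")
for row in rows:
    print(row)
```

The row with a None key is kept, which mirrors the LEFT JOIN ... WHERE drugs.primaryid IS NULL query: a NULL key finds no partner, so the drugs columns come back NULL.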

In [61]:
result = demography.join(drugs, demography['primaryid']==drugs['primaryid'],'left').where(drugs['primaryid'].isNull())

SQL:

In [63]:
result=sqlContext.sql("SELECT * FROM demographyTempTable demo LEFT JOIN drugsTempTable drugs ON demo.primaryid = drugs.primaryid WHERE drugs.primaryid IS NULL")

Right Outer Join Excluding Inner Join

If we want the rows in the drugs table that do not match any records in the demography table, we use a right outer join excluding the inner join.

In [66]:
result = demography.join(drugs, demography['primaryid'] == drugs['primaryid'], 'right').where(demography['primaryid'].isNull())

SQL:

In [68]:
result=sqlContext.sql("SELECT * FROM demographyTempTable demo RIGHT JOIN drugsTempTable drugs ON demo.primaryid = drugs.primaryid WHERE demo.primaryid IS NULL")

Full Outer Join Excluding Inner Join

This kind of join returns all records from both tables except those that match, i.e. only the rows that exist in just one of the two tables.
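A minimal plain-Python sketch of a full outer join excluding the inner join, using hypothetical toy rows and a hypothetical helper (full_outer_excluding_inner). For readability it returns the left-only and right-only rows as two separate lists rather than as one NULL-padded table.

```python
# Hypothetical toy rows standing in for the demography and drugs tables.
demo = [{"primaryid": 1, "sex": "F"}, {"primaryid": 2, "sex": "M"}]
drugs = [{"primaryid": 1, "drugname": "drug_a"}, {"primaryid": 3, "drugname": "drug_c"}]


def full_outer_excluding_inner(left, right, key):
    """Return (left-only rows, right-only rows): a full outer join minus the matches."""
    left_keys = {a[key] for a in left if a[key] is not None}
    right_keys = {b[key] for b in right if b[key] is not None}
    left_only = [a for a in left if a[key] not in right_keys]
    right_only = [b for b in right if b[key] not in left_keys]
    return left_only, right_only


left_only, right_only = full_outer_excluding_inner(demo, drugs, "primaryid")
print(left_only)
print(right_only)
```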

In [71]:
result = demography.join(drugs, demography['primaryid'] == drugs['primaryid'], 'outer').where((demography['primaryid'].isNull()) | (drugs['primaryid'].isNull()))

SQL:

In [73]:
result=sqlContext.sql("SELECT * FROM demographyTempTable demo FULL OUTER JOIN drugsTempTable drugs ON demo.primaryid = drugs.primaryid WHERE demo.primaryid IS NULL OR drugs.primaryid IS NULL")

Summary

In this blog post, we saw how to perform various joins in Spark using PySpark. We used the DataFrame join method as well as equivalent SQL queries that accomplish the same tasks.

In the next blog post, we will look at working with multiple tables and at window functions.
