This is the fourth tutorial in the Spark RDDs Vs DataFrames vs SparkSQL blog post series. The first one is available here. In the first part, we saw how to retrieve, sort and filter data using Spark RDDs, DataFrames and SparkSQL. In the second part (here), we saw how to work with multiple tables in Spark the RDD way, the DataFrame way and with SparkSQL. In the third part (available here) of the blog post series, we performed web server log analysis using real-world text-based production logs. In this fourth part, we will see set operators in Spark the RDD way, the DataFrame way and the SparkSQL way.
Also, check out my other recent blog posts on Spark: Analyzing the Bible and the Quran using Spark and Spark DataFrames: Exploring Chicago Crimes.
The data and the notebooks can be downloaded from my GitHub repository.
The three set operators in the RDD, DataFrame and SparkSQL approaches are shown below.
RDD
union: This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct.
intersection
subtract
DataFrame
unionAll/union: This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct. unionAll was deprecated in Spark 2.0; use union instead.
intersect
subtract
SparkSQL
union all
intersect
except
The inputs that set operations expect must have the same variables (columns). A minimal sketch of these operators on toy data follows.
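Before turning to the tutorial datasets, here is a minimal sketch of how the same set operations look in each of the three APIs. The two tiny DataFrames a and b are invented purely for illustration and are not part of the tutorial data; the first few lines set up a throwaway local SparkContext/SQLContext just so the sketch runs on its own.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Minimal setup for this sketch only; the tutorial's own setup follows below
sc = SparkContext.getOrCreate(SparkConf().setAppName("set-operators-sketch").setMaster("local[*]"))
sqlcontext = SQLContext(sc)

# Two toy DataFrames with identical columns (set operations require matching columns)
a = sqlcontext.createDataFrame([(1, 'x'), (2, 'y'), (2, 'y')], ['id', 'val'])
b = sqlcontext.createDataFrame([(2, 'y'), (3, 'z')], ['id', 'val'])

# DataFrame way: union keeps duplicates (UNION ALL); add distinct() for a SQL-style UNION
a.union(b).distinct().show()
a.intersect(b).show()
a.subtract(b).show()

# RDD way: the same operators on the underlying RDDs of Row objects
a.rdd.union(b.rdd).distinct().collect()
a.rdd.intersection(b.rdd).collect()
a.rdd.subtract(b.rdd).collect()

# SparkSQL way
a.createOrReplaceTempView("a")
b.createOrReplaceTempView("b")
sqlcontext.sql("SELECT * FROM a UNION ALL SELECT * FROM b").show()
sqlcontext.sql("SELECT * FROM a INTERSECT SELECT * FROM b").show()
sqlcontext.sql("SELECT * FROM a EXCEPT SELECT * FROM b").show()
Note that DataFrame union (like RDD union) matches columns by position rather than by name, which is another reason the inputs should have the same columns in the same order.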
For this tutorial, we will work with the SalesLTCustomer.txt and SalesLTCustomerAddress.txt datasets. Let's answer a couple of questions using the Spark Resilient Distributed Dataset (RDD) way, the DataFrame way and the SparkSQL way by employing set operators.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("RDD Vs DataFrames Vs SparkSQL -part 4").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
sqlcontext = SQLContext(sc)
Create RDD
customer = sc.textFile("SalesLTCustomer.txt")
customer_address = sc.textFile("SalesLTCustomerAddress.txt")
Understand the data
customer.first()
customer_address.first()
As shown above, the data is tab delimited.
Remove the headers from the RDDs
customer_header = customer.first()
customer_rdd = customer.filter(lambda line: line != customer_header)
customer_address_header = customer_address.first()
customer_address_rdd = (customer_address.filter(lambda line: line != customer_address_header))
Create DataFrames, understand the schema and show sample data
customer_df = sqlcontext.createDataFrame(customer_rdd.map(lambda line: line.split("\t")),
schema = customer_header.split("\t") )
customer_df.printSchema()
customer_df.select(['CustomerID','FirstName','MiddleName','LastName','CompanyName']).show(10, truncate = False)
customer_address_df = sqlcontext.createDataFrame(customer_address_rdd.map(lambda line: line.split("\t")),
schema = customer_address_header.split("\t") )
customer_address_df.printSchema()
customer_address_df.show(10, truncate = False)
Register the DataFrames as tables so that we can execute SQL over them
customer_df.createOrReplaceTempView("customer_table")
customer_address_df.createOrReplaceTempView("customer_address_table")
1. Retrieve customers with only a main office address
Write a query that returns the company name of each company that appears in a table of customers with a ‘Main Office’ address, but not in a table of customers with a ‘Shipping’ address.
SparkSQL way
sql1 = sqlcontext.sql("SELECT c.CompanyName \
FROM customer_table AS c INNER JOIN customer_address_table AS ca \
ON c.CustomerID = ca.CustomerID \
WHERE ca.AddressType = 'Main Office' \
EXCEPT \
SELECT c.CompanyName \
FROM customer_table AS c INNER JOIN customer_address_table AS ca \
ON c.CustomerID = ca.CustomerID \
WHERE ca.AddressType = 'Shipping' ORDER BY CompanyName")
sql1.show(5, truncate = False)
DataFrame way
# Companies with a 'Main Office' address, minus those that also have a 'Shipping' address
df1 = (customer_df.join(customer_address_df.filter(customer_address_df.AddressType == 'Main Office'),
                        'CustomerID', 'inner')
       .select("CompanyName")
       .subtract(customer_df.join(customer_address_df.filter(customer_address_df.AddressType == 'Shipping'),
                                  'CustomerID', 'inner')
                 .select("CompanyName"))
       .orderBy('CompanyName'))
df1.show(5, truncate = False)
RDD way
# Companies with a 'Main Office' address, minus companies with a 'Shipping' address
rdd1 = (
    customer_rdd.map(lambda line: (line.split("\t")[0], line.split("\t")[7]))
    .join(customer_address_rdd
          .filter(lambda line: line.split("\t")[2] == 'Main Office')
          .map(lambda line: (line.split("\t")[0], line.split("\t")[1])))
    .map(lambda pair: pair[1][0])   # keep the CompanyName from the joined value
    .distinct()
    .subtract(
        customer_rdd.map(lambda line: (line.split("\t")[0], line.split("\t")[7]))
        .join(customer_address_rdd
              .filter(lambda line: line.split("\t")[2] == 'Shipping')
              .map(lambda line: (line.split("\t")[0], line.split("\t")[1])))
        .map(lambda pair: pair[1][0])
        .distinct()
    )
).collect()
sorted(rdd1)[:5]
We see that the first five companies from the SparkSQL way, the RDD way and the DataFrame way are the same, but let's compare the full results.
The results from the SQL and DataFrame approaches are rows of type pyspark.sql.types.Row, so let's convert them to ordinary Python lists.
df1.collect()[:5]
df = [i[0] for i in df1.collect()]
df[:5]
sql1.collect()[:5]
sql = [i[0] for i in sql1.collect()]
sql[:5]
Now, let's see if they have the same length.
[len(sql), len(rdd1), len(df)]
Next, let's check whether they contain the same elements. First, we have to sort our lists.
sorted(sql) == sorted(rdd1)
sorted(sql) == sorted(df)
sorted(df) == sorted(rdd1)
Therefore, we see that the results from the SparkSQL approach, the DataFrame approach and the RDD approach are the same.
2. Retrieve only customers with both a main office address and a shipping address
Write a query that returns the company name of each company that appears in a table of customers with a ‘Main Office’ address, and also in a table of customers with a ‘Shipping’ address.
SparkSQL way
sqlcontext.sql("SELECT c.CompanyName \
FROM customer_table AS c INNER JOIN customer_address_table AS ca \
ON c.CustomerID = ca.CustomerID \
WHERE ca.AddressType = 'Main Office' \
INTERSECT \
SELECT c.CompanyName \
FROM customer_table AS c INNER JOIN customer_address_table AS ca \
ON c.CustomerID = ca.CustomerID \
WHERE ca.AddressType = 'Shipping' ORDER BY CompanyName").show(truncate = False)
There are only ten companies that have both a ‘Main Office’ address and a ‘Shipping’ address.
DataFrame way
# Companies that have both a 'Main Office' address and a 'Shipping' address
(customer_df.join(customer_address_df.filter(customer_address_df.AddressType == 'Main Office'),
                  'CustomerID', 'inner')
 .select("CompanyName")
 .intersect(customer_df.join(customer_address_df.filter(customer_address_df.AddressType == 'Shipping'),
                             'CustomerID', 'inner')
            .select("CompanyName"))
 .orderBy("CompanyName")
 .show(truncate = False))
As shown above, the results from the SparkSQL approach and DataFrame approach are the same.
RDD way
# Companies that appear with both a 'Main Office' address and a 'Shipping' address
result = (
    customer_rdd.map(lambda line: (line.split("\t")[0], line.split("\t")[7]))
    .join(customer_address_rdd
          .filter(lambda line: line.split("\t")[2] == 'Main Office')
          .map(lambda line: (line.split("\t")[0], (line.split("\t")[1], line.split("\t")[2]))))
    .map(lambda pair: pair[1][0])   # keep the CompanyName from the joined value
    .intersection(
        customer_rdd.map(lambda line: (line.split("\t")[0], line.split("\t")[7]))
        .join(customer_address_rdd
              .filter(lambda line: line.split("\t")[2] == 'Shipping')
              .map(lambda line: (line.split("\t")[0], (line.split("\t")[1], line.split("\t")[2]))))
        .map(lambda pair: pair[1][0])
    )
).collect()
sorted(result)
The results from the RDD way are also the same as those from the DataFrame way and the SparkSQL way.
This is enough for today. In the next part of the Spark RDDs Vs DataFrames vs SparkSQL tutorial series, I will cover a different topic. If you have any questions or suggestions, feel free to drop them below.