Fisseha Berhane, PhD

Data Scientist


Hive Partitioning with Spark

In this post, I will show how to perform Hive partitioning in Spark and talk about its benefits, including performance.

Why I ended up using partitioning

I am currently working on clustering users based on subsection pageviews. I have a few functions to achieve that. The first one cleans the data by removing users with extremely high or extremely low pageview counts. The next function further cleans the data by normalizing it and writes an intermediate table. The next function scales the cleaned data by applying TF-IDF and centering it. Then, the next function applies principal component analysis (PCA) to the scaled data. Finally, the last function uses the principal components from the PCA step to create clusters with K-means clustering and saves a table with a prediction for each ID. Each function takes start date and end date arguments, along with additional arguments that have default values. The arguments of the final function look like this:

In [ ]:
k_means_clustering_subsection(start_date, 
                              end_date, 
                              pageviews_threshold = 5, 
                              bot_threshold_percentage = 0.0025, 
                              variance_to_retain = 99.0)

We want to experiment by changing the default values as well as the start and end dates, but we do not want to create a separate set of tables for every experiment. The solution is to create one partitioned table for each function that writes a table and, for each experiment, add a column with a constant value to be used for partitioning. When we want to join the tables, we can filter on the value of that partition column. A minimal sketch of this idea is shown below.
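
Here is that sketch: a constant experiment label is added to the output of a run and used as the partition column when the function writes its intermediate table. The experiment_id value, the cleaned_df DataFrame, and the table name are all hypothetical.

In [ ]:
from pyspark.sql.functions import lit

# hypothetical: tag every row of this run's output with a constant label that
# encodes the argument values used for the experiment
experiment_id = 'pv5_bot0.0025_var99'
cleaned_df = cleaned_df.withColumn('experiment_id', lit(experiment_id))

# append to a single partitioned table instead of creating a new table per experiment
cleaned_df.write.mode('append') \
    .partitionBy('experiment_id') \
    .saveAsTable('cleaned_pageviews_partitioned')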

How is partitioning beneficial?

Partitioning is mainly helpful when we need to filter our data based on specific column values. When we partition tables, subdirectories are created under the table’s data directory for each unique value of a partition column. Therefore, when we filter the data based on a specific column, Hive does not need to scan the whole table; it rather goes to the appropriate partition which improves the performance of the query. Similarly, if the table is partitioned on multiple columns, nested subdirectories are created based on the order of partition columns provided in our table definition. We will see these things with examples.
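
Before the hands-on example, here is an illustrative sketch (the DataFrame df and the columns year and borough are hypothetical) of how partitioning on two columns produces nested subdirectories, following the order in which the partition columns are listed:

In [ ]:
# partitioning on two columns produces nested subdirectories under the table's
# data directory, e.g.
#   my_table/year=2010/borough=BRONX/part-...
#   my_table/year=2010/borough=BROOKLYN/part-...
df.write.partitionBy('year', 'borough').format('orc').saveAsTable('my_table')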

Hands On Example

We will work with New York City 311 service requests. The data can be downloaded from NYC Open Data. It has about 19 million rows and 41 columns, is about 11 GB in size, and covers 2010 to the present.

We can see the column names using the head shell command.

In [1]:
!head -1 311_Service_Requests_from_2010_to_Present.csv
Unique Key,Created Date,Closed Date,Agency,Agency Name,Complaint Type,Descriptor,Location Type,Incident Zip,Incident Address,Street Name,Cross Street 1,Cross Street 2,Intersection Street 1,Intersection Street 2,Address Type,City,Landmark,Facility Type,Status,Due Date,Resolution Description,Resolution Action Updated Date,Community Board,BBL,Borough,X Coordinate (State Plane),Y Coordinate (State Plane),Open Data Channel Type,Park Facility Name,Park Borough,Vehicle Type,Taxi Company Borough,Taxi Pick Up Location,Bridge Highway Name,Bridge Highway Direction,Road Ramp,Bridge Highway Segment,Latitude,Longitude,Location

Let's create a string containing all the columns shown above. Then we will split it to create a list of column names.

In [73]:
columns = """Unique Key,Created Date,Closed Date,Agency,Agency Name,Complaint Type,Descriptor,Location Type,Incident Zip,Incident Address,Street Name,Cross Street 1,Cross Street 2,Intersection Street 1,Intersection Street 2,Address Type,City,Landmark,Facility Type,Status,Due Date,Resolution Description,Resolution Action Updated Date,Community Board,BBL,Borough,X Coordinate,Y Coordinate,Open Data Channel Type,Park Facility Name,Park Borough,Vehicle Type,Taxi Company Borough,Taxi Pick Up Location,Bridge Highway Name,Bridge Highway Direction,Road Ramp,Bridge Highway Segment,Latitude,Longitude,Location"""

I converted the column names to lower case and replaced spaces with underscores to avoid issues when inserting the data into a table.

In [74]:
columns = columns.replace(" ",'_').replace('\n','').lower().split(",")
columns[:10]  # just showing the first few column names
Out[74]:
['unique_key',
 'created_date',
 'closed_date',
 'agency',
 'agency_name',
 'complaint_type',
 'descriptor',
 'location_type',
 'incident_zip',
 'incident_address']

As noted on the website, there are 41 columns.

In [50]:
len(columns)  
Out[50]:
41

Initializing SparkSession

In this tutorial, I am using standalone Spark. I instantiated a SparkSession with Hive support, which creates the spark-warehouse directory.

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().config("spark.network.timeout", '200s').getOrCreate()
from pyspark.sql.types import *

Define schema

In [52]:
nyc_schema = StructType([StructField(colName, StringType(), True) for colName in columns])      

For simplicity, I will assume all the columns are strings. We will change the created_date column to a timestamp later because we want to extract the year from it and use that year column for partitioning.

Read Data

In [53]:
nyc311 = spark.read.csv("311_Service_Requests_from_2010_to_Present.csv",
                         header = True, 
                         schema = nyc_schema)

Show sample data

In [75]:
nyc311.select('created_date','agency_name','city').show(10, truncate = False)
+----------------------+-------------------------------+-------------------+
|created_date          |agency_name                    |city               |
+----------------------+-------------------------------+-------------------+
|10/11/2018 09:48:53 AM|Department of Transportation   |BRONX              |
|10/11/2018 10:40:48 PM|Department of Consumer Affairs |NEW YORK           |
|10/11/2018 03:38:45 PM|Department of Consumer Affairs |SOUTH RICHMOND HILL|
|10/11/2018 10:32:01 AM|Department of Consumer Affairs |SUNNYSIDE          |
|10/11/2018 11:27:35 AM|Department of Buildings        |BRONX              |
|10/11/2018 04:10:38 PM|Department of Buildings        |BROOKLYN           |
|10/11/2018 09:43:55 AM|Department of Transportation   |BROOKLYN           |
|10/11/2018 05:35:41 AM|Department of Transportation   |BROOKLYN           |
|10/14/2018 07:14:24 PM|New York City Police Department|BRONX              |
|10/20/2015 03:41:02 PM|Department of Transportation   |BROOKLYN           |
+----------------------+-------------------------------+-------------------+
only showing top 10 rows

Change data type of the created_date column to timestamp and create year column

Now, let's change the data type of created_date from string to timestamp and create a year column from it. We will use the year column for partitioning.

In [55]:
from datetime import datetime
from pyspark.sql.functions import col,udf, year

to_date_time =  udf(lambda x: datetime.strptime(x, '%m/%d/%Y %I:%M:%S %p'), TimestampType())
nyc311_2 = nyc311.withColumn('created_date', to_date_time(col('created_date'))).withColumn('year', year('created_date'))
nyc311_2.select(['created_date', 'year']).show(10)
+-------------------+----+
|       created_date|year|
+-------------------+----+
|2018-10-11 09:48:53|2018|
|2018-10-11 22:40:48|2018|
|2018-10-11 15:38:45|2018|
|2018-10-11 10:32:01|2018|
|2018-10-11 11:27:35|2018|
|2018-10-11 16:10:38|2018|
|2018-10-11 09:43:55|2018|
|2018-10-11 05:35:41|2018|
|2018-10-14 19:14:24|2018|
|2015-10-20 15:41:02|2015|
+-------------------+----+
only showing top 10 rows
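
As a side note, if you are on Spark 2.2 or later, the built-in to_timestamp function should give the same result without the overhead of a Python UDF; a minimal sketch:

In [ ]:
from pyspark.sql.functions import to_timestamp, year

# same conversion using a built-in function instead of a Python UDF
nyc311_2 = nyc311.withColumn('created_date', to_timestamp('created_date', 'MM/dd/yyyy hh:mm:ss a')) \
                 .withColumn('year', year('created_date'))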

Create Partitioned Table

Inserting all of the data into the tables took too long, so I ended up using about half of the data (roughly 9.4 million records) by sampling 50% of the records, as shown below.

In [61]:
nyc311_2.sample(False, 0.5, 42).createOrReplaceTempView('nyc311_table')
In [65]:
create_partitioned_table_sql = """CREATE TABLE if not exists nyc311_orc_partitioned (unique_key STRING, created_date STRING, closed_date STRING, agency STRING, agency_name STRING, complaint_type STRING, descriptor STRING, location_type STRING, incident_zip STRING, incident_address STRING, street_name STRING, cross_street_1 STRING, cross_street_2 STRING, intersection_street_1 STRING, intersection_street_2  STRING, address_type STRING, city  STRING, landmark STRING, facility_type STRING, status STRING, due_date STRING, resolution_description  STRING,resolution_action_updated_date STRING, community_board STRING, bbl STRING, borough STRING, x_coordinate STRING, y_coordinate STRING, open_data_channel_type STRING, park_facility_name STRING, park_borough STRING, vehicle_type STRING, taxi_company_borough STRING, taxi_pick_up_location STRING, bridge_highway_name STRING, bridge_highway_direction  STRING, road_ramp STRING, bridge_highway_segment STRING, latitude STRING, longitude STRING, location STRING) PARTITIONED BY (year INT) STORED AS ORC """
In [66]:
spark.sql(create_partitioned_table_sql)
Out[66]:
DataFrame[]

Dynamic partitioning is disabled by default. We enable it by setting hive.exec.dynamic.partition.mode to nonstrict.

In [67]:
set_dynamic_mode  = "SET hive.exec.dynamic.partition.mode = nonstrict"
spark.sql(set_dynamic_mode)
Out[67]:
DataFrame[key: string, value: string]

Insert data into partitioned table

In [68]:
insert_sql = """INSERT INTO nyc311_orc_partitioned PARTITION (year) SELECT unique_key, created_date, closed_date, agency, agency_name, complaint_type, descriptor, location_type, incident_zip, incident_address, street_name, cross_street_1, cross_street_2, intersection_street_1, intersection_street_2, address_type, city, landmark, facility_type, status, due_date, resolution_description, resolution_action_updated_date, community_board, bbl, borough, x_coordinate, y_coordinate, open_data_channel_type, park_facility_name, park_borough, vehicle_type, taxi_company_borough, taxi_pick_up_location, bridge_highway_name, bridge_highway_direction, road_ramp, bridge_highway_segment, latitude, longitude, location, year FROM nyc311_table"""
In [69]:
spark.sql(insert_sql)
Out[69]:
DataFrame[]

For dynamic partitioning to work in Hive, the partition column has to be the last column in the SELECT list of insert_sql above. Note: make sure the column names are lower case; that worked for me, whereas I was getting errors with upper-case column names.

We can see the partitions of our table as below:

In [76]:
spark.sql("show partitions nyc311_orc_partitioned").show()
+---------+
|partition|
+---------+
|year=2010|
|year=2011|
|year=2012|
|year=2013|
|year=2014|
|year=2015|
|year=2016|
|year=2017|
|year=2018|
+---------+
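
As an aside, the DataFrame writer API offers an alternative route to the SQL CREATE TABLE and INSERT statements above; here is a sketch (the table name nyc311_orc_partitioned_alt is hypothetical):

In [ ]:
# let the DataFrame writer create and populate a partitioned ORC table directly
nyc311_2.sample(False, 0.5, 42).write.mode('overwrite') \
    .partitionBy('year') \
    .format('orc') \
    .saveAsTable('nyc311_orc_partitioned_alt')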

Create unpartitioned table

In [62]:
spark.sql('create table nyc311_orc_unpartitioned stored as orc as select * from nyc311_table')
Out[62]:
DataFrame[]

Compare data directory

The directory for the non-partitioned table contains all the data files. The partitioned table, on the other hand, contains subdirectories for each unique value of the partition column (year in our case).

In [90]:
import os

Below, I show datafiles from the unpartitioned table.

In [96]:
os.listdir("spark-warehouse/nyc311_orc_unpartitioned")[:10]
Out[96]:
['.part-00019-a89ad98f-a0aa-4a78-8056-9776b806741e-c000.crc',
 '.part-00067-a89ad98f-a0aa-4a78-8056-9776b806741e-c000.crc',
 'part-00066-a89ad98f-a0aa-4a78-8056-9776b806741e-c000',
 'part-00013-a89ad98f-a0aa-4a78-8056-9776b806741e-c000',
 'part-00040-a89ad98f-a0aa-4a78-8056-9776b806741e-c000',
 '.part-00036-a89ad98f-a0aa-4a78-8056-9776b806741e-c000.crc',
 'part-00035-a89ad98f-a0aa-4a78-8056-9776b806741e-c000',
 '.part-00048-a89ad98f-a0aa-4a78-8056-9776b806741e-c000.crc',
 '.part-00044-a89ad98f-a0aa-4a78-8056-9776b806741e-c000.crc',
 'part-00079-a89ad98f-a0aa-4a78-8056-9776b806741e-c000']

The partitioned table, on the other hand, contains subdirectories for each unique value of the partition column, as shown below.

In [94]:
os.listdir("spark-warehouse/nyc311_orc_partitioned")
Out[94]:
['year=2013',
 'year=2017',
 'year=2010',
 'year=2014',
 'year=2011',
 'year=2015',
 'year=2012',
 'year=2018',
 'year=2016']

Below, I show ten of the data files from the year=2010 subdirectory.

In [95]:
os.listdir("spark-warehouse/nyc311_orc_partitioned/year=2010.0")[:10]
Out[95]:
['part-00046-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000',
 '.part-00073-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000.crc',
 'part-00060-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000',
 '.part-00022-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000.crc',
 'part-00059-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000',
 '.part-00001-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000.crc',
 '.part-00064-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000.crc',
 'part-00058-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000',
 '.part-00035-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000.crc',
 '.part-00039-4d119e3d-79db-4975-9b5a-7215c9efe46e.c000.crc']

Compare time taken to query from both tables

Finally, let's query from both tables and compare time taken.

In [2]:
import time
In [3]:
t0 = time.time()
aggregates = spark.sql("select agency_name, city, incident_zip, count(*) as frequency from nyc311_orc_unpartitioned where year = '2010' group by agency_name, city, incident_zip").toPandas()
t1 = time.time()

print('time taken with unpartitioned table ', round(t1-t0,2))
time taken with unpartitioned table  7.43
In [11]:
t0 = time.time()

aggregates = spark.sql("select agency_name, city,incident_zip, count(*) as frequency from nyc311_orc_partitioned where year = '2010' group by agency_name, city, incident_zip").toPandas()
t1 = time.time()

print('time taken with partitioned table ', round(t1-t0,2))
time taken with partitioned table  1.87
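
To see where the speedup comes from, we can also inspect the physical plan of the query against the partitioned table; with partition pruning, the scan should be limited to the year=2010 partition (the exact plan text will vary with the Spark version and environment).

In [ ]:
# the plan should show that only the year=2010 partition of the table is scanned
spark.sql("""select agency_name, city, incident_zip, count(*) as frequency
             from nyc311_orc_partitioned where year = '2010'
             group by agency_name, city, incident_zip""").explain()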

Final Remarks

  • Partitioning helps to improve performance, especially when we filter a large volume of data based on specific column values.

  • In our queries above, response time decreased by about 75% (from 7.43 sec to 1.87 sec).

  • It can also help us organize our data in ways that work better for our use cases (for example, by writing each experiment's output to its own partition of a partitioned table).
