This is the second blog post in the Spark tutorial series, written to help big data enthusiasts prepare for Apache Spark certification exams from companies such as Cloudera, Hortonworks, and Databricks. The first one is here. If you want to learn or master Spark with Python, or if you are preparing for a Spark certification to show your skills in big data, these articles are for you.
In this tutorial, we will analyze crime data from data.gov. The dataset reflects reported incidents of crime (with the exception of murders, where data exists for each victim) that have occurred in the City of Chicago since 2001.
A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files. It is the entry point to programming Spark with the DataFrame API. We can create a SparkSession using the following builder pattern:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Chicago_crime_analysis").getOrCreate()
We can let Spark infer the schema of our CSV data, but providing a pre-defined schema makes the reading process faster. It also lets us give the columns the names we want, for example, names without spaces.
from pyspark.sql.types import (StructType,
StructField,
DateType,
BooleanType,
DoubleType,
IntegerType,
StringType,
TimestampType)
crimes_schema = StructType([StructField("ID", StringType(), True),
StructField("CaseNumber", StringType(), True),
StructField("Date", StringType(), True ),
StructField("Block", StringType(), True),
StructField("IUCR", StringType(), True),
StructField("PrimaryType", StringType(), True ),
StructField("Description", StringType(), True ),
StructField("LocationDescription", StringType(), True ),
StructField("Arrest", BooleanType(), True),
StructField("Domestic", BooleanType(), True),
StructField("Beat", StringType(), True),
StructField("District", StringType(), True),
StructField("Ward", StringType(), True),
StructField("CommunityArea", StringType(), True),
StructField("FBICode", StringType(), True ),
StructField("XCoordinate", DoubleType(), True),
StructField("YCoordinate", DoubleType(), True ),
StructField("Year", IntegerType(), True),
StructField("UpdatedOn", DateType(), True ),
StructField("Latitude", DoubleType(), True),
StructField("Longitude", DoubleType(), True),
StructField("Location", StringType(), True )
])
crimes = spark.read.csv("Chicago_crimes_2001_to_present.csv",
header = True,
schema = crimes_schema)
First, let's see how many rows the crimes dataframe has:
print(" The crimes dataframe has {} records".format(crimes.count()))
We can also see the columns, the data type of each column and the schema using the commands below.
crimes.columns
crimes.dtypes
crimes.printSchema()
We can also quickly look at some rows, as below. We select one or more columns with select, and show prints the first n rows.
crimes.select("Date").show(10, truncate = False)
The Date column is in string format. Let's change it to timestamp format using a user-defined function (udf).
withColumn helps us create a new column, and we can remove one or more columns with drop.
from datetime import datetime
from pyspark.sql.functions import col,udf
myfunc = udf(lambda x: datetime.strptime(x, '%m/%d/%Y %I:%M:%S %p'), TimestampType())
df = crimes.withColumn('Date_time', myfunc(col('Date'))).drop("Date")
df.select(df["Date_time"]).show(5)
We can calculate statistics of string and numeric columns using describe. When we select more than one column, we have to pass the column names as a Python list.
crimes.select(["Latitude","Longitude","Year","XCoordinate","YCoordinate"]).describe().show()
The above numbers are ugly. Let's round them using format_number from PySpark's functions module.
from pyspark.sql.functions import format_number
result = crimes.select(["Latitude","Longitude","Year","XCoordinate","YCoordinate"]).describe()
result.select(result['summary'],
format_number(result['Latitude'].cast('float'),2).alias('Latitude'),
format_number(result['Longitude'].cast('float'),2).alias('Longitude'),
result['Year'].cast('int').alias('year'),
format_number(result['XCoordinate'].cast('float'),2).alias('XCoordinate'),
format_number(result['YCoordinate'].cast('float'),2).alias('YCoordinate')
).show()
distinct returns the distinct values of a column.
crimes.select("PrimaryType").distinct().count()
We can also see a list of the primary crime types.
crimes.select("PrimaryType").distinct().show(n = 35)
crimes.where(crimes["PrimaryType"] == "HOMICIDE").count()
When combining multiple conditions, make sure to wrap each condition in parentheses:
crimes.filter((crimes["PrimaryType"] == "ASSAULT") & (crimes["Domestic"] == "True")).count()
We can use either filter or where to do the filtering.
columns = ['PrimaryType', 'Description', 'Arrest', 'Domestic']
crimes.where((crimes["PrimaryType"] == "HOMICIDE") & (crimes["Arrest"] == "true"))\
.select(columns).show(10)
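As an aside, filter and where also accept a SQL expression string; a minimal sketch equivalent to the query above:
# The same filter expressed as a SQL string
crimes.where("PrimaryType = 'HOMICIDE' AND Arrest = true").select(columns).show(10)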
We can use limit to limit the number of rows we want to retrieve from a dataframe.
crimes.select(columns).limit(10).show(truncate = True)
We can find the maximum value of a column using agg:
lat_max = crimes.agg({"Latitude" : "max"}).collect()[0][0]
print("The maximum latitude value is {}".format(lat_max))
Let's subtract each latitude value from the maximum latitude.
df = crimes.withColumn("difference_from_max_lat",lat_max - crimes["Latitude"])
df.select(["Latitude", "difference_from_max_lat"]).show(5)
Let's rename Latitude to Lat.
df = df.withColumnRenamed("Latitude", "Lat")
df.columns
columns = ['PrimaryType', 'Description', 'Arrest', 'Domestic','Lat']
df.orderBy(df["Lat"].desc()).select(columns).show(10)
Let's calculate the average latitude value.
from pyspark.sql.functions import mean
df.select(mean("Lat")).alias("Mean Latitude").show()
We can also use the agg method to calculate the average.
df.agg({"Lat":"avg"}).show()
We can also find the minimum and maximum values of a column:
from pyspark.sql.functions import max, min
df.select(max("XCoordinate"), min("XCoordinate")).show()
df.filter(df["Domestic"]==True).count()/df.count() * 100
We can calculate the correlation between two columns with corr:
from pyspark.sql.functions import corr
df.select(corr("Lat", "YCoordinate")).show()
df.groupBy("Year").count().show()
df.groupBy("Year").count().collect()
count = [item[1] for item in df.groupBy("Year").count().collect()]
year = [item[0] for item in df.groupBy("Year").count().collect()]
number_of_crimes_per_year = {"count":count, "year" : year}
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
number_of_crimes_per_year = pd.DataFrame(number_of_crimes_per_year)
number_of_crimes_per_year.head()
number_of_crimes_per_year = number_of_crimes_per_year.sort_values(by = "year")
number_of_crimes_per_year.plot(figsize = (20,10), kind = "bar", color = "red",
x = "year", y = "count", legend = False)
plt.xlabel("", fontsize = 18)
plt.ylabel("Number of Crimes", fontsize = 18)
plt.title("Number of Crimes Per Year", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
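Since the aggregated result is tiny, an alternative sketch is to convert it directly to pandas with toPandas() instead of building the lists by hand:
# Alternative: convert the small aggregated result straight to a pandas dataframe
crimes_per_year_pd = df.groupBy("Year").count().orderBy("Year").toPandas()
crimes_per_year_pd.plot(figsize = (20,10), kind = "bar", color = "red", x = "Year", y = "count", legend = False)
plt.show()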
We can use the month function from PySpark's functions module to get the numeric month.
from pyspark.sql.functions import month
monthdf = df.withColumn("Month",month("Date_time"))
monthCounts = monthdf.select("Month").groupBy("Month").count()
monthCounts.show()
monthCounts = monthCounts.collect()
months = [item[0] for item in monthCounts]
count = [item[1] for item in monthCounts]
crimes_per_month = {"month":months, "crime_count": count}
crimes_per_month = pd.DataFrame(crimes_per_month)
crimes_per_month = crimes_per_month.sort_values(by = "month")
crimes_per_month.plot(figsize = (20,10), kind = "line", x = "month", y = "crime_count",
color = "red", linewidth = 8, legend = False)
plt.xlabel("Month", fontsize = 18)
plt.ylabel("Number of Crimes", fontsize = 18)
plt.title("Number of Crimes Per Month", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
crimes.groupBy("LocationDescription").count().show()
crime_location = crimes.groupBy("LocationDescription").count().collect()
location = [item[0] for item in crime_location]
count = [item[1] for item in crime_location]
crime_location = {"location" : location, "count": count}
crime_location = pd.DataFrame(crime_location)
crime_location = crime_location.sort_values(by = "count", ascending = False)
crime_location.iloc[:5]
crime_location = crime_location.iloc[:20]
myplot = crime_location.plot(figsize = (20,20), kind = "barh", color = "#b35900", width = 0.8,
x = "location", y = "count", legend = False)
myplot.invert_yaxis()
plt.xlabel("Number of crimes", fontsize = 28)
plt.ylabel("Crime Location", fontsize = 28)
plt.title("Number of Crimes By Location", fontsize = 36)
plt.xticks(size = 24)
plt.yticks(size = 24)
plt.show()
Let's add day of week and hour of day columns using the date_format function.
from pyspark.sql.functions import date_format
df = df.withColumn("DayOfWeek", date_format("Date_time","E")).\
withColumn("DayOfWeek_number", date_format("Date_time","u")).\
withColumn("HourOfDay", date_format("Date_time","H"))
weekDaysCount = df.groupBy(["DayOfWeek", "DayOfWeek_number"]).count()
weekDaysCount.show()
We can also print the schema to see the columns.
df.printSchema()
weekDaysCount = weekDaysCount.collect()
days = [item[0] for item in weekDaysCount]
count = [item[2] for item in weekDaysCount]
day_number = [item[1] for item in weekDaysCount]
crime_byDay = {"days" : days, "count": count, "day_number": day_number}
crime_byDay = pd.DataFrame(crime_byDay)
crime_byDay = crime_byDay.sort_values(by = "day_number", ascending = True)
crime_byDay.plot(figsize = (20,10), kind = "line", x = "days", y = "count",
color = "red", linewidth = 8, legend = False)
plt.ylabel("Number of Crimes", fontsize = 18)
plt.xlabel("")
plt.title("Number of Crimes by Day", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
weekDaysCount = df.filter(df["Domestic"] == "true").groupBy(["DayOfWeek", "DayOfWeek_number"]).count().collect()
days = [item[0] for item in weekDaysCount]
count = [item[2] for item in weekDaysCount]
day_number = [item[1] for item in weekDaysCount]
crime_byDay = {"days" : days, "count": count, "day_number": day_number}
crime_byDay = pd.DataFrame(crime_byDay)
crime_byDay = crime_byDay.sort_values(by = "days", ascending = True)
crime_byDay.plot(figsize = (20,10), kind = "line", x = "days", y = "count",
color = "red", linewidth = 8, legend = False)
plt.ylabel("Number of Domestic Crimes", fontsize = 18)
plt.xlabel("")
plt.title("Number of domestic crimes by day", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
temp = df.filter(df["Domestic"] == "true")
temp = temp.select(df['HourOfDay'].cast('int').alias('HourOfDay'))
hourlyCount = temp.groupBy(["HourOfDay"]).count().collect()
hours = [item[0] for item in hourlyCount]
count = [item[1] for item in hourlyCount]
crime_byHour = {"count": count, "hours": hours}
crime_byHour = pd.DataFrame(crime_byHour)
crime_byHour = crime_byHour.sort_values(by = "hours", ascending = True)
crime_byHour.plot(figsize = (20,10), kind = "line", x = "hours", y = "count",
color = "green", linewidth = 5, legend = False)
plt.ylabel("Number of Domestic Crimes", fontsize = 18)
plt.xlabel("Hour", fontsize = 18)
plt.title("Number of domestic crimes by hour", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
import seaborn as sns
temp = df.filter(df["Domestic"] == "true")
temp = temp.select("DayOfWeek", df['HourOfDay'].cast('int').alias('HourOfDay'))
hourlyCount = temp.groupBy(["DayOfWeek","HourOfDay"]).count().collect()
days = [item[0] for item in hourlyCount]
hours = [item[1] for item in hourlyCount]
count = [item[2] for item in hourlyCount]
crime_byHour = {"count": count, "hours": hours, "days": days}
crime_byHour = pd.DataFrame(crime_byHour)
crime_byHour = crime_byHour.sort_values(by = "hours", ascending = True)
g = sns.FacetGrid(crime_byHour, hue="days", size = 12)
g.map(plt.plot, "hours", "count", linewidth = 3)
g.add_legend()
plt.ylabel("Number of Domestic Crimes", fontsize = 18)
plt.xlabel("Hour", fontsize = 18)
plt.title("Number of domestic crimes by day and hour", fontsize = 28)
plt.xticks(size = 18)
plt.yticks(size = 18)
plt.show()
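Another way to look at the same counts is a day-by-hour heatmap; a minimal sketch with seaborn's heatmap, assuming the abbreviated day names produced by date_format's "E" pattern:
# Pivot to a days x hours matrix and plot it as a heatmap
day_order = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
heat = crime_byHour.pivot(index = "days", columns = "hours", values = "count").reindex(day_order)
plt.figure(figsize = (14, 6))
sns.heatmap(heat, cmap = "Reds")
plt.title("Number of domestic crimes by day and hour", fontsize = 28)
plt.show()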