Jupyter is a web-based notebook used for data exploration, visualization, sharing, and collaboration. It is an ideal environment for experimenting with different ideas and/or datasets: we can start with vague ideas and, after various experiments, crystallize them into working projects. It can also be used for staging data from a data lake to be consumed by BI and other tools. Jupyter supports more than 40 programming languages, including Python, R, Scala, and Julia.
In this blog post, we will see how to use Jupyter to download data from the web and ingest it into the Hadoop Distributed File System (HDFS). Finally, we will explore the data in HDFS using Spark and create a simple visualization. We will use Bay Area Bike Share's trip data from this website.
If you don't have a Hadoop cluster, you can download and deploy the Hortonworks Sandbox. It is a nice environment for practicing with Hadoop ecosystem components and Spark. You can install Jupyter by following this tutorial.
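If you prefer to set Jupyter up by hand instead, a rough sketch (assuming Python and pip are already available on the sandbox) looks like the following; the linked tutorial covers the Hortonworks-specific configuration.
pip install jupyter                # install the notebook server
jupyter notebook --ip=0.0.0.0      # start Jupyter so it is reachable from your browser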
First, let's use the os module from Python to create a local directory.
import os
os.chdir("/home") # change to /home
os.getcwd() # checking that we have changed to home directory
We will create a local directory called bike_share to download our data to.
if not os.path.exists(r'/home/bike_share'):   # if the directory does not exist,
    os.makedirs(r'/home/bike_share')          # create it
os.chdir(r'/home/bike_share') # changing to "bike_share" directory
os.getcwd() # Are we in the bike_share directory?
Prefixing a command with ! lets us run it in the shell from within the notebook.
! pwd # checking the working directory is the bike_share directory
! ls # list files and folders in bike_share directory
# actually, the directory is empty at this time
Let's download Bay Area Bike Share's trip data from this website.
Let's use the shell interpreter, by adding !, to download the data using wget.
As you can see from the website, there are two bike share data sets; one of them covers September 2014 - August 2015.
Let's download the first one.
! wget https://s3.amazonaws.com/babs-open-data/babs_open_data_year_1.zip
Now, let's see if the data has been downloaded.
! ls
Great! Now, let's unzip it.
! unzip babs_open_data_year_1.zip
Now, let's see the files and folders.
! ls
Let's remove the zipped file since we do not want it anymore.
! rm -f *.zip
! ls
Size of downloaded data:
! du -h
Now, let's see the contents of each folder.
! ls 201402_babs_open_data/
! ls 201408_babs_open_data/
Next, let's create a directory in HDFS called bike_data.
! hadoop fs -mkdir /bike_data
Let's list the HDFS directories and check if "bike_data" exists.
! hadoop fs -ls /
Now, let's copy all the downloaded files to the bike_data directory in HDFS.
! hadoop fs -put * /bike_data
Once we have ingested our data to HDFS, let's remove the bike data from the local directory.
! rm -rf *
Let's verify that the files have been copied to HDFS.
! hadoop fs -ls /bike_data
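As an optional sanity check, we can compare the size of the data in HDFS with the local size we saw earlier with du -h:
! hadoop fs -du -h /bike_data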
Great, our data has been ingested. Now, let's use Spark and explore the data.
Let's open 201408_status_data.csv, which has approximately 18 million records.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
status08 = sqlContext.read.format('com.databricks.spark.csv').\
    options(header='true', inferSchema='true').\
    load("hdfs://sandbox.hortonworks.com/bike_data/201408_babs_open_data/201408_status_data.csv")
Let's see number of records in the data.
print status08.count()
So, we can see that the data has more than 18 million observations.
What about the schema?
status08.printSchema()
status08.select("*").show(truncate=False)
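Since we will run several actions on this 18-million-row DataFrame (count, show, and the aggregation below), caching it in memory can speed things up. This is just an optional optimization sketch using Spark's standard cache():
status08.cache()   # keep the DataFrame in memory across the repeated actions below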
Let's convert the last column (time) from string to a date type so that we can analyze the data by day.
from datetime import datetime
from pyspark.sql.functions import col,udf, unix_timestamp
from pyspark.sql.types import DateType
Let's create a user-defined function that converts the last column from string to a date.
myfunc = udf(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'), DateType())
status_df = status08.withColumn('Date_and_Time', myfunc(col('time'))).drop('time')
status_df.printSchema()
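As an aside, the same conversion can be done without a Python UDF by using the built-in unix_timestamp function we imported above; this is just an alternative sketch (and, unlike DateType, it keeps the time of day). The name status_df_alt is only for illustration.
status_df_alt = status08.withColumn(
        'Date_and_Time',
        unix_timestamp(col('time'), 'yyyy-MM-dd HH:mm:ss').cast('timestamp')
    ).drop('time')
status_df_alt.printSchema()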
Now, let's compute the maximum bikes_available value for each day of the year.
from pyspark.sql.functions import dayofyear
day_to_bikes_available = status_df.select(status_df['bikes_available'],
                                          dayofyear(status_df['Date_and_Time']).alias('day'))
# the aggregated column is named 'max(bikes_available)'
day_to_bikes_available_df = day_to_bikes_available.groupBy('day').max('bikes_available')
day_to_bikes_available_df.show()
Let's collect the days and the corresponding maximum bikes available into Python lists so that we can plot them.
days = day_to_bikes_available_df.map(lambda r: r[0]).collect()
bikes_available = day_to_bikes_available_df.map(lambda r: r[1]).collect()
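Collecting the same DataFrame twice runs two separate Spark jobs. An equivalent, slightly cheaper sketch is to collect the rows once, ordered by day, and split them in Python:
rows = day_to_bikes_available_df.orderBy('day').collect()
days = [r[0] for r in rows]             # day of year
bikes_available = [r[1] for r in rows]  # maximum bikes available on that day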
Now, let's plot the results with matplotlib.
import matplotlib.pyplot as plt
plt.plot(days,bikes_available, 'bo')
plt.axis([min(days)-5, max(days), min(bikes_available)-10, max(bikes_available)+10])
plt.xlabel('Day')
plt.ylabel('Maximum Bikes Available')
plt.axhline(linewidth=3, color='#999999')
plt.axvline(linewidth=2, color='#999999')
plt.show()
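If the figure does not appear inline in the notebook, run the %matplotlib inline magic once (before plotting) so that Jupyter renders figures in the output cell:
%matplotlib inline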
In this post, we saw how to fetch data from the web, ingest it into the Hadoop Distributed File System (HDFS), do some data transformation with Spark, and create a visualization with Matplotlib, Python's plotting library. We used Jupyter, a great data science notebook, to perform all of these tasks.