Fisseha Berhane, PhD

Data Scientist

Data Ingestion to a Hadoop Data Lake with Jupyter

Jupyter is a web-based notebook used for data exploration, visualization, sharing, and collaboration. It is an ideal environment for experimenting with different ideas and datasets: we can start with a vague idea and, through successive experiments, crystallize it into a working project. Jupyter can also be used for staging data from a data lake for use by BI and other tools. It supports more than 40 programming languages, including Python, R, Scala, and Julia.

In this blog post, we will see how to use Jupyter to download data from the web and ingest it into the Hadoop Distributed File System (HDFS). We will then explore the data in HDFS using Spark and create a simple visualization. We will use Bay Area Bike Share's trip data from this website.

If you don't have a Hadoop cluster, you can download and deploy the Hortonworks Sandbox. It is a convenient environment for practicing with Hadoop ecosystem components and Spark. You can install Jupyter on it by following this tutorial.
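
Before going further, it is worth confirming that the Hadoop client is reachable from the machine where Jupyter runs. A minimal sanity check (assuming the HDP sandbox defaults) is to print the Hadoop version and list the HDFS root:

! hadoop version        # prints the Hadoop build if the client is on the PATH
! hadoop fs -ls /       # lists the HDFS root; fails if the NameNode is unreachable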

First, let's use the os module from Python to create a local directory.

In [1]:
import os
os.chdir("/home")   # change to /home
In [2]:
os.getcwd()        # checking that we have changed to home directory
Out[2]:
'/home'

We will create a local directory called bike_share to download our data to.

In [3]:
if not os.path.exists(r'/home/bike_share'): #  if the directory does not 
    os.makedirs(r'/home/bike_share')        #  exist create it 
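
As an aside, on Python 3.2+ the existence check can be folded into a single call; the line below is equivalent (this notebook runs Python 2, so the explicit check above is kept):

os.makedirs(r'/home/bike_share', exist_ok=True)   # no error if the directory already exists
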
In [4]:
os.chdir(r'/home/bike_share')     # changing to "bike_share" directory
In [5]:
os.getcwd()      # Are we in the bike_share directory?
Out[5]:
'/home/bike_share'

Prefixing a command with ! tells Jupyter to pass it to the shell.

In [6]:
! pwd        # checking the working directory is the bike_share directory
/home/bike_share
In [7]:
! ls    # list files and folders in bike_share directory
        # actually, the directory is empty at this time

Download data

Let's download Bay Area Bike Share's trip data from this website.

Let's use the shell (by prefixing the command with !) to download the data with wget.

As you can see from the website, there are two bike share data sets.

  • Year 1 (August 2013 - August 2014)
  • Year 2 (September 2014 - August 2015)

Let's download the first one.

In [8]:
! wget https://s3.amazonaws.com/babs-open-data/babs_open_data_year_1.zip
--2016-08-10 17:31:21--  https://s3.amazonaws.com/babs-open-data/babs_open_data_year_1.zip
Resolving s3.amazonaws.com... 54.231.16.136
Connecting to s3.amazonaws.com|54.231.16.136|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84402747 (80M) [application/zip]
Saving to: “babs_open_data_year_1.zip”

100%[======================================>] 84,402,747  4.34M/s   in 17s     

2016-08-10 17:31:39 (4.73 MB/s) - “babs_open_data_year_1.zip” saved [84402747/84402747]

Now, let's see if the data has been downloaded.

In [9]:
! ls
babs_open_data_year_1.zip

Great! Now, let's unzip it.

In [10]:
! unzip babs_open_data_year_1.zip 
Archive:  babs_open_data_year_1.zip
   creating: 201402_babs_open_data/
  inflating: 201402_babs_open_data/201402_station_data.csv  
  inflating: 201402_babs_open_data/201402_status_data.csv  
  inflating: 201402_babs_open_data/201402_trip_data.csv  
  inflating: 201402_babs_open_data/201402_weather_data.csv  
  inflating: 201402_babs_open_data/README.txt  
   creating: 201408_babs_open_data/
  inflating: 201408_babs_open_data/201408_station_data.csv  
  inflating: 201408_babs_open_data/201408_status_data.csv  
  inflating: 201408_babs_open_data/201408_trip_data.csv  
  inflating: 201408_babs_open_data/201408_weather_data.csv  
  inflating: 201408_babs_open_data/README.txt  
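
As an aside, the download-and-extract step can also be done without shell tools; a minimal sketch using only the Python standard library (Python 2, as in the rest of this notebook, with the same URL and working directory as above):

import urllib, zipfile

url = 'https://s3.amazonaws.com/babs-open-data/babs_open_data_year_1.zip'
urllib.urlretrieve(url, 'babs_open_data_year_1.zip')          # download into the current directory
zipfile.ZipFile('babs_open_data_year_1.zip').extractall('.')  # unzip in place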

Now, let's see the files and folders.

In [11]:
! ls
201402_babs_open_data  201408_babs_open_data  babs_open_data_year_1.zip

Let's remove the zipped file since we do not want it anymore.

In [12]:
! rm -f *.zip
In [13]:
! ls
201402_babs_open_data  201408_babs_open_data

Size of downloaded data:

In [14]:
! du -h
611M	./201402_babs_open_data
643M	./201408_babs_open_data
1.3G	.

Now, let's see the contents of each folder.

In [15]:
! ls 201402_babs_open_data/
201402_station_data.csv  201402_trip_data.csv	  README.txt
201402_status_data.csv	 201402_weather_data.csv
In [16]:
! ls 201408_babs_open_data/
201408_station_data.csv  201408_trip_data.csv	  README.txt
201408_status_data.csv	 201408_weather_data.csv

Ingest data to Hadoop Distributed File System (HDFS)

Create directory

In [19]:
! hadoop fs -mkdir /bike_data
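
If the directory may already exist, or if a nested path needs to be created, the -p flag behaves like the Unix mkdir -p and does not fail on an existing directory:

! hadoop fs -mkdir -p /bike_data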

Let's list the HDFS directories and check if "bike_data" exists.

In [21]:
! hadoop fs -ls /
Found 12 items
drwxrwxrwx   - yarn   hadoop          0 2016-03-14 14:19 /app-logs
drwxr-xr-x   - hdfs   hdfs            0 2016-03-14 14:25 /apps
drwxr-xr-x   - yarn   hadoop          0 2016-03-14 14:19 /ats
drwxr-xr-x   - root   hdfs            0 2016-08-10 18:27 /bike_data
drwxr-xr-x   - hdfs   hdfs            0 2016-03-14 14:50 /demo
drwxr-xr-x   - hdfs   hdfs            0 2016-03-14 14:19 /hdp
drwxr-xr-x   - mapred hdfs            0 2016-03-14 14:19 /mapred
drwxrwxrwx   - mapred hadoop          0 2016-03-14 14:19 /mr-history
drwxr-xr-x   - hdfs   hdfs            0 2016-03-14 14:42 /ranger
drwxrwxrwx   - spark  hadoop          0 2016-08-10 17:28 /spark-history
drwxrwxrwx   - hdfs   hdfs            0 2016-03-14 14:31 /tmp
drwxr-xr-x   - hdfs   hdfs            0 2016-03-14 14:33 /user

Put data into the directory

In [22]:
! hadoop fs -put * /bike_data

Once we have ingested our data into HDFS, let's remove the bike data from the local directory.

In [24]:
! rm -rf *
Now, let's check if our data has been ingested.
In [1]:
! hadoop fs -ls  /bike_data
Found 2 items
drwxr-xr-x   - root hdfs          0 2016-08-10 18:29 /bike_data/201402_babs_open_data
drwxr-xr-x   - root hdfs          0 2016-08-10 18:29 /bike_data/201408_babs_open_data
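
As an extra check, hadoop fs -du -h reports how much space each ingested folder occupies in HDFS; the totals should be close to the 1.3G we measured locally:

! hadoop fs -du -h /bike_data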

Great, our data has been ingested. Now, let's use Spark and explore the data.

Let's open 201408_status_data.csv, which has roughly 18 million records.

In [1]:
from pyspark.sql import HiveContext   # HiveContext extends SQLContext with Hive support
sqlContext = HiveContext(sc)
In [2]:
status08 = sqlContext.read.format('com.databricks.spark.csv').\
options(header='true', inferSchema='true').\
load("hdfs://sandbox.hortonworks.com/bike_data/201408_babs_open_data/201408_status_data.csv")

Let's see the number of records in the data.

In [3]:
print status08.count()
18342210

So, we can see that the data has more than 18 million observations.
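
Since we will run several more queries against status08, it can be worth caching the DataFrame so Spark does not re-read the 18-million-row CSV from HDFS for every action (optional, and assuming the executors have memory to spare):

status08.cache()   # lazy: the data is materialized on the next action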

What about the schema?

In [4]:
status08.printSchema()
root
 |-- station_id: integer (nullable = true)
 |-- bikes_available: integer (nullable = true)
 |-- docks_available: integer (nullable = true)
 |-- time: string (nullable = true)

In [5]:
status08.select("*").show(truncate=False)
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|time               |
+----------+---------------+---------------+-------------------+
|2         |12             |15             |2014-03-01 00:00:02|
|2         |12             |15             |2014-03-01 00:01:03|
|2         |12             |15             |2014-03-01 00:02:03|
|2         |12             |15             |2014-03-01 00:03:02|
|2         |12             |15             |2014-03-01 00:04:03|
|2         |12             |15             |2014-03-01 00:05:02|
|2         |12             |15             |2014-03-01 00:06:03|
|2         |12             |15             |2014-03-01 00:07:02|
|2         |13             |14             |2014-03-01 00:08:03|
|2         |13             |14             |2014-03-01 00:09:02|
|2         |13             |14             |2014-03-01 00:10:03|
|2         |13             |14             |2014-03-01 00:11:03|
|2         |13             |14             |2014-03-01 00:12:02|
|2         |13             |14             |2014-03-01 00:13:02|
|2         |13             |14             |2014-03-01 00:14:02|
|2         |13             |14             |2014-03-01 00:15:02|
|2         |13             |14             |2014-03-01 00:16:03|
|2         |13             |14             |2014-03-01 00:17:03|
|2         |13             |14             |2014-03-01 00:18:03|
|2         |13             |14             |2014-03-01 00:19:02|
+----------+---------------+---------------+-------------------+
only showing top 20 rows

Let's convert the last column from a string to a date so that we can aggregate by day.

In [6]:
from datetime import datetime
from pyspark.sql.functions import col,udf, unix_timestamp
from pyspark.sql.types import DateType

Let's create a user-defined function that converts the last column from a string to a date (DateType keeps only the date part, which is all we need here).

In [7]:
myfunc = udf(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'), DateType())
In [8]:
status_df = status08.withColumn('Date_and_Time', myfunc(col('time'))).drop('time')
In [9]:
status_df.printSchema()
root
 |-- station_id: integer (nullable = true)
 |-- bikes_available: integer (nullable = true)
 |-- docks_available: integer (nullable = true)
 |-- Date_and_Time: date (nullable = true)
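
As an alternative to the Python UDF, the same conversion can be done with Spark's built-in functions, which avoids calling into Python for each of the 18 million rows; a sketch assuming Spark 1.5+ (to_date truncates the string timestamp to a date, matching the UDF above):

from pyspark.sql.functions import to_date

status_df_alt = status08.withColumn('Date_and_Time', to_date(col('time'))).drop('time')
status_df_alt.printSchema()   # Date_and_Time should come back as date, as before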

In [10]:
from pyspark.sql.functions import dayofyear
In [11]:
day_to_bikes_available = status_df.select(status_df['bikes_available'],\
                                          dayofyear(status_df['Date_and_Time']).alias('day'))

day_to_bikes_available_df = day_to_bikes_available.groupBy('day').max('bikes_available')

day_to_bikes_available_df.show()
+---+--------------------+
|day|max(bikes_available)|
+---+--------------------+
|231|                  27|
|232|                  23|
|233|                  24|
|234|                  26|
|235|                  23|
|236|                  23|
|237|                  25|
|238|                  23|
|239|                  27|
|240|                  27|
|241|                  27|
|242|                  23|
|243|                  27|
| 60|                  22|
| 61|                  21|
| 62|                  27|
| 63|                  24|
| 64|                  25|
| 65|                  27|
| 66|                  27|
+---+--------------------+
only showing top 20 rows
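
Note that groupBy returns the days in no particular order (the keys are shuffled across partitions); if an ordered series is preferred, for example for a line plot, it can be sorted explicitly:

day_to_bikes_available_df.orderBy('day').show(5)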

In [13]:
days = day_to_bikes_available_df.map(lambda r:r[0]).collect()

bikes_available = day_to_bikes_available_df.map(lambda r:r[1]).collect()
In [15]:
import matplotlib.pyplot as plt
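
If the figure does not render inside the notebook, Jupyter's inline matplotlib backend usually needs to be enabled once per session with the standard magic (not shown in the original run):

%matplotlib inline
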
In [23]:
plt.plot(days,bikes_available, 'bo')
plt.axis([min(days)-5, max(days), min(bikes_available)-10, max(bikes_available)+10])
plt.xlabel('Day')
plt.ylabel('Maximum Bikes Available')
plt.axhline(linewidth=3, color='#999999')
plt.axvline(linewidth=2, color='#999999')
plt.show()

In this post, we saw how to fetch data from the web, ingest it into the Hadoop Distributed File System (HDFS), do some data transformation with Spark, and create a visualization with Matplotlib, Python's plotting library. We used Jupyter, a great data science notebook, to perform all of these tasks.
