Fisseha Berhane, PhD

Data Scientist

443-970-2353 fisseha@jhu.edu

Integrating Hadoop and BI tools: Analyzing and Visualizing Big Data in Tableau with Spark



For the last four months or so, I have been working on interesting projects in the Hadoop ecosystem, and I will share some of my experiences in this and the next couple of blog posts. This post shows how to integrate Hadoop with business intelligence (BI) tools such as Tableau and use Spark to query the data. We can query data stored in Hive through Tableau's Spark SQL connector, which relies on the Spark ODBC connector that can be downloaded from Databricks. Integrating Spark with BI tools such as Tableau lets us process massive amounts of data by taking advantage of Spark's fast in-memory processing, and we can scale the cluster up or down based on the size of our data.

A Hadoop data lake lets us store massive amounts of data of different types and shapes cheaply. We can store data with a well-defined data model, unstructured data such as social media posts, and binary data such as images and videos. We can then use Spark to process the unstructured data in our data lake and stage it for BI tools.

In this tutorial, I am using Hortonworks Data Platform (HDP), an open source Apache Hadoop distribution, deployed on Amazon Web Services (AWS). This link and this YouTube video show the steps to deploy a Hadoop cluster on Amazon EC2 with HDP.

In this blog post, we will do the following:
  1. Download and ingest data to Hadoop Distributed File System (HDFS)
  2. Process the data with Spark
  3. Stage data by persisting to Hive
  4. Connect Tableau with Spark cluster
  5. Analyze and visualize with Tableau

First, let's use the os module from Python to create a local directory.

In [3]:
import os                            # provides a portable way of using operating system dependent functionality
os.mkdir("/root/power_consumption")  # create directory to download data to
os.chdir("/root/power_consumption")  # change working directory
os.getcwd()                         # check we are in the directory we just created
Out[3]:
'/root/power_consumption'
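
Note that os.mkdir raises an error if the directory already exists, which happens as soon as the cell above is run a second time. A minimal sketch of a re-runnable variant follows, assuming Python 3.2 or later (for the exist_ok argument). The helper name ensure_dir and the demo path under the system temp directory are made up for illustration; the post itself uses /root/power_consumption.

```python
import os
import tempfile


def ensure_dir(path):
    """Create path (and any missing parents); do nothing if it already exists."""
    os.makedirs(path, exist_ok=True)  # exist_ok requires Python 3.2+
    return os.path.abspath(path)


# Hypothetical demo location; substitute the directory you actually want to use.
demo_dir = ensure_dir(os.path.join(tempfile.gettempdir(), "power_consumption_demo"))
again = ensure_dir(demo_dir)  # the second call is a no-op instead of an error
```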

Download data

Here I use data from the UC Irvine Machine Learning Repository, a popular repository for machine learning datasets, to show how to process data with Spark, stage it in Hive, and finally query it with Spark SQL through the ODBC connector. In particular, I am using the “Individual household electric power consumption Data Set”, which can be downloaded from here.

In [4]:
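import io
import zipfile

import requests

url="http://archive.ics.uci.edu/ml/machine-learning-databases/00235/household_power_consumption.zip"

r = requests.get(url)
r.raise_for_status()                        # stop here if the download failed
z = zipfile.ZipFile(io.BytesIO(r.content))  # the archive is binary, so wrap the downloaded bytes
z.extractall()                              # extract into the current working directory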
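
The unzip step can also be wrapped in a small helper that checks the archive before extracting it. The sketch below is only an illustration: extract_zip_bytes is a hypothetical name, and the tiny archive is built in memory so the example runs without network access. In the flow above, the payload would be r.content.

```python
import io
import os
import tempfile
import zipfile


def extract_zip_bytes(payload, dest_dir):
    """Extract an in-memory zip archive into dest_dir and return the member names."""
    with zipfile.ZipFile(io.BytesIO(payload)) as archive:
        bad = archive.testzip()  # name of the first corrupt member, or None
        if bad is not None:
            raise ValueError("corrupt member in archive: %s" % bad)
        archive.extractall(dest_dir)
        return archive.namelist()


# Build a tiny stand-in archive in memory (not the real dataset).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("sample.txt", "Date;Time\n16/12/2006;17:24:00\n")

target = tempfile.mkdtemp()
names = extract_zip_bytes(buf.getvalue(), target)
```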

Check if data has been downloaded

In [5]:
working_directory = os.getcwd()
os.listdir(working_directory)
Out[5]:
['household_power_consumption.txt']

Cool. The data has been downloaded.

Ingest data to Hadoop Distributed File System (HDFS)

Create directory

I am using Jupyter Notebook, where prefixing a line with ! runs it as a shell command.

In [7]:
! hadoop fs -mkdir /tmp/power_consumption

Let's list the HDFS directories and check if "power_consumption" exists.

In [8]:
! hadoop fs -ls /tmp
Found 13 items
drwxr-xr-x   - ec2-user  hdfs          0 2016-08-04 21:41 /tmp/FAERS
drwx------   - ambari-qa hdfs          0 2016-07-24 03:11 /tmp/ambari-qa
drwxrwxrwx   - admin     hdfs          0 2016-08-19 11:06 /tmp/bike
drwxr-xr-x   - root      hdfs          0 2016-08-23 21:17 /tmp/bike_data
-rw-r--r--   3 root      hdfs     760179 2016-08-16 10:53 /tmp/elasticsearch-hadoop-2.3.4.jar
drwxr-xr-x   - hdfs      hdfs          0 2016-07-24 03:08 /tmp/entity-file-history
drwx-wx-wx   - ambari-qa hdfs          0 2016-08-22 11:36 /tmp/hive
-rwxr-xr-x   3 hdfs      hdfs       2161 2016-07-24 03:08 /tmp/id1fac5323_date082416
-rwxr-xr-x   3 ambari-qa hdfs       2161 2016-07-24 03:14 /tmp/idtest.ambari-qa.1469344450.52.in
-rwxr-xr-x   3 ambari-qa hdfs        957 2016-07-24 03:14 /tmp/idtest.ambari-qa.1469344450.52.pig
drwxr-xr-x   - root      hdfs          0 2016-08-29 00:08 /tmp/power_consumption
drwxr-xr-x   - ambari-qa hdfs          0 2016-07-24 03:12 /tmp/tezsmokeinput
drwxr-xr-x   - ambari-qa hdfs          0 2016-07-24 03:12 /tmp/tezsmokeoutput


We can also see the directory we just created using Apache Ambari.


Put data to the directory

In [9]:
! hadoop fs -put household_power_consumption.txt /tmp/power_consumption

Once we have ingested our data to HDFS, let's remove the "household_power_consumption.txt" data from the local directory.

In [10]:
! rm -rf household_power_consumption.txt

Now, let's check if our data has been ingested.

In [11]:
! hadoop fs -ls  /tmp/power_consumption
Found 1 items
-rw-r--r--   3 root hdfs  132960755 2016-08-29 00:09 /tmp/power_consumption/household_power_consumption.txt
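
If we want to verify the upload programmatically, for example before deleting the local copy, one option is to parse the listing and compare the reported size with the local file size. The sketch below parses a single line of hadoop fs -ls output; parse_hdfs_ls_line is a hypothetical helper, and it assumes the eight-column layout shown in the listing above.

```python
def parse_hdfs_ls_line(line):
    """Split one file or directory line of `hadoop fs -ls` output into named fields."""
    # Assumed layout: permissions, replication, owner, group, size, date, time, path
    perms, replication, owner, group, size, date, time, path = line.split(None, 7)
    return {
        "permissions": perms,
        "replication": replication,  # "-" for directories
        "owner": owner,
        "group": group,
        "size_bytes": int(size),
        "modified": date + " " + time,
        "path": path,
    }


# The line printed by the cell above.
line = ("-rw-r--r--   3 root hdfs  132960755 2016-08-29 00:09 "
        "/tmp/power_consumption/household_power_consumption.txt")
entry = parse_hdfs_ls_line(line)
```

The value of entry["size_bytes"] could then be compared with os.path.getsize on the local file before removing it.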


We can also see the ingested data using Apache Ambari.




Process data using Spark to create a dataframe

In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
In [5]:
power_cons = sc.textFile("/tmp/power_consumption/household_power_consumption.txt")
In [6]:
column_names= power_cons.map(lambda x: x.split(";")).first()
In [7]:
column_names
Out[7]:
[u'Date',
 u'Time',
 u'Global_active_power',
 u'Global_reactive_power',
 u'Voltage',
 u'Global_intensity',
 u'Sub_metering_1',
 u'Sub_metering_2',
 u'Sub_metering_3']
Define schema
In [8]:
from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': [{'metadata': {},
   'name': 'Date','nullable': True,'type': 'string'},
  {'metadata': {}, 'name': 'Time', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'Global_active_power', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'Global_reactive_power', 'nullable': True, 'type': 'string'},
  {'metadata': {},'name': 'Voltage','nullable': True,'type': 'integer'},
  {'metadata': {}, 'name': 'Global_intensity', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'Sub_metering_1', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'Sub_metering_2', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'Sub_metering_3', 'nullable': True, 'type': 'string'},
  ],
 'type': 'struct'})
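
The measurement columns in this dataset hold decimal values such as 4.216, so a schema with double columns may be a better fit than integer. The sketch below is an alternative under that assumption, not the schema used in the rest of this post. It builds the dictionary that StructType.fromJson expects from the list of column names; build_schema_json is a hypothetical helper. With such a schema, the row values would also need to be converted to floats before calling createDataFrame.

```python
column_names = ["Date", "Time", "Global_active_power", "Global_reactive_power",
                "Voltage", "Global_intensity",
                "Sub_metering_1", "Sub_metering_2", "Sub_metering_3"]


def build_schema_json(names, string_columns=("Date", "Time")):
    """Return a dict in the layout accepted by StructType.fromJson."""
    fields = []
    for name in names:
        # Assumption: every column except Date and Time is numeric.
        field_type = "string" if name in string_columns else "double"
        fields.append({"metadata": {}, "name": name,
                       "nullable": True, "type": field_type})
    return {"type": "struct", "fields": fields}


schema_json = build_schema_json(column_names)
# With PySpark available: schema = StructType.fromJson(schema_json)
```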
In [9]:
power_cons.take(5)
Out[9]:
[u'Date;Time;Global_active_power;Global_reactive_power;Voltage;Global_intensity;Sub_metering_1;Sub_metering_2;Sub_metering_3',
 u'16/12/2006;17:24:00;4.216;0.418;234.840;18.400;0.000;1.000;17.000',
 u'16/12/2006;17:25:00;5.360;0.436;233.630;23.000;0.000;1.000;16.000',
 u'16/12/2006;17:26:00;5.374;0.498;233.290;23.000;0.000;2.000;17.000',
 u'16/12/2006;17:27:00;5.388;0.502;233.740;23.000;0.000;1.000;17.000']
In [10]:
header= power_cons.first()  # this is the header
power_cons=power_cons.filter(lambda x:x !=header)          # filter out header
In [11]:
power_cons.take(5)
Out[11]:
[u'16/12/2006;17:24:00;4.216;0.418;234.840;18.400;0.000;1.000;17.000',
 u'16/12/2006;17:25:00;5.360;0.436;233.630;23.000;0.000;1.000;16.000',
 u'16/12/2006;17:26:00;5.374;0.498;233.290;23.000;0.000;2.000;17.000',
 u'16/12/2006;17:27:00;5.388;0.502;233.740;23.000;0.000;1.000;17.000',
 u'16/12/2006;17:28:00;3.666;0.528;235.680;15.800;0.000;1.000;17.000']
In [12]:
power_cons= power_cons.map(lambda x: x.split(";"))
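
Splitting on ";" leaves every field as a string. If numeric columns are wanted, each row can be converted during the map step. The following is a minimal sketch in plain Python; parse_record is a hypothetical helper, and it assumes that missing readings appear as "?" (a "?" shows up in the Hive query output later in this post) or as an empty field. The second input line is made up to illustrate a gap.

```python
def parse_record(line, sep=";", missing="?"):
    """Split one data line; keep Date and Time as text, convert the rest to float or None."""
    parts = line.split(sep)
    date, time = parts[0], parts[1]
    values = [None if p == missing or p == "" else float(p) for p in parts[2:]]
    return [date, time] + values


row = parse_record("16/12/2006;17:24:00;4.216;0.418;234.840;18.400;0.000;1.000;17.000")
gap = parse_record("1/2/2009;00:00:00;?;?;?;?;?;?;")  # made-up line with missing readings
```

With Spark, the same function could be passed to the RDD in place of the lambda, as in power_cons.map(parse_record).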

Create DataFrame

In [14]:
 power_cons=sqlContext.createDataFrame(power_cons, schema)
In [15]:
power_cons.count() #number of records
Out[15]:
2075259
In [16]:
power_cons.printSchema()
root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Global_active_power: integer (nullable = true)
 |-- Global_reactive_power: string (nullable = true)
 |-- Voltage: integer (nullable = true)
 |-- Global_intensity: integer (nullable = true)
 |-- Sub_metering_1: string (nullable = true)
 |-- Sub_metering_2: integer (nullable = true)
 |-- Sub_metering_3: string (nullable = true)

We can use Spark SQL to process and query the data
In [25]:
power_cons.select("Global_reactive_power","Sub_metering_1","Sub_metering_3").show(5, truncate=False)
+---------------------+--------------+--------------+
|Global_reactive_power|Sub_metering_1|Sub_metering_3|
+---------------------+--------------+--------------+
|0.418                |0.000         |17.000        |
|0.436                |0.000         |16.000        |
|0.498                |0.000         |17.000        |
|0.502                |0.000         |17.000        |
|0.528                |0.000         |17.000        |
+---------------------+--------------+--------------+
only showing top 5 rows

Let's select some of the variables and then persist them to Hive.

In [26]:
power_cons= power_cons.select("Date","Time","Global_reactive_power","Sub_metering_1","Sub_metering_3")

Staging data by persisting it to Hive

In [27]:
power_cons.registerTempTable("mytempTable")
sqlContext.sql("DROP TABLE IF EXISTS power_consumption")
sqlContext.sql("CREATE TABLE power_consumption STORED AS ORC AS SELECT * FROM mytempTable")
Out[27]:
DataFrame[]

Checking that the table persists in Hive

In [30]:
sqlContext.sql("SELECT Date, max(Sub_metering_1) as Maximum_Sub_metering_1 FROM power_consumption group by Date order by Date").show()
+---------+----------------------+
|     Date|Maximum_Sub_metering_1|
+---------+----------------------+
| 1/1/2007|                 0.000|
| 1/1/2008|                 6.000|
| 1/1/2009|                 5.000|
| 1/1/2010|                 9.000|
|1/10/2007|                39.000|
|1/10/2008|                 9.000|
|1/10/2009|                 8.000|
|1/10/2010|                 8.000|
|1/11/2007|                 0.000|
|1/11/2008|                 0.000|
|1/11/2009|                38.000|
|1/11/2010|                 0.000|
|1/12/2007|                 9.000|
|1/12/2008|                 6.000|
|1/12/2009|                 0.000|
| 1/2/2007|                38.000|
| 1/2/2008|                 4.000|
| 1/2/2009|                     ?|
| 1/2/2010|                 4.000|
| 1/3/2007|                 0.000|
+---------+----------------------+
only showing top 20 rows
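
One thing to keep in mind when reading this output: Sub_metering_1 was stored as a string, so max() compares the values as text rather than as numbers. That is why a "?" can come out as the maximum for a day. The plain Python sketch below illustrates the difference with made-up readings; to_float is a hypothetical helper.

```python
def to_float(text):
    """Return float(text), or None when the text is not numeric (for example '?')."""
    try:
        return float(text)
    except ValueError:
        return None


readings = ["0.000", "9.000", "39.000", "?"]  # illustrative values, not a real day

string_max = max(readings)  # character-by-character comparison
numbers = [v for v in map(to_float, readings) if v is not None]
numeric_max = max(numbers)  # numeric comparison, ignoring non-numeric entries
```

In Spark SQL, a comparable effect can probably be had by aggregating over CAST(Sub_metering_1 AS DOUBLE), since non-numeric strings become NULL and are ignored by max.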


We can also see the staged data in Hive using Apache Ambari.



Great! Our data is in Hive, and we can now query it from Tableau using Spark SQL and the Spark ODBC connector.

Connecting with Tableau


Make sure that Spark Thrift Server is running in your cluster.




Next, open Tableau, go to the connection options, and choose Spark SQL. Make sure you have downloaded and installed the Spark ODBC connector, which is available from Databricks.



Next, provide the address of your server, the port, and your authentication credentials.



You can find the port of the Spark Thrift Server in the Spark configuration settings.
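
If Tableau cannot connect, it can help to first confirm that the Thrift Server port is reachable from your machine. The sketch below does a plain TCP check; port_is_open is a hypothetical helper, and the demonstration runs against a throwaway local listener. Replace the host and port with your own server's values, which depend on your cluster.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Throwaway local listener standing in for the Thrift Server.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
demo_port = listener.getsockname()[1]

reachable = port_is_open("127.0.0.1", demo_port)
listener.close()
unreachable = port_is_open("127.0.0.1", demo_port, timeout=1.0)
```

A successful check only shows that something is listening on that port; it does not validate credentials or the ODBC driver setup.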




Once you have connected Tableau to the data stored in Hive through Spark ODBC, you can quickly create dashboards in Tableau.




Summary

In this blog post, we saw how to ingest data into Hadoop, process it with Spark, and stage it in Hive. We then created a Tableau dashboard by first connecting Tableau to the Hive data through the Spark ODBC connector. This approach lets us take advantage of Spark's fast in-memory computing to analyze and visualize big data with our favorite BI tools. Further, we can scale our cluster up or down depending on our requirements.



