Over the last four-plus months, I have been working on interesting projects in the Hadoop ecosystem, and I will blog about some of my experiences in this post and the next couple of posts. This post covers how to integrate Hadoop with business intelligence (BI) tools, such as Tableau, and leverage Spark for querying data. We can query data in Hive by using Tableau's Spark SQL connector; the Spark ODBC connector can be downloaded from Databricks. Integrating Spark with BI tools such as Tableau lets us process massive amounts of data by taking advantage of Spark's fast in-memory processing, and we can scale the cluster up or down based on the size of our data.
A Hadoop data lake lets us store massive amounts of data of different types and shapes cheaply: data with a well-defined data model, unstructured data such as social media posts, and binary data such as images and videos. We can then use Spark to process the unstructured data in our data lake and stage it for BI tools.
In this tutorial, I am using the Hortonworks Data Platform (HDP), an open source Apache Hadoop distribution, deployed on Amazon Web Services (AWS). This link and this YouTube video show the steps for deploying a Hadoop cluster with HDP on Amazon EC2.
First, let's use the os module from Python to create a local directory.
import os # provides a portable way of using operating system dependent functionality
os.mkdir("/root/power_consumption") # create directory to download data to
os.chdir("/root/power_consumption") # change working directory
os.getcwd() # check we are in the directory we just created
Here, data from the UC Irvine Machine Learning Repository, a popular repository for machine learning datasets, will be used to show how to process data with Spark, stage it to Hive, and finally query it with Spark SQL over the ODBC connector. In particular, I am using the “Individual household electric power consumption Data Set”, which can be downloaded from here.
import requests, zipfile, StringIO  # download and unzip the data in memory (Python 2's StringIO)
url = "http://archive.ics.uci.edu/ml/machine-learning-databases/00235/household_power_consumption.zip"
r = requests.get(url, stream=True)  # download the zipped dataset
z = zipfile.ZipFile(StringIO.StringIO(r.content))  # wrap the downloaded bytes so zipfile can read them
z.extractall()  # extract household_power_consumption.txt into the working directory
working_directory = os.getcwd()
os.listdir(working_directory)
Cool. The data has been downloaded.
I am using Jupyter Notebook, so I can run shell commands by prefixing them with !
! hadoop fs -mkdir /tmp/power_consumption
Let's list the HDFS directories and check if "power_consumption" exists.
! hadoop fs -ls /tmp
We can also see the directory we just created using Apache Ambari. Next, let's ingest the data file into HDFS.
! hadoop fs -put household_power_consumption.txt /tmp/power_consumption
Once we have ingested our data into HDFS, we can remove the "household_power_consumption.txt" file from the local directory and confirm that it is now in HDFS.
! rm -rf household_power_consumption.txt
! hadoop fs -ls /tmp/power_consumption
We can also see the ingested data using Apache Ambari.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)  # entry point for Spark SQL (sc is the existing SparkContext)
power_cons = sc.textFile("/tmp/power_consumption/household_power_consumption.txt")  # load the raw file from HDFS as an RDD of lines
column_names = power_cons.map(lambda x: x.split(";")).first()  # the first line holds the ';'-separated column names
column_names
from pyspark.sql.types import StructType
# Keep every field as a string: the split values are raw strings and the file
# marks missing values with '?'; numeric columns can be cast later as needed.
schema = StructType.fromJson({'fields': [
    {'metadata': {}, 'name': 'Date', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'Time', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'Global_active_power', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'Global_reactive_power', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'Voltage', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'Global_intensity', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'Sub_metering_1', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'Sub_metering_2', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'Sub_metering_3', 'nullable': True, 'type': 'string'},
    ],
    'type': 'struct'})
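As a side note, the same schema can be built programmatically with StructField objects, which some readers may find more readable. This is just an equivalent sketch of the schema above:
from pyspark.sql.types import StructType, StructField, StringType

# Equivalent schema built field by field; every column stays a string for now
columns = ["Date", "Time", "Global_active_power", "Global_reactive_power",
           "Voltage", "Global_intensity", "Sub_metering_1", "Sub_metering_2",
           "Sub_metering_3"]
schema = StructType([StructField(name, StringType(), True) for name in columns])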
power_cons.take(5)
header= power_cons.first() # this is the header
power_cons=power_cons.filter(lambda x:x !=header) # filter out header
power_cons.take(5)
power_cons = power_cons.map(lambda x: x.split(";"))  # split each line on the ';' delimiter
power_cons = sqlContext.createDataFrame(power_cons, schema)  # apply the schema to create a DataFrame
power_cons.count() #number of records
power_cons.printSchema()
power_cons.select("Global_reactive_power","Sub_metering_1","Sub_metering_3").show(5, truncate=False)
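Note that every column was loaded as a string (the raw file marks missing values with '?'). If we want numeric columns, we can cast them with the DataFrame API. The sketch below is optional and only illustrates the cast; the rest of this post keeps the string columns and casts inside the SQL query instead.
from pyspark.sql.functions import col

# Optional: cast selected measurement columns from string to double ('?' becomes null)
power_cons_numeric = (power_cons
    .withColumn("Global_reactive_power", col("Global_reactive_power").cast("double"))
    .withColumn("Sub_metering_1", col("Sub_metering_1").cast("double"))
    .withColumn("Sub_metering_3", col("Sub_metering_3").cast("double")))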
Let's select some of the variables and then persist them to Hive.
power_cons= power_cons.select("Date","Time","Global_reactive_power","Sub_metering_1","Sub_metering_3")
power_cons.registerTempTable("mytempTable")  # register the DataFrame as a temporary table
sqlContext.sql("DROP TABLE IF EXISTS power_consumption")  # drop any previous version of the Hive table
sqlContext.sql("CREATE TABLE power_consumption STORED AS ORC AS SELECT * FROM mytempTable")  # persist to Hive in ORC format
sqlContext.sql("SELECT Date, max(Sub_metering_1) as Maxmimum_Daily_Voltage FROM power_consumption group by Date order by Date").show()
We can also see the staged data in Hive using Apache Ambari.
Great! Our data is in Hive and we can now query it from Tableau using Spark SQL and Spark ODBC connector.
Make sure that the Spark Thrift Server is running!
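On HDP, the Spark Thrift Server is usually started and monitored from Ambari. If you want to check from the notebook instead, the commands below are a rough sketch; the install path and port are assumptions based on a typical HDP layout (10015 is a common default for the Spark Thrift Server port), so adjust them for your cluster.
! netstat -tlnp | grep 10015   # is anything listening on the assumed Thrift Server port?
! /usr/hdp/current/spark-client/sbin/start-thriftserver.sh   # start it manually if needed (HDP-typical path, an assumption)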
Open Tableau, go to the connection options, and choose Spark SQL. Make sure you have downloaded and installed the Spark ODBC connector; you can get it from Databricks.
Next, provide the address of your server, the port, and your authentication credentials.
You can find the Spark Thrift Server port in the Spark configuration settings.
Once you have connected Tableau to the data stored in Hive through Spark ODBC, you can quickly create rich dashboards in Tableau.
In this blog post, we saw how to ingest data into Hadoop, process it with Spark, and stage it in Hive. We then created a Tableau dashboard by connecting Tableau to the Hive data through the Spark ODBC connector. This approach lets us take advantage of Spark's fast in-memory computing and analyze and visualize big data with our favorite BI tools. Further, we can scale our cluster up or down depending on our requirements.