My previous tutorials on Apache Spark with Python:
- Analyzing the Bible and the Quran using Spark
- Spark DataFrames: Exploring Chicago Crimes
- Spark RDDs vs DataFrames vs SparkSQL - Part 1: Retrieving, Sorting and Filtering
- Spark RDDs vs DataFrames vs SparkSQL - Part 2: Working With Multiple Tables
- Spark RDDs vs DataFrames vs SparkSQL - Part 3: Web Server Log Analysis
- Spark RDDs vs DataFrames vs SparkSQL - Part 4: Set Operators
- Spark RDDs vs DataFrames vs SparkSQL - Part 5: Set Operators
In this blog post, we will see how to use Spark with Hive, particularly:
- how to create and use Hive databases
- how to create Hive tables
- how to load data to Hive tables
- how to insert data into Hive tables
- how to read data from Hive tables
- how to save dataframes to any Hadoop-supported file system
To work with Hive, we have to instantiate SparkSession with Hive support, including connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions, if we are using Spark 2.0.0 or later. If we are using earlier Spark versions, we have to use HiveContext, a variant of Spark SQL that integrates with data stored in Hive. Even when we do not have an existing Hive deployment, we can still enable Hive support.
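For example, on Spark 1.x the entry point would look roughly like the sketch below (illustrative only; the application name and configuration are placeholders):
# Spark 1.x style (illustrative sketch): use HiveContext instead of SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf().setAppName("hive-example")   # placeholder app name
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)                    # Hive-aware variant of SQLContext
sqlContext.sql("show databases").show()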
In this tutorial, I am using standalone Spark. When it is not configured by hive-site.xml, the context automatically creates metastore_db in the current directory. As shown below, initially we do not have metastore_db, but after we instantiate SparkSession with Hive support, we see that metastore_db has been created. Further, when we execute the create database command, the spark-warehouse directory is created.
First, let's see what we have in the current working directory.
import os
os.listdir(os.getcwd())
Initially, we do not have metastore_db.
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
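When building the session, we could also have set spark.sql.warehouse.dir explicitly to control where the spark-warehouse directory is created. A minimal sketch (the path below is hypothetical; adjust it to your environment):
# illustrative: choosing a custom warehouse location instead of the default ./spark-warehouse
spark = SparkSession.builder \
    .config("spark.sql.warehouse.dir", "/home/fish/MySpark/HiveSpark/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()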
Now, let's check if metastore_db has been created.
os.listdir(os.getcwd())
Now, as you can see above, metastore_db has been created.
Now, we can use Hive commands to see databases and tables. However, at this point, we do not have any database or table. We will create them below.
spark.sql('show databases').show()
spark.sql('show tables').show()
We can see the functions available in Spark SQL using the command below. At the time of this writing, there are about 250 functions.
fncs = spark.sql('show functions').collect()
len(fncs)
Let's see some of them.
for i in fncs[100:111]:
    print(i[0])
By the way, we can see what a function is used for and what its arguments are, as shown below.
spark.sql("describe function instr").show(truncate = False)
Now, let's create a database. The data we will use is the MovieLens 20M Dataset; we will use its movies, ratings and tags files.
spark.sql('create database movies')
Let's check if our database has been created.
spark.sql('show databases').show()
Yes, movies database has been created.
Now, let's download the data. I am using Jupyter Notebook, so the ! prefix enables me to run shell commands.
! wget http://files.grouplens.org/datasets/movielens/ml-latest.zip
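The download is a zip archive, so we have to extract the CSV files before using them. Something like the commands below should work, assuming unzip is available, the archive extracts to an ml-latest folder, and the notebook's working directory is the /home/fish/MySpark/HiveSpark folder used in the rest of this tutorial:
! unzip ml-latest.zip
! mv ml-latest/*.csv .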
Now, let's create the tables: one in text file format, one in ORC and one in AVRO format. But first, we have to make sure we are using the movies database by switching to it with the command below.
spark.sql('use movies')
The movies dataset has movieId, title and genres fields. The ratings dataset, on the other hand, has userId, movieId, rating and timestamp fields. Now, let's create the tables.
Please refer to the Hive manual for details on how to create tables and load/insert data into the tables.
spark.sql('create table movies \
           (movieId int, title string, genres string) \
           row format delimited fields terminated by "," \
           stored as textfile')  # in textfile format
spark.sql("create table ratings \
           (userId int, movieId int, rating float, timestamp string) \
           stored as ORC")  # in ORC format
Let's create another table in AVRO format. We will insert the count of movies by genre into it later.
spark.sql("create table genres_by_count \
           (genres string, count int) \
           stored as AVRO")  # in AVRO format
Now, let's see if the tables have been created.
spark.sql("show tables").show()
We see all the tables we created above.
We can get information about a table as shown below. If we do not include formatted or extended in the command, we see only information about the columns. With formatted, we also see the table's location, the database it belongs to and other attributes.
spark.sql("describe formatted ratings").show(truncate = False)
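For comparison, the plain describe command should show only the column names, types and comments:
spark.sql("describe ratings").show(truncate = False)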
Now let's load data into the movies table. We can load data from the local file system or from any Hadoop-supported file system. If we are loading from a Hadoop directory, we have to remove local from the command below; please refer to the Hive manual for details. If we are loading the data just once, we do not need to include overwrite. However, if there is a possibility that we could run the code more than once, including overwrite is important so that we do not append the same dataset to the table again and again. Hive does not do any transformation while loading data into tables: load operations are currently pure copy/move operations that move data files into locations corresponding to Hive tables, and Hive does only minimal checks to make sure that the files being loaded match the target table. So, pay careful attention to your code.
spark.sql("load data local inpath '/home/fish/MySpark/HiveSpark/movies.csv' \
           overwrite into table movies")
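For instance, if the file were already in HDFS, the equivalent command would simply drop the local keyword (the HDFS path below is only an illustration):
# illustrative only: loading from an HDFS path instead of the local file system
spark.sql("load data inpath '/user/fish/movies.csv' overwrite into table movies")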
Rather than loading the data in bulk, we can pre-process it, create a dataframe and insert the dataframe into the table. Let's insert the ratings data by first creating a dataframe.
We can create dataframes in two ways.
Let's specify schema for the ratings dataset.
from pyspark.sql.types import *
schema = StructType([
    StructField('userId', IntegerType()),
    StructField('movieId', IntegerType()),
    StructField('rating', DoubleType()),
    StructField('timestamp', StringType())
])
Now, we can read it in as a dataframe using the dataframe reader, as below.
ratings_df = spark.read.csv("/home/fish/MySpark/HiveSpark/ratings.csv", schema = schema, header = True)
We can see the schema of the dataframe as:
ratings_df.printSchema()
We can also display the first five records from the dataframe.
ratings_df.show(5)
The second option for creating a dataframe is to read the data in as an RDD and convert it to a dataframe using the toDF function or createDataFrame from SparkSession. Remember, we have to import the Row function from pyspark.sql to use toDF.
from pyspark.sql import Row
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
rdd = sc.textFile("/home/fish/MySpark/HiveSpark/ratings.csv")
header = rdd.first()
ratings_df2 = rdd.filter(lambda line: line != header)\
                 .map(lambda line: Row(userId = int(line.split(",")[0]),
                                       movieId = int(line.split(",")[1]),
                                       rating = float(line.split(",")[2]),
                                       timestamp = line.split(",")[3]))\
                 .toDF()
We can also do it as below. Note that createDataFrame does not cast strings to the schema types for us, so we convert each field explicitly:
rdd2 = rdd.filter(lambda line: line != header)\
          .map(lambda line: line.split(","))\
          .map(lambda x: (int(x[0]), int(x[1]), float(x[2]), x[3]))  # cast fields to match the schema types
ratings_df2_b = spark.createDataFrame(rdd2, schema = schema)
We see that the schema and the first five records from ratings_df and ratings_df2 are the same.
ratings_df2.printSchema()
ratings_df2.show(5)
To insert a dataframe into a Hive table, we first have to register the dataframe as a temporary table, as below.
ratings_df.createOrReplaceTempView("ratings_df_table")  # registerTempTable is the older, now deprecated, equivalent
Now, let's insert the data to the ratings Hive table.
spark.sql("insert into table ratings select * from ratings_df_table")
Next, let's check if the movies and ratings Hive tables have the data.
spark.sql("select * from movies limit 10").show(truncate = False)
spark.sql("select * from ratings limit 10").show(truncate = False)
We see that we can put our data into Hive tables either by directly loading data from a local or Hadoop file system or by creating a dataframe and registering it as a temporary table.
We can also query data in a Hive table and save the result to another Hive table. Let's calculate the number of movies by genre and insert the genres which occur more than 500 times into the genres_by_count AVRO Hive table we created above.
spark.sql("select genres, count(*) as count from movies \
           group by genres \
           having count(*) > 500 \
           order by count desc").show()
spark.sql("insert into table genres_by_count \
           select genres, count(*) as count from movies \
           group by genres \
           having count(*) > 500 \
           order by count desc")
Now, we can check if the data has been inserted to the Hive table appropriately:
spark.sql("select * from genres_by_count order by count desc limit 3").show()
We can also use data in Hive tables with other dataframes by first registering the dataframes as temporary tables.
Now, let's create a temporary table from the tags dataset and then we will join it with movies and ratings tables which are in Hive.
schema = StructType([
    StructField('userId', IntegerType()),
    StructField('movieId', IntegerType()),
    StructField('tag', StringType()),
    StructField('timestamp', StringType())
])
tags_df = spark.read.csv("/home/fish/MySpark/HiveSpark/tags.csv", schema = schema, header = True)
tags_df.printSchema()
Next, register the dataframe as temporary table.
tags_df.registerTempTable('tags_df_table')
From the output of the show tables Hive command below, we see that three of the tables are permanent and two are temporary.
spark.sql('show tables').show()
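The isTemporary column in the output tells them apart. We could also get the same information through the catalog API, which should return the tables of the current database along with the temporary views:
for t in spark.catalog.listTables():
    print(t.name, t.isTemporary)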
Now, let's join the three tables using an inner join. The result is a dataframe.
joined = spark.sql("select m.title, m.genres, r.movieId, r.userId, r.rating, r.timestamp as ratingTimestamp, \
                    t.tag, t.timestamp as tagTimestamp \
                    from ratings as r \
                    inner join tags_df_table as t on r.movieId = t.movieId and r.userId = t.userId \
                    inner join movies as m on r.movieId = m.movieId")
type(joined)
We can see the first five records as below.
joined.select(['title','genres','rating']).show(5, truncate = False)
We can also save our dataframe to other file systems.
Let's create a new directory and save the dataframe in csv, json, orc and parquet formats.
Let's see two ways to do that:
!pwd
!mkdir output
joined.write.csv("/home/fish/MySpark/HiveSpark/output/joined.csv", header = True)
joined.write.json("/home/fish/MySpark/HiveSpark/output/joined.json")
joined.write.orc("/home/fish/MySpark/HiveSpark/output/joined_orc")
joined.write.parquet("/home/fish/MySpark/HiveSpark/output/joined_parquet" )
Now, let's check if the data is there in the formats we specified.
! ls output
The second option to save data:
joined.write.format('csv').save("/home/fish/MySpark/HiveSpark/output/joined2.csv" , header = True)
joined.write.format('json').save("/home/fish/MySpark/HiveSpark/output/joined2.json" )
joined.write.format('orc').save("/home/fish/MySpark/HiveSpark/output/joined2_orc" )
joined.write.format('parquet').save("/home/fish/MySpark/HiveSpark/output/joined2_parquet" )
Now, let's see if we have the data from both options.
! ls output
Similarly, let's see two ways to read the data.
First option:
read_csv = spark.read.csv('/home/fish/MySpark/HiveSpark/output/joined.csv', header = True)
read_orc = spark.read.orc('/home/fish/MySpark/HiveSpark/output/joined_orc')
read_parquet = spark.read.parquet('/home/fish/MySpark/HiveSpark/output/joined_parquet')
read_orc.printSchema()
Second option:
read2_csv = spark.read.format('csv').load('/home/fish/MySpark/HiveSpark/output/joined.csv', header = True)
read2_orc = spark.read.format('orc').load('/home/fish/MySpark/HiveSpark/output/joined_orc')
read2_parquet = spark.read.format('parquet').load('/home/fish/MySpark/HiveSpark/output/joined_parquet')
read2_parquet.printSchema()
We can also write a dataframe into a Hive table by using insertInto. This requires that the schema of the DataFrame is the same as the schema of the table.
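More precisely, insertInto resolves columns by position rather than by name, so if the dataframe's column order differed from the table's, we would have to reorder the columns first, roughly as in the sketch below (our joined dataframe already has the right order, so this is only an illustration):
# illustrative: reorder columns to match the target table before calling insertInto
joined.select('title', 'genres', 'movieId', 'userId', 'rating',
              'ratingTimestamp', 'tag', 'tagTimestamp').write.insertInto('joined_orc')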
Let's see the schema of the joined dataframe and create two Hive tables: one in ORC and one in PARQUET formats to insert the dataframe into.
joined.printSchema()
Create ORC Hive Table:
spark.sql("create table joined_orc \
           (title string, genres string, movieId int, userId int, rating float, \
           ratingTimestamp string, tag string, tagTimestamp string) \
           stored as ORC")
Create PARQUET Hive Table:
spark.sql("create table joined_parquet \
           (title string, genres string, movieId int, userId int, rating float, \
           ratingTimestamp string, tag string, tagTimestamp string) \
           stored as PARQUET")
Let's see if the tables have been created.
spark.sql('show tables').show()
They are there. Now, let's insert the dataframe into the tables.
joined.write.insertInto('joined_orc')
joined.write.insertInto('joined_parquet')
Finally, let's check if the data has been inserted into the Hive tables.
spark.sql('select title, genres, rating from joined_orc order by rating desc limit 5').show(truncate = False)
spark.sql('select title, genres, rating from joined_parquet order by rating desc limit 5').show(truncate = False)
Everything looks great! See you in my next tutorial on Apache Spark.