This is the third tutorial in the Spark RDDs vs DataFrames vs SparkSQL blog post series. The first one is available here. In the first part, we saw how to retrieve, sort and filter data using Spark RDDs, DataFrames and SparkSQL. In the second part (here), we saw how to work with multiple tables in Spark the RDD way, the DataFrame way and with SparkSQL. In this third part of the series, we will perform web server log analysis using real-world, text-based production logs. Log data can be used for monitoring servers, improving business and customer intelligence, building recommendation systems, detecting fraud, and much more. Server log analysis is a good use case for Spark: it is a very large and common data source and contains a rich set of information.
If you like this tutorial series, also check out my other recent blog posts on Spark: Analyzing the Bible and the Quran using Spark and Spark DataFrames: Exploring Chicago Crimes. The data and the notebooks can be downloaded from my GitHub repository.
The log files that we use for this tutorial are in the Apache Common Log Format (CLF). The log file entries produced in CLF will look something like this:
127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839
Each part of this log entry is described below.
127.0.0.1 This is the IP address (or host name, if available) of the client (remote host) which made the request to the server.
- The "hyphen" in the output indicates that the requested piece of information (user identity from remote machine) is not available.
- The "hyphen" in the output indicates that the requested piece of information (user identity from local logon) is not available.
[01/Aug/1995:00:00:01 -0400] The time that the server finished processing the request. The format is: [day/month/year:hour:minute:second timezone]
day = 2 digits
month = 3 letters
year = 4 digits
hour = 2 digits
minute = 2 digits
second = 2 digits
zone = (+ | -) 4 digits
"GET /images/launch-logo.gif HTTP/1.0" This is the first line of the request string from the client. It consists of a three components: the request method (e.g., GET, POST, etc.), the endpoint (a Uniform Resource Identifier), and the client protocol version.
200 This is the status code that the server sends back to the client. This information is very valuable, because it reveals whether the request resulted in a successful response (codes beginning in 2), a redirection (codes beginning in 3), an error caused by the client (codes beginning in 4), or an error in the server (codes beginning in 5).
1839 The last entry indicates the size of the object returned to the client, not including the response headers. If no content was returned to the client, this value will be "-" (or sometimes 0).
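To make the field breakdown concrete, here is a minimal sketch that splits the example entry above into named fields with a regular expression and parses the timestamp with strptime. The group names are my own choice for illustration; the tutorial's own, more forgiving parser comes later.
import re
from datetime import datetime
example = '127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839'
# Illustrative CLF pattern with named groups
pattern = (r'^(?P<host>\S+) (?P<identity_remote>\S+) (?P<identity_local>\S+) '
           r'\[(?P<date_time>[^\]]+) (?P<time_zone>[+-]\d{4})\] '
           r'"(?P<method>\S+) (?P<endpoint>\S+) (?P<protocol>\S+)" '
           r'(?P<status_code>\d{3}) (?P<size>\S+)$')
m = re.search(pattern, example)
print(m.groupdict())
# The timestamp (without the timezone offset) parses with this strptime format
print(datetime.strptime(m.group('date_time'), '%d/%b/%Y:%H:%M:%S'))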
We will use a data set from the NASA Kennedy Space Center WWW server in Florida. The full data set is freely available here and contains two months' worth of all HTTP requests.
Let's download the data. Since I am using a Jupyter Notebook, the ! prefix lets us run shell commands.
! wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
! wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("Spark-RDD-DataFrame_SQL").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
sqlcontext = SQLContext(sc)
rdd = sc.textFile("NASA_access*")
for line in rdd.sample(withReplacement = False, fraction = 0.000001, seed = 120).collect():
    print(line)
    print("\n")
import re
def parse_log1(line):
    # Return 1 if the line matches the CLF pattern, 0 otherwise
    match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*(\S+)\s*(\S+)\s*([\w\.\s*]+)?\s*"*(\d{3}) (\S+)', line)
    if match is None:
        return 0
    else:
        return 1
n_logs = rdd.count()
failed = rdd.map(lambda line: parse_log1(line)).filter(lambda line: line == 0).count()
print('Out of {} logs, {} failed to parse'.format(n_logs,failed))
We see that 1685 out of the 3.5 million log lines failed to parse. I took samples of the failed logs and tried to modify the above regular expression pattern, as shown below.
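If you want to reproduce that inspection step, a minimal sketch (reusing parse_log1 from above) for grabbing a few of the offending lines could look like this:
# Collect a handful of lines that parse_log1 cannot match, for manual inspection
failed_samples = rdd.filter(lambda line: parse_log1(line) == 0).take(10)
for line in failed_samples:
    print(line)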
def parse_log2(line):
    # Try the first pattern; if it does not match, fall back to an alternative pattern
    match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*(\S+)\s*(\S+)\s*([/\w\.\s*]+)?\s*"* (\d{3}) (\S+)',line)
    if match is None:
        match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*([/\w\.]+)>*([\w/\s\.]+)\s*(\S+)\s*(\d{3})\s*(\S+)',line)
    if match is None:
        return (line, 0)
    else:
        return (line, 1)
failed = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 0).count()
print('Out of {} logs, {} failed to parse'.format(n_logs,failed))
Still, 1253 of them failed to parse. However, since we have successfully parsed more than 99.9% of the data, we can work with what we have parsed. You can play with the regular expression pattern to match all of the data :).
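As a quick sanity check on that figure, here is a small sketch that reuses the n_logs and failed counts computed above:
# Rough parse-success rate based on the counts from the previous cells
success_rate = 100.0 * (n_logs - failed) / n_logs
print('Successfully parsed {:.3f}% of the log lines'.format(success_rate))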
def map_log(line):
    # Apply the same two patterns as parse_log2, but return the matched groups
    match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*(\S+)\s*(\S+)\s*([/\w\.\s*]+)?\s*"* (\d{3}) (\S+)',line)
    if match is None:
        match = re.search('^(\S+) (\S+) (\S+) \[(\S+) [-](\d{4})\] "(\S+)\s*([/\w\.]+)>*([\w/\s\.]+)\s*(\S+)\s*(\d{3})\s*(\S+)',line)
    return match.groups()
parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])
parsed_rdd2 = parsed_rdd.map(lambda line: map_log(line))
for i in parsed_rdd2.take(3):
    print(i)
    print('\n')
As shown below, each parsed log line has 11 fields.
parsed_rdd2.map(lambda line: len(line)).distinct().collect()
Now, let's try to answer some questions.
RDD way
result = parsed_rdd2.map(lambda line: (line[0],1)).reduceByKey(lambda a, b: a + b).takeOrdered(10, lambda x: -x[1])
result
We can also use Pandas and Matplotlib to create a visualization.
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
Host = [x[0] for x in result]
count = [x[1] for x in result]
Host_count_dct = {'Host':Host, 'count':count}
Host_count_df = pd.DataFrame(Host_count_dct )
myplot = Host_count_df.plot(figsize = (12,8), kind = "barh", color = "#7a7a52", width = 0.8,
x = "Host", y = "count", legend = False)
myplot.invert_yaxis()
plt.xlabel("Count", fontsize = 16)
plt.ylabel("Host", fontsize = 16)
plt.title("Most common hosts ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
DataFrame way
parsed_df = sqlcontext.createDataFrame(parsed_rdd2,
schema = ['host','identity_remote', 'identity_local','date_time',
'time_zone','request_method','endpoint','client_protocol','mis',
'status_code','size_returned'], samplingRatio = 0.1)
parsed_df.printSchema()
parsed_df.groupBy('host').count().orderBy('count', ascending = False).show(10, truncate = False)
SQL way
parsed_df.createOrReplaceTempView("parsed_table")
sqlcontext.sql('SELECT host, count(*) AS count FROM parsed_table GROUP BY \
host ORDER BY count DESC LIMIT 10').show()
RDD way
def convert_long(x):
    # Strip non-numeric characters (the size field can be "-") and convert to an integer
    x = re.sub('[^0-9]', "", x)
    if x == "":
        return 0
    else:
        return int(x)
parsed_rdd2.map(lambda line: convert_long(line[-1])).stats()
DataFrame way
Here, we can use aggregation functions from pyspark.sql.functions.
from pyspark.sql.functions import mean, udf, col, min, max, stddev, count
from pyspark.sql.types import DoubleType, IntegerType
my_udf = udf(convert_long, IntegerType() )
(parsed_df.select(my_udf('size_returned').alias('size'))
.select(mean('size').alias('Mean Size'),
max('size').alias('Max Size'),
min('size').alias('Min Size'),
count('size').alias('Count'),
stddev('size').alias('stddev Size')).show()
)
SQL way
parsed_df_cleaned = parsed_rdd2.map(lambda line: convert_long(line[-1])).toDF(IntegerType())
parsed_df_cleaned.createOrReplaceTempView("parsed_df_cleaned_table")
sqlcontext.sql("SELECT avg(value) AS Mean_size, max(value) AS Max_size, \
min(value) AS Min_size, count(*) AS count, \
std(value) AS stddev_size FROM parsed_df_cleaned_table").show()
RDD way
n_codes = parsed_rdd2.map(lambda line: (line[-2], 1)).distinct().count()
codes_count = (parsed_rdd2.map(lambda line: (line[-2], 1))
.reduceByKey(lambda a, b: a + b)
.takeOrdered(n_codes, lambda x: -x[1]))
codes_count
codes =[x[0] for x in codes_count]
count =[x[1] for x in codes_count]
codes_dict = {'code':codes,'count':count}
codes_df = pd.DataFrame(codes_dict)
plot = codes_df.plot(figsize = (12, 6), kind = 'barh', y = 'count', x = 'code', legend = False)
plot.invert_yaxis()
plt.title('Number of requests by response code', fontsize = 20, color = 'b')
plt.xlabel('Count', fontsize = 16, color = 'b')
plt.ylabel('Response code', fontsize = 16, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
We can also create a pie chart as below.
def pie_pct_format(value):
    # Hide the percentage label for small slices
    return '' if value < 7 else '%.0f%%' % value
fig = plt.figure(figsize =(10, 10), facecolor = 'white', edgecolor = 'white')
colors = ['yellowgreen', 'lightskyblue', 'gold', 'purple', 'lightcoral', 'yellow', 'black']
explode = (0.05, 0.05, 0.1, 0, 0, 0, 0,0,0,0)
patches, texts, autotexts = plt.pie(count, labels = codes, colors = colors,
explode = explode, autopct = pie_pct_format,
shadow = False, startangle = 125)
for text, autotext in zip(texts, autotexts):
    if autotext.get_text() == '':
        text.set_text('')
plt.legend(codes, loc = (0.80, -0.1), shadow=True)
pass
DataFrame way
parsed_df.groupBy('status_code').count().orderBy('count', ascending = False).show()
SQL way
sqlcontext.sql("SELECT status_code, count(*) AS count FROM parsed_table \
GROUP BY status_code ORDER BY count DESC").show()
RDD way
result = parsed_rdd2.map(lambda line: (line[6],1)).reduceByKey(lambda a, b: a + b).takeOrdered(10, lambda x: -x[1])
result
endpoint = [x[0] for x in result]
count = [x[1] for x in result]
endpoint_count_dct = {'endpoint':endpoint, 'count':count}
endpoint_count_df = pd.DataFrame(endpoint_count_dct )
myplot = endpoint_count_df.plot(figsize = (12,8), kind = "barh", color = "#669999", width = 0.8,
x = "endpoint", y = "count", legend = False)
myplot.invert_yaxis()
plt.xlabel("Count", fontsize = 16)
plt.ylabel("End point", fontsize = 16)
plt.title("Top ten endpoints ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
DataFrame way
parsed_df.groupBy('endpoint').count().orderBy('count', ascending = False).show(10, truncate = False)
SQL way
sqlcontext.sql("SELECT endpoint, count(*) AS count FROM parsed_table \
GROUP BY endpoint ORDER BY count DESC LIMIT 10").show(truncate = False)
These are the error endpoints, i.e. requests whose status code is not 200.
RDD way
result = (parsed_rdd2.filter(lambda line: line[9] != '200')
.map(lambda line: (line[6], 1))
.reduceByKey(lambda a, b: a+b)
.takeOrdered(8, lambda x: -x[1]))
result
endpoint = [x[0] for x in result]
count = [x[1] for x in result]
endpoint_count_dct = {'endpoint':endpoint, 'count':count}
endpoint_count_df = pd.DataFrame(endpoint_count_dct )
myplot = endpoint_count_df.plot(figsize = (12,8), kind = "barh", color = "#ff4000", width = 0.8,
x = "endpoint", y = "count", legend = False)
myplot.invert_yaxis()
plt.xlabel("Count", fontsize = 16)
plt.ylabel("End Point", fontsize = 16)
plt.title("Top eight error endpoints ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
DataFrame way
(parsed_df.filter(parsed_df['status_code']!=200)
.groupBy('endpoint').count().orderBy('count', ascending = False)
.show(8, truncate = False))
SQL way
sqlcontext.sql("SELECT endpoint, count(*) AS count FROM parsed_table \
WHERE status_code != 200 GROUP BY endpoint ORDER BY count DESC LIMIT 8").show(truncate = False)
RDD way
parsed_rdd2.map(lambda line: line[0]).distinct().count()
DataFrame way
parsed_df.select(parsed_df['host']).distinct().count()
SQL way
sqlcontext.sql("SELECT count(distinct(host)) AS unique_host_count FROM parsed_table ").show(truncate = False)
RDD way
from datetime import datetime
def day_month(line):
    # Keep only the day/month/year part of the date_time field
    date_time = line[3]
    return datetime.strptime(date_time[:11], "%d/%b/%Y")
result = parsed_rdd2.map(lambda line: (day_month(line), 1)).reduceByKey(lambda a, b: a + b).collect()
day = [x[0] for x in result]
count = [x[1] for x in result]
day_count_dct = {'day':day, 'count':count}
day_count_df = pd.DataFrame(day_count_dct )
myplot = day_count_df.plot(figsize = (12,8), kind = "line", color = "#cc9900",
x = "day", y = "count", legend = False)
plt.ylabel("Count", fontsize = 16)
plt.xlabel("")
plt.title("Number of hosts per day ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
Now, let's just display the first ten values to compare with results from the other methods.
parsed_rdd2.map(lambda line: (day_month(line), 1)).reduceByKey(lambda a, b: a + b).takeOrdered(10, lambda x: x[0])
DataFrame way
from datetime import datetime
from pyspark.sql.functions import col,udf
from pyspark.sql.types import TimestampType
myfunc = udf(lambda x: datetime.strptime(x, '%d/%b/%Y:%H:%M:%S'), TimestampType())
parsed_df2 = parsed_df.withColumn('date_time', myfunc(col('date_time')))
from pyspark.sql.functions import date_format, month,dayofmonth
parsed_df2 = parsed_df2.withColumn("month", month(col("date_time"))).\
withColumn("DayOfmonth", dayofmonth(col("date_time")))
n_hosts_by_day = parsed_df2.groupBy(["month", "DayOfmonth"]).count().orderBy(["month", "DayOfmonth"])
n_hosts_by_day.show(n = 10)
SQL way
parsed_df2.createOrReplaceTempView("parsed_df2_table")
sqlcontext.sql("SELECT month, DayOfmonth, count(*) As count FROM parsed_df2_table GROUP BY month, DayOfmonth\
ORDER BY month, DayOfmonth LIMIT 10").show()
RDD way
result = (parsed_rdd2.map(lambda line: (day_month(line),line[0]))
.groupByKey().mapValues(set)
.map(lambda x: (x[0], len(x[1])))).collect()
day = [x[0] for x in result]
count = [x[1] for x in result]
day_count_dct = {'day':day, 'count':count}
day_count_df = pd.DataFrame(day_count_dct )
myplot = day_count_df.plot(figsize = (12,8), kind = "line", color = "#000099",
x = "day", y = "count", legend = False)
plt.ylabel("Count", fontsize = 16)
plt.xlabel("")
plt.title("Number of unique hosts per day ", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
Now, let's display the first ten days to compare with the results from the other methods.
(parsed_rdd2.map(lambda line: (day_month(line),line[0]))
.groupByKey().mapValues(set)
.map(lambda x: (x[0], len(x[1])))).takeOrdered(10, lambda x: x[0])
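DataFrame way
As a sketch, one possible DataFrame equivalent, using countDistinct and to_date from pyspark.sql.functions, could be:
from pyspark.sql.functions import countDistinct, to_date, col
# Distinct hosts per calendar day, ordered by date
(parsed_df2.groupBy(to_date(col('date_time')).alias('Date'))
           .agg(countDistinct('host').alias('totalUniqueHosts'))
           .orderBy('Date')
           .show(n = 10))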
SQL way
sqlcontext.sql("SELECT DATE(date_time) Date, COUNT(DISTINCT host) AS totalUniqueHosts FROM\
parsed_df2_table GROUP BY DATE(date_time) ORDER BY DATE(date_time) ASC").show(n = 10)
RDD way
unique_result = (parsed_rdd2.map(lambda line: (day_month(line),line[0]))
.groupByKey().mapValues(set)
.map(lambda x: (x[0], len(x[1]))))
length_result = (parsed_rdd2.map(lambda line: (day_month(line),line[0]))
.groupByKey().mapValues(len))
joined = length_result.join(unique_result).map(lambda a: (a[0], (a[1][0])/(a[1][1]))).collect()
day = [x[0] for x in joined]
count = [x[1] for x in joined]
day_count_dct = {'day':day, 'count':count}
day_count_df = pd.DataFrame(day_count_dct )
myplot = day_count_df.plot(figsize = (12,8), kind = "line", color = "#000099",
x = "day", y = "count", legend = False)
plt.ylabel("Count", fontsize = 16)
plt.xlabel("")
plt.title("Average number of daily requests per host", fontsize = 20, color = 'b')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
sorted(joined)[:10]
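DataFrame way
As a sketch, one possible DataFrame equivalent, dividing the total number of requests by the number of distinct hosts for each day, could be:
from pyspark.sql.functions import count, countDistinct, to_date, col
# Average daily requests per host = total requests / distinct hosts, per day
(parsed_df2.groupBy(to_date(col('date_time')).alias('Date'))
           .agg((count('host') / countDistinct('host')).alias('daily_requests_per_host'))
           .orderBy('Date')
           .show(n = 10))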
SQL way
sqlcontext.sql("SELECT DATE(date_time) Date, COUNT(host)/COUNT(DISTINCT host) AS daily_requests_per_host FROM\
parsed_df2_table GROUP BY DATE(date_time) ORDER BY DATE(date_time) ASC").show(n = 10)
RDD way
parsed_rdd2.filter(lambda line: line[9] == '404').count()
DataFrame way
parsed_df2.filter(parsed_df2['status_code']=="404").count()
SQL way
sqlcontext.sql("SELECT COUNT(*) AS logs_404_count FROM\
parsed_df2_table WHERE status_code ==404").show(n = 10)
RDD way
result = (parsed_rdd2.filter(lambda line: line[9] == '404')
.map(lambda line: (line[6], 1))
.reduceByKey(lambda a, b: a+b)
.takeOrdered(5, lambda x: -x[1]))
result
endpoint = [x[0] for x in result]
count = [x[1] for x in result]
endpoint_count_dct = {'endpoint':endpoint, 'count':count}
endpoint_count_df = pd.DataFrame(endpoint_count_dct )
myplot = endpoint_count_df.plot(figsize = (12,8), kind = "barh", color = "#cc9900", width = 0.8,
x = "endpoint", y = "count", legend = False)
myplot.invert_yaxis()
plt.xlabel("Count", fontsize = 16)
plt.ylabel("End Point", fontsize = 16)
plt.title("Top ten endpoints with 404 response codes ", fontsize = 20, color = 'r')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
DataFrame way
(parsed_df2.filter(parsed_df2['status_code']=="404")
.groupBy('endpoint').count().orderBy('count', ascending = False).show(5, truncate = False))
SQL way
sqlcontext.sql("SELECT endpoint, COUNT(*) AS count FROM\
parsed_df2_table WHERE status_code ==404 GROUP BY endpoint\
ORDER BY count DESC LIMIT 5").show(truncate = False)
RDD way
result = (parsed_rdd2.filter(lambda line: line[9] == '404')
.map(lambda line: (line[0], 1))
.reduceByKey(lambda a, b: a+b)
.takeOrdered(5, lambda x: -x[1]))
result
host = [x[0] for x in result]
count = [x[1] for x in result]
host_count_dct = {'host':host, 'count':count}
host_count_df = pd.DataFrame(host_count_dct )
myplot = host_count_df.plot(figsize = (12,8), kind = "barh", color = "#cc9900", width = 0.8,
x = "host", y = "count", legend = False)
myplot.invert_yaxis()
plt.xlabel("Count", fontsize = 16)
plt.ylabel("Host", fontsize = 16)
plt.title("Top five hosts with 404 response codes ", fontsize = 20, color = 'r')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
DataFrame way
(parsed_df2.filter(parsed_df2['status_code']=="404")
.groupBy('host').count().orderBy('count', ascending = False).show(5, truncate = False))
SQL way
sqlcontext.sql("SELECT host, COUNT(*) AS count FROM\
parsed_df2_table WHERE status_code ==404 GROUP BY host\
ORDER BY count DESC LIMIT 5").show(truncate = False)
RDD way
result = (parsed_rdd2.filter(lambda line: line[9] == '404')
.map(lambda line: (day_month(line), 1))
.reduceByKey(lambda a, b: a+b).collect())
day = [x[0] for x in result]
count = [x[1] for x in result]
day_count_dct = {'day':day, 'count':count}
day_count_df = pd.DataFrame(day_count_dct )
myplot = day_count_df.plot(figsize = (12,8), kind = "line", color = "#cc9900",
x = "day", y = "count", legend = False)
plt.ylabel("Count", fontsize = 16)
plt.xlabel("")
plt.title("Number of 404 response codes per day ", fontsize = 20, color = 'r')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
Now, let's display the 10 days with the highest number of 404 errors to compare with the results from the other methods.
day_count_df.sort_values('count', ascending = False)[:10]
DataFrame way
(parsed_df2.filter(parsed_df2['status_code']=="404")
.groupBy(["month", "DayOfmonth"]).count()
.orderBy('count', ascending = False).show(10)
)
SQL way
sqlcontext.sql("SELECT DATE(date_time) AS Date, COUNT(*) AS daily_404_erros FROM\
parsed_df2_table WHERE status_code = 404 \
GROUP BY DATE(date_time) ORDER BY daily_404_erros DESC LIMIT 10").show()
RDD way
(parsed_rdd2.filter(lambda line: line[9] == '404')
.map(lambda line: (day_month(line), 1))
.reduceByKey(lambda a, b: a+b).takeOrdered(5, lambda x: -x[1]))
This has already been solved the SQL way and the DataFrame way in No. 13 above.
RDD way
def date_time(line):
    # Parse the full timestamp (without the timezone offset)
    date_time = line[3]
    return datetime.strptime(date_time, "%d/%b/%Y:%H:%M:%S")
result = (parsed_rdd2.filter(lambda line: line[9] == '404').map(lambda line: (date_time(line).hour, 1))
.reduceByKey(lambda a, b: a + b)).collect()
result = sorted(result)
hour = [x[0] for x in result]
count = [x[1] for x in result]
hour_count_dct = {'hour': hour, 'count':count}
hour_count_df = pd.DataFrame(hour_count_dct )
myplot = hour_count_df.plot(figsize = (12,8), kind = "bar", color = "#000000",x ='hour',
y = "count", legend = False)
plt.ylabel("Count", fontsize = 16)
plt.xlabel("Hour of day", fontsize = 16)
plt.title("Number of 404 errors per hour ", fontsize = 20, color = 'r')
plt.xticks(size = 14)
plt.yticks(size = 14)
plt.show()
result[:10] # Just displaying the first ten to compare with results from the other methods
DataFrame way
from pyspark.sql.functions import hour
parsed_df3 = parsed_df2.withColumn('hour_of_day', hour(col('date_time')))
(parsed_df3.filter(parsed_df3['status_code']=="404")
.groupBy("hour_of_day").count()
.orderBy("hour_of_day", ascending = True).show(10))
SQL way
sqlcontext.sql("SELECT HOUR(date_time) AS hour, COUNT(*) AS hourly_404_erros FROM\
parsed_df2_table WHERE status_code = 404 \
GROUP BY HOUR(date_time) ORDER BY HOUR(date_time) LIMIT 10").show(n = 100)
This is enough for today. See you in the next part of the Spark RDDs vs DataFrames vs SparkSQL tutorial series.