Fisseha Berhane, PhD

Data Scientist


Machine Learning with Text in PySpark - Part 1

We usually work with structured data in our machine learning applications. However, unstructured text data can also have vital content for machine learning models. In this blog post, we will see how to use PySpark to build machine learning models with unstructured text data. The data is from the UCI Machine Learning Repository and can be downloaded from here. According to the data description, it is a set of SMS messages collected for SMS spam research: 5,574 messages in English, each tagged as being ham (legitimate) or spam.

We will tokenize the messages, compute TF-IDF features, and then build various models using cross-validation and grid search and compare their performance.

Start a SparkSession

In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Read Data

In [9]:
df = spark.read.csv("SMSSpamCollection", sep = "\t", inferSchema=True, header = False)

Let's see the first five rows. As shown below, the data does not have column names. So, we will rename them.

In [10]:
df.show(5, truncate = False)
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0 |_c1                                                                                                                                                        |
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham |Ok lar... Joking wif u oni...                                                                                                                              |
|spam|Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|ham |U dun say so early hor... U c already then say...                                                                                                          |
|ham |Nah I don't think he goes to usf, he lives around here though                                                                                              |
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows

Rename Columns

In [11]:
df = df.withColumnRenamed('_c0', 'status').withColumnRenamed('_c1', 'message')
df.show(5, truncate = False)
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|status|message                                                                                                                                                    |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham   |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham   |Ok lar... Joking wif u oni...                                                                                                                              |
|spam  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|ham   |U dun say so early hor... U c already then say...                                                                                                          |
|ham   |Nah I don't think he goes to usf, he lives around here though                                                                                              |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows

Change the status column to numeric

Now, let's change the status field to a numeric field: ham to 1.0 and spam to 0.0. All our fields need to be numeric.

In [12]:
df.createOrReplaceTempView('temp')
df = spark.sql('select case status when "ham" then 1.0  else 0 end as label, message from temp')
df.show(5, truncate = False)
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|1.0  |Ok lar... Joking wif u oni...                                                                                                                              |
|0.0  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|1.0  |U dun say so early hor... U c already then say...                                                                                                          |
|1.0  |Nah I don't think he goes to usf, he lives around here though                                                                                              |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows

Tokenize the messages

Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). Let's tokenize the messages and create a list of words of each message.

In [13]:
from pyspark.ml.feature import Tokenizer
In [14]:
tokenizer = Tokenizer(inputCol="message", outputCol="words")
wordsData = tokenizer.transform(df)
wordsData.show()
+-----+--------------------+--------------------+
|label|             message|               words|
+-----+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|
|  1.0|U dun say so earl...|[u, dun, say, so,...|
|  1.0|Nah I don't think...|[nah, i, don't, t...|
|  0.0|FreeMsg Hey there...|[freemsg, hey, th...|
|  1.0|Even my brother i...|[even, my, brothe...|
|  1.0|As per your reque...|[as, per, your, r...|
|  0.0|WINNER!! As a val...|[winner!!, as, a,...|
|  0.0|Had your mobile 1...|[had, your, mobil...|
|  1.0|I'm gonna be home...|[i'm, gonna, be, ...|
|  0.0|SIX chances to wi...|[six, chances, to...|
|  0.0|URGENT! You have ...|[urgent!, you, ha...|
|  1.0|I've been searchi...|[i've, been, sear...|
|  1.0|I HAVE A DATE ON ...|[i, have, a, date...|
|  0.0|XXXMobileMovieClu...|[xxxmobilemoviecl...|
|  1.0|Oh k...i'm watchi...|[oh, k...i'm, wat...|
|  1.0|Eh u remember how...|[eh, u, remember,...|
|  1.0|Fine if that’s th...|[fine, if, that’s...|
|  0.0|England v Macedon...|[england, v, mace...|
+-----+--------------------+--------------------+
only showing top 20 rows

Apply CountVectorizer

CountVectorizer converts the list of tokens above to vectors of token counts. See the documentation description for details.

In [15]:
from pyspark.ml.feature import CountVectorizer
In [16]:
count = CountVectorizer(inputCol="words", outputCol="rawFeatures")
model = count.fit(wordsData)
featurizedData = model.transform(wordsData)
featurizedData.show()
+-----+--------------------+--------------------+--------------------+
|label|             message|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|(13587,[8,42,52,6...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|(13587,[5,75,411,...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|(13587,[0,3,8,20,...|
|  1.0|U dun say so earl...|[u, dun, say, so,...|(13587,[5,22,60,1...|
|  1.0|Nah I don't think...|[nah, i, don't, t...|(13587,[0,1,66,87...|
|  0.0|FreeMsg Hey there...|[freemsg, hey, th...|(13587,[0,2,6,10,...|
|  1.0|Even my brother i...|[even, my, brothe...|(13587,[0,7,9,13,...|
|  1.0|As per your reque...|[as, per, your, r...|(13587,[0,10,11,4...|
|  0.0|WINNER!! As a val...|[winner!!, as, a,...|(13587,[0,2,3,14,...|
|  0.0|Had your mobile 1...|[had, your, mobil...|(13587,[0,4,5,10,...|
|  1.0|I'm gonna be home...|[i'm, gonna, be, ...|(13587,[0,1,6,32,...|
|  0.0|SIX chances to wi...|[six, chances, to...|(13587,[0,6,40,46...|
|  0.0|URGENT! You have ...|[urgent!, you, ha...|(13587,[0,2,3,4,8...|
|  1.0|I've been searchi...|[i've, been, sear...|(13587,[0,1,2,3,4...|
|  1.0|I HAVE A DATE ON ...|[i, have, a, date...|(13587,[1,3,14,16...|
|  0.0|XXXMobileMovieClu...|[xxxmobilemoviecl...|(13587,[0,4,8,11,...|
|  1.0|Oh k...i'm watchi...|[oh, k...i'm, wat...|(13587,[158,314,3...|
|  1.0|Eh u remember how...|[eh, u, remember,...|(13587,[1,5,20,47...|
|  1.0|Fine if that’s th...|[fine, if, that’s...|(13587,[4,5,29,59...|
|  0.0|England v Macedon...|[england, v, mace...|(13587,[0,4,28,82...|
+-----+--------------------+--------------------+--------------------+
only showing top 20 rows

Apply term frequency–inverse document frequency (TF-IDF)

In [17]:
from pyspark.ml.feature import IDF

IDF down-weights features that appear frequently in a corpus. This generally improves performance when using text as features, since the most frequent, and hence least informative, words get down-weighted.

In [18]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()  # We want only the label and features columns for our machine learning models
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13587,[8,42,52,6...|
|  1.0|(13587,[5,75,411,...|
|  0.0|(13587,[0,3,8,20,...|
|  1.0|(13587,[5,22,60,1...|
|  1.0|(13587,[0,1,66,87...|
|  0.0|(13587,[0,2,6,10,...|
|  1.0|(13587,[0,7,9,13,...|
|  1.0|(13587,[0,10,11,4...|
|  0.0|(13587,[0,2,3,14,...|
|  0.0|(13587,[0,4,5,10,...|
|  1.0|(13587,[0,1,6,32,...|
|  0.0|(13587,[0,6,40,46...|
|  0.0|(13587,[0,2,3,4,8...|
|  1.0|(13587,[0,1,2,3,4...|
|  1.0|(13587,[1,3,14,16...|
|  0.0|(13587,[0,4,8,11,...|
|  1.0|(13587,[158,314,3...|
|  1.0|(13587,[1,5,20,47...|
|  1.0|(13587,[4,5,29,59...|
|  0.0|(13587,[0,4,28,82...|
+-----+--------------------+
only showing top 20 rows

Split data into training (80%) and testing (20%)

We will split the dataframe into training and test sets, train on the first dataset, and then evaluate on the held-out test set.

In [26]:
seed = 0  # set seed for reproducibility

trainDF, testDF = rescaledData.randomSplit([0.8,0.2],seed)

Number of records in each dataframe

In [27]:
trainDF.count()
Out[27]:
4450
In [28]:
testDF.count()
Out[28]:
1124

Now, let's fit different classifiers. We will use grid search with cross-validation to find the best parameter values among those provided. You can fine-tune the models further by providing a finer parameter grid and by including more of the important parameters for each algorithm.

Logistic Regression Classifier

Logistic regression is a popular method to predict a categorical response. It is a special case of Generalized Linear models that predicts the probability of the outcomes. In spark.ml logistic regression can be used to predict a binary outcome by using binomial logistic regression, or it can be used to predict a multiclass outcome by using multinomial logistic regression. Use the family parameter to select between these two algorithms, or leave it unset and Spark will infer the correct variant.
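For intuition, binomial logistic regression passes a linear score through the sigmoid (logistic) function to get a probability; a minimal pure-Python sketch with hypothetical weights (not the fitted model):

```python
import math

def sigmoid(z):
    # logistic function: maps any real-valued score to a probability in (0, 1)
    return 1.0 / (1.0 + math.exp(-z))

# hypothetical weights and feature vector, for illustration only
w, b = [2.0, -1.0], 0.5
x = [1.0, 3.0]
score = sum(wi * xi for wi, xi in zip(w, x)) + b   # 2.0 - 3.0 + 0.5 = -0.5
p = sigmoid(score)                                 # probability of the positive class
```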

In [20]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
In [20]:
lr = LogisticRegression(maxIter = 10)

paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, np.linspace(0.3, 0.01, 10)) \
    .addGrid(lr.elasticNetParam, np.linspace(0.3, 0.8, 6)) \
    .build()
In [21]:
crossval_lr = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds= 5)  
In [22]:
cvModel_lr = crossval_lr.fit(trainDF)
In [ ]:
best_model_lr = cvModel_lr.bestModel.summary
In [24]:
best_model_lr.predictions.columns
Out[24]:
['label',
 'message',
 'words',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

As shown below, the model fits the training data perfectly. We will see its performance on the test data. We have to exercise caution when a model shows extraordinary performance on the training data, as this can be due to overfitting, which prevents the model from generalizing to unseen data.

Area under the curve for the training data

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval_lr = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_lr.evaluate(best_model_lr.predictions)
Out[27]:
1.0

We can get the F1 score, accuracy, precision, and recall using MulticlassClassificationEvaluator, which can be used for binary classification as well.

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_lr.evaluate(best_model_lr.predictions)
Out[30]:
1.0
In [31]:
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(best_model_lr.predictions)
Out[31]:
1.0
In [33]:
train_fit_lr = best_model_lr.predictions.select('label','prediction')
train_fit_lr.groupBy('label','prediction').count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0| 3848|
|  0.0|       0.0|  602|
+-----+----------+-----+

Predict using the test data and evaluate the predictions

In [34]:
predictions_lr = cvModel_lr.transform(testDF)

As you can see below, the predictions dataframe contains the original data and the predictions.

In [35]:
predictions_lr.columns
Out[35]:
['label',
 'message',
 'words',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']
In [38]:
predictions_lr.show(5)
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|             message|               words|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  0.0|(Bank of Granite ...|[(bank, of, grani...|(13587,[3,7,10,12...|(13587,[3,7,10,12...|[1.46730111617655...|[0.81264682234974...|       0.0|
|  0.0|+123 Congratulati...|[+123, congratula...|(13587,[0,4,5,8,1...|(13587,[0,4,5,8,1...|[5.36211698537807...|[0.99533093752360...|       0.0|
|  0.0|+449071512431 URG...|[+449071512431, u...|(13587,[0,4,7,14,...|(13587,[0,4,7,14,...|[2.52752829905056...|[0.92604926446112...|       0.0|
|  0.0|3. You have recei...|[3., you, have, r...|(13587,[2,11,14,9...|(13587,[2,11,14,9...|[-0.6098888828551...|[0.35208454565894...|       1.0|
|  0.0|44 7732584351, Do...|[44, 7732584351,,...|(13587,[0,2,3,15,...|(13587,[0,2,3,15,...|[6.30912983467855...|[0.99818368907109...|       0.0|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

Show sample predictions:

In [39]:
predictions_lr.select('label', 'prediction').show(5)
+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows

It missed 21 spam messages, but it classified all the ham messages correctly.

In [40]:
predictions_lr.groupBy('label','prediction').count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0|   21|
|  1.0|       1.0|  979|
|  0.0|       0.0|  124|
+-----+----------+-----+
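The confusion counts above also give per-class precision and recall directly; a quick sketch treating spam (label 0.0) as the class of interest:

```python
# counts read off the confusion table above
spam_as_ham  = 21    # label 0.0, predicted 1.0
ham_correct  = 979   # label 1.0, predicted 1.0
spam_correct = 124   # label 0.0, predicted 0.0
ham_as_spam  = 0     # no such row appears in the table

total = spam_as_ham + ham_correct + spam_correct + ham_as_spam
accuracy       = (ham_correct + spam_correct) / total
spam_recall    = spam_correct / (spam_correct + spam_as_ham)   # share of spam caught
spam_precision = spam_correct / (spam_correct + ham_as_spam)   # share of spam calls that were right
print(accuracy, spam_recall, spam_precision)
```

So the model is very precise about what it flags as spam, but lets roughly 14% of spam through.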

Area under the curve with the test data

In [64]:
my_eval_lr.evaluate(predictions_lr)
Out[64]:
0.9275862068965517

F1-score with the test data

In [46]:
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_lr.evaluate(predictions_lr)
Out[46]:
0.9806865812338207

Accuracy with the test data

In [29]:
(979+124)/(979+124+21)
Out[29]:
0.9813167259786477
In [47]:
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(predictions_lr)
Out[47]:
0.9813167259786477

We see that the model has good performance with the test data.

Naive Bayes

Naive Bayes classifiers are a family of simple probabilistic classifiers based on applying Bayes’ theorem with strong (naive) independence assumptions between the features. The spark.ml implementation currently supports both multinomial naive Bayes and Bernoulli naive Bayes.
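As a toy illustration of the underlying rule, Bayes' theorem turns P(word | class) and the class prior into P(class | word); the numbers below are hypothetical, not estimated from this dataset:

```python
# hypothetical probabilities, for illustration only
p_spam = 0.13            # prior: fraction of messages that are spam
p_word_given_spam = 0.5  # e.g. "free" appears in half of spam messages
p_word_given_ham = 0.02  # ...and rarely in ham

# total probability of seeing the word at all
p_word = p_word_given_spam * p_spam + p_word_given_ham * (1 - p_spam)

# Bayes' theorem: posterior probability the message is spam given the word
p_spam_given_word = p_word_given_spam * p_spam / p_word
```

The "naive" part is that, for a whole message, the per-word likelihoods are simply multiplied together as if the words were independent.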

In [30]:
from pyspark.ml.classification import NaiveBayes
In [33]:
nb = NaiveBayes()


paramGrid_nb = ParamGridBuilder() \
    .addGrid(nb.smoothing, np.linspace(0.3, 10, 10)) \
    .build()
    
crossval_nb = CrossValidator(estimator=nb,
                          estimatorParamMaps=paramGrid_nb,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds= 5) 
In [34]:
cvModel_nb = crossval_nb.fit(trainDF)

Shown below are the average area under the curve values for the ten smoothing values. We see the best value is 0.8932204672602355, which corresponds to the first (smallest) smoothing value in the grid.

In [57]:
cvModel_nb.avgMetrics
Out[57]:
[0.8932204672602355,
 0.8845042603061242,
 0.8805366509217409,
 0.8777792239323049,
 0.8756333853762843,
 0.8740922267500476,
 0.8728146322696352,
 0.8716234788548282,
 0.8705026371694609,
 0.8694540174664303]
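Since avgMetrics is ordered the same way as the parameter grid, the winning smoothing value can be recovered with an argmax; a sketch using the grid above and the printed metrics:

```python
import numpy as np

# the same grid passed to ParamGridBuilder above
smoothing_grid = np.linspace(0.3, 10, 10)

# average AUC per grid point, copied from cvModel_nb.avgMetrics above
avg_metrics = [0.8932204672602355, 0.8845042603061242, 0.8805366509217409,
               0.8777792239323049, 0.8756333853762843, 0.8740922267500476,
               0.8728146322696352, 0.8716234788548282, 0.8705026371694609,
               0.8694540174664303]

best_idx = int(np.argmax(avg_metrics))
best_smoothing = float(smoothing_grid[best_idx])
```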

Make predictions:

In [61]:
predictions_nb = cvModel_nb.transform(testDF)
predictions_nb.select('label', 'prediction').show(5)
+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows

We see that this model misclassified some ham messages, but it performs better at identifying spam messages than the logistic regression model we built above.

In [62]:
predictions_nb.groupBy('label','prediction').count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0|    5|
|  1.0|       1.0|  921|
|  0.0|       0.0|  140|
|  1.0|       0.0|   58|
+-----+----------+-----+

From the accuracy and F1 score values shown below, we notice that the performance of the logistic regression model is better than that of the naive Bayes model, although naive Bayes has the higher area under the curve.

In [63]:
my_eval_nb = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_nb.evaluate(predictions_nb)
Out[63]:
0.9531365573597268
In [65]:
my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_nb.evaluate(predictions_nb)
Out[65]:
0.9475008620872061
In [66]:
my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_nb.evaluate(predictions_nb)
Out[66]:
0.943950177935943
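To keep the comparison in one place, the test-set metrics reported above can be collected side by side (values copied from the outputs, not recomputed):

```python
# test-set metrics reported above, collected for a side-by-side view
metrics = {
    "logistic regression": {"auc": 0.9275862068965517,
                            "f1": 0.9806865812338207,
                            "accuracy": 0.9813167259786477},
    "naive Bayes":         {"auc": 0.9531365573597268,
                            "f1": 0.9475008620872061,
                            "accuracy": 0.943950177935943},
}
for name, m in metrics.items():
    print(f"{name:>20}: AUC={m['auc']:.4f}  F1={m['f1']:.4f}  accuracy={m['accuracy']:.4f}")
```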
We will see the other classification models in the next post.