Multi-class Text Classification Using Apache Spark MLlib

Spark MLlib

MLlib is a library for machine learning in Spark that aims to make it easy to use and scalable for practical applications. It includes tools for common ML tasks, such as classification, regression, clustering, and collaborative filtering, as well as featurization methods for feature extraction, transformation, dimensionality reduction, and selection. MLlib also provides tools for building, evaluating, and tuning ML pipelines, as well as utilities for linear algebra, statistics, and data handling.

Spark offers APIs in several languages, including Java, Scala, Python, and R. These APIs allow developers to use Spark's powerful distributed computing capabilities to build a wide variety of applications, including data processing pipelines, machine learning models, and real-time streaming applications.

For this tutorial, we are going to use the Python API, which is called PySpark.

Read more about it from the official documentation.

Why using Spark over other ML frameworks?

One of the main benefits of using Spark MLlib over Tensorflow or PyTorch or any other ML frameworks is that it is designed to be highly scalable and efficient, especially when working with large datasets. Spark's distributed computing engine allows MLlib to distribute training and inference processes across multiple machines, which can significantly speed up the training and evaluation of machine learning models. Additionally, MLlib includes a wide range of machine learning algorithms and utilities, which makes it a convenient and comprehensive tool for building and deploying machine learning models in a production environment. By comparison, Tensorflow and PyTorch are primarily focused on providing low-level building blocks for developing machine learning models, and do not include as many high-level tools for distributed training and deployment.

Project Goal

In this project, we will make a classifier that can classify sentences into different emotions. As a target label, we have six different types of emotions:

1[joy, love, anger, fear, surprise, sadness]

Given an input text, our Machine Learning model should be able to tell which emotion the sentence expresses.

Dataset

The dataset we will use in this tutorial is emotional dataset for NLP and can be downloaded from Kaggle.

Install Dependencies

1!pip install pyspark
2!pip install pandas

Load the data

The dataset is divided into train test and validation, and all are in txt format. We will only use the train.txtfile. Let's first load the data using Pandas and convert it to csv so that we can load it using Spark. You can use Jupyter notebook or Google Colab executing the following code.

 1import pandas as pd
 2
 3# set panas to print full text
 4pd.set_option("display.max_columns", None)
 5pd.set_option("display.max_rows", None)
 6pd.set_option("display.max_colwidth", None)
 7
 8# hide pyspark warnings
 9import warnings
10warnings.filterwarnings("ignore")
11
12# read the data which is in txt format
13df = pd.read_csv("train.txt", names=["text", "emotion"], delimiter=";")
14
15# let's save the data in csv format with header
16df.to_csv("train.csv", index=False, header=True)
17
18# print the first 2 rows
19df.head(2)

output

text emotion
0 i didnt feel humiliated sadness
1 i can go from feeling so hopeless to so damned hopeful just from being around someone who cares and is awake sadness

Create Spark Session

 1# import all the required libraries for creating spark session
 2import pyspark
 3from pyspark import SparkContext
 4from pyspark.sql import SparkSession
 5
 6# Create a SparkContext instance
 7spark_context = SparkContext(master="local")
 8
 9# Creating a spark session.
10spark = SparkSession.builder.appName("Emotion Detection").getOrCreate()
11print(spark_context)

This will print something like below:

 1SparkContext
 2
 3Spark UI
 4
 5    Version
 6    v3.3.1
 7    Master
 8    local
 9    AppName
10    pyspark-shell

Load CSV data using Spark

1# read the train.csv file
2df = spark.read.csv("train.csv", header=True, inferSchema=True)
3df.show(2)

output

1+--------------------+-------+
2|                text|emotion|
3+--------------------+-------+
4|i didnt feel humi...|sadness|
5|i can go from fee...|sadness|
6+--------------------+-------+

Basic EDA

Let's see some facts about out data.

1# show how many null values are there in each column using pandas
2df.toPandas().isnull().sum()

output

1text       0
2emotion    0
3dtype: int64

code

1# display the data distribution of each emotion
2df.groupBy("emotion").count().show()

output

 1+--------+-----+
 2| emotion|count|
 3+--------+-----+
 4|     joy| 5362|
 5|    love| 1304|
 6|   anger| 2159|
 7|    fear| 1937|
 8|surprise|  572|
 9| sadness| 4666|
10+--------+-----+

Feature Engineering

Feature engineering is a crucial step in the process of building machine learning models. The reason for this is that most machine learning algorithms require input data in the form of numeric features, and therefore, we need to convert any categorical data into a numerical representation. In natural language processing, there are several techniques that can be used to convert text data into numerical features, such as CountVectorizer, BagOfWords, TFIDF, OneHotEncoder, WordEmbeddings, and HashingTF. To build a model that can predict a given outcome, we can create a pipeline that includes these feature engineering methods, as well as the machine learning model that will be used for prediction. The pipeline will take care of converting the raw input data into a format that the model can understand, making it easier to build and deploy machine learning models in a production environment.

code

 1from pyspark.ml.feature import (
 2    Tokenizer,
 3    StopWordsRemover,
 4    CountVectorizer,
 5    IDF,
 6    StringIndexer,
 7)
 8
 9# tokenize the text column
10tokenizer = Tokenizer(inputCol="text", outputCol="words")
11
12# remove the stop words
13stopwords_remover = StopWordsRemover(
14    inputCol=tokenizer.getOutputCol(), outputCol="filtered"
15)
16
17# convert the words to vectors
18count_vectorizer = CountVectorizer(
19    inputCol=stopwords_remover.getOutputCol(), outputCol="raw_features"
20)
21
22# calculate the idf
23idf = IDF(inputCol=count_vectorizer.getOutputCol(), outputCol="features")
24
25
26# convert the emotion column to label
27label_stringIdx = StringIndexer(inputCol="emotion", outputCol="label")
28
29# import the required libraries for creating the pipeline
30from pyspark.ml import Pipeline
31
32# create the pipeline
33pipeline = Pipeline(
34    stages=[tokenizer, stopwords_remover, count_vectorizer, idf, label_stringIdx]
35)
36
37# fit the pipeline to the data
38pipeline_fit = pipeline.fit(df)
39
40# transform the data
41dataset = pipeline_fit.transform(df)
42
43# display the first 2 rows
44dataset.show(2)

output

1+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+
2|                text|emotion|               words|            filtered|        raw_features|            features|label|
3+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+
4|i didnt feel humi...|sadness|[i, didnt, feel, ...|[didnt, feel, hum...|(15082,[0,48,567]...|(15082,[0,48,567]...|  1.0|
5|i can go from fee...|sadness|[i, can, go, from...|[go, feeling, hop...|(15082,[1,29,42,5...|(15082,[1,29,42,5...|  1.0|
6+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+

code

1# show which emotions belongs to which encoded label
2dataset.select("emotion", "label").distinct().show()

output

 1+--------+-----+
 2| emotion|label|
 3+--------+-----+
 4|    love|  4.0|
 5|     joy|  0.0|
 6|surprise|  5.0|
 7|   anger|  2.0|
 8| sadness|  1.0|
 9|    fear|  3.0|
10+--------+-----+

code

1# split the data into train and test
2train, test = dataset.randomSplit([0.7, 0.3], seed=100)
3
4# print the number of rows in train and test
5print("Training Dataset Count: " + str(train.count()))
6print("Test Dataset Count: " + str(test.count()))

output

1Training Dataset Count: 11246
2Test Dataset Count: 4754

Build Model 1

code

 1# build the model
 2from pyspark.ml.classification import NaiveBayes
 3from pyspark.ml.evaluation import MulticlassClassificationEvaluator
 4
 5
 6# create the model
 7nb = NaiveBayes(featuresCol="features", labelCol="label", predictionCol="prediction")
 8
 9# fit the model to the train data
10model = nb.fit(train)
11
12# predict the test data
13predictions = model.transform(test)
14
15# create the evaluator
16evaluator = MulticlassClassificationEvaluator(
17    labelCol="label", predictionCol="prediction", metricName="accuracy"
18)
19
20# calculate the accuracy
21accuracy = evaluator.evaluate(predictions)
22print("Test set accuracy = " + str(accuracy))
23
24# create the evaluator
25evaluator = MulticlassClassificationEvaluator(
26    labelCol="label", predictionCol="prediction", metricName="f1"
27)
28
29# calculate the f1 score
30f1_score = evaluator.evaluate(predictions)
31print("Test set f1 score = " + str(f1_score))

output

1Test set accuracy = 0.6903660075725705
2Test set f1 score = 0.703776482543677

So, we are getting around 70% f1-score.

Let's see some correct predictions.

code

1# some predictions that are correct display only the emotion, label, prediction
2predictions.filter(predictions.label == predictions.prediction).select(
3    "emotion", "label", "prediction"
4).show(5)

output

1+-------+-----+----------+
2|emotion|label|prediction|
3+-------+-----+----------+
4|   fear|  3.0|       3.0|
5|   fear|  3.0|       3.0|
6|sadness|  1.0|       1.0|
7|    joy|  0.0|       0.0|
8|  anger|  2.0|       2.0|
9+-------+-----+----------+

Let's see how many are correct predictions and how many are wrong predictions.

1# how many predictions are correct and how many are wrong
2predictions.filter(predictions.label == predictions.prediction).count()  # outputs 3282
3
4# how many predictions are correct and how many are wrong
5predictions.filter(predictions.label != predictions.prediction).count()  # outputs 1472

Build Model 2

Let's try another simple model.

 1# Logistic Regression
 2from pyspark.ml.classification import LogisticRegression
 3
 4# create the model
 5lr = LogisticRegression(
 6    featuresCol="features", labelCol="label", predictionCol="prediction"
 7)
 8
 9# fit the model to the train data
10model = lr.fit(train)
11
12# predict the test data
13predictions = model.transform(test)
14
15# create the evaluator
16evaluator = MulticlassClassificationEvaluator(
17    labelCol="label", predictionCol="prediction", metricName="accuracy"
18)
19
20# calculate the accuracy
21accuracy = evaluator.evaluate(predictions)
22
23# print the accuracy
24print("Test set accuracy = " + str(accuracy))
25
26# create the evaluator
27evaluator = MulticlassClassificationEvaluator(
28    labelCol="label", predictionCol="prediction", metricName="f1"
29)
30
31# calculate the f1 score
32f1_score = evaluator.evaluate(predictions)
33
34# print the f1 score
35print("Test set f1 score = " + str(f1_score))

output

1Test set accuracy = 0.8300378628523348
2Test set f1 score = 0.8317629919611031

We can see that the accuracy and f1 have been increased by using LogisticRegression than the NaiveBayes.

You can try out other models as well. See all the available models that comes with Spark MLlib for classification and regression.

Author: Sadman Kabir Soumik

Posts in this Series