-
Notifications
You must be signed in to change notification settings - Fork 0
/
spam_filter.py
133 lines (102 loc) · 4.73 KB
/
spam_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
############ SPAM FILTERING PROGRAM CODE (.py file) #############
## Import required packages
from __future__ import print_function
import sys
from pyspark.sql import SparkSession
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
if __name__ == "__main__":
    # --- Diagnostics: echo how the script was invoked ---
    print("This is the name of the script: ", sys.argv[0])
    print("Number of arguments: ", len(sys.argv))
    print("The arguments are: ", str(sys.argv))

    # Exactly three input paths are required: non-spam emails, spam emails,
    # and the query emails to classify.
    if len(sys.argv) != 4:
        print("Usage: spam_filter.py <nospam_path> <spam_path> <query_path>",
              file=sys.stderr)
        sys.exit(1)  # sys.exit with a conventional non-zero code (was exit(-1))

    # Input paths: non-spam training set, spam training set, query set.
    inputPath1 = sys.argv[1]
    inputPath2 = sys.argv[2]
    inputPath3 = sys.argv[3]

    # Create the SparkSession (entry point for DataFrame API) and grab the
    # underlying SparkContext for the RDD-based mllib API used below.
    spark = SparkSession\
        .builder\
        .appName("SpamFiltering")\
        .getOrCreate()
    sc = spark.sparkContext

    # Read input files: one email per line.
    nospam_emails = sc.textFile(inputPath1)
    spam_emails = sc.textFile(inputPath2)
    query_emails = sc.textFile(inputPath3)

    # Hash each email's whitespace-separated tokens into a fixed-size
    # (10,000-dim) term-frequency feature vector. The same `tf` instance is
    # reused for training and query data so features line up.
    tf = HashingTF(numFeatures=10000)
    spam_Features = spam_emails.map(lambda x: tf.transform(x.split()))
    nospam_Features = nospam_emails.map(lambda x: tf.transform(x.split()))

    # Label spam as 1 and non-spam as 0 for supervised training.
    spam = spam_Features.map(lambda x: LabeledPoint(1, x))
    nospam = nospam_Features.map(lambda x: LabeledPoint(0, x))

    # Training set = spam + non-spam examples combined.
    combined_emails = spam.union(nospam)

    # Train a logistic regression classifier with SGD on the combined set.
    model = LogisticRegressionWithSGD.train(combined_emails)

    # Classify the query emails: featurize with the SAME HashingTF, then
    # predict. zip() pairs each prediction with its source email text
    # (both RDDs derive from query_emails, so partitioning/order match).
    query_Features = query_emails.map(lambda x: tf.transform(x.split()))
    query_classify = model.predict(query_Features)
    query_output = query_classify.zip(query_emails)

    # Accuracy on the full training set (spam + non-spam), in percent.
    # NOTE: `lambda (act, pred): ...` tuple unpacking is Python-2-only
    # (removed by PEP 3113); index into the pair instead so this runs on
    # Python 3 as well.
    pred = combined_emails.map(lambda x: (x.label, model.predict(x.features)))
    accuracy_model = (pred.filter(lambda ap: ap[0] == ap[1]).count()
                      / float(combined_emails.count()) * 100.0)

    # Accuracy restricted to the spam examples.
    predSpam = spam.map(lambda x: (x.label, model.predict(x.features)))
    accuracy_spam = (predSpam.filter(lambda ap: ap[0] == ap[1]).count()
                     / float(spam.count()) * 100.0)

    # Accuracy restricted to the non-spam examples.
    predNoSpam = nospam.map(lambda x: (x.label, model.predict(x.features)))
    accuracy_nospam = (predNoSpam.filter(lambda ap: ap[0] == ap[1]).count()
                       / float(nospam.count()) * 100.0)

    print("\n")
    print("************* CLASSIFYING QUERY EMAILS ******************")
    df = spark.createDataFrame(query_output)
    df.show()
    print("\n")
    print("************* SPAM FILTERING MODEL ACCURACY ******************")
    print("Accuracy of the model: " + str(accuracy_model) + " %")
    print("************* SPAM FILTERING SPAM ACCURACY *******************")
    print("Accuracy of the model: " + str(accuracy_spam) + " %")
    print("************* SPAM FILTERING NON-SPAM ACCURACY ***************")
    print("Accuracy of the model: " + str(accuracy_nospam) + " %")

    # Stop the session only after all Spark work (including df.show()) is
    # done; the original stopped it mid-output, which worked only because
    # the accuracy counts had already been materialized.
    spark.stop()
############ SPARK- SUBMIT COMMAND SET ##############
## First define four shell variables for flexibility in choosing the program and argument paths, then run the spark-submit command
dhanshrivm@dhanshrivm-VirtualBox:~$ PROG="/media/sf_Ubuntu_Shared/spamfilter.py"
dhanshrivm@dhanshrivm-VirtualBox:~$
dhanshrivm@dhanshrivm-VirtualBox:~$ INPUT_PATH1="/media/sf_Ubuntu_Shared/emails_nospam.txt"
dhanshrivm@dhanshrivm-VirtualBox:~$
dhanshrivm@dhanshrivm-VirtualBox:~$ INPUT_PATH2="/media/sf_Ubuntu_Shared/emails_spam.txt"
dhanshrivm@dhanshrivm-VirtualBox:~$
dhanshrivm@dhanshrivm-VirtualBox:~$ INPUT_PATH3="/media/sf_Ubuntu_Shared/query.txt"
dhanshrivm@dhanshrivm-VirtualBox:~$
dhanshrivm@dhanshrivm-VirtualBox:~$ ./Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit $PROG $INPUT_PATH1 $INPUT_PATH2 $INPUT_PATH3
##################### OUTPUT ######################
************* CLASSIFYING QUERY EMAILS ******************
+---+--------------------+
| _1| _2|
+---+--------------------+
| 1|this is a year of...|
| 1|you are the lucky...|
| 1|Do not miss your ...|
| 1|Get real money fa...|
| 0|Dear Spark Learne...|
| 0|Hi Mom, Apologies...|
| 0|Wow, hey Fred, ju...|
| 0|Hi Spark user lis...|
| 1|Please do not rep...|
| 0|Hi Mahmoud, Are y...|
+---+--------------------+
************* SPAM FILTERING MODEL ACCURACY ******************
Accuracy of the model: 100.0 %
************* SPAM FILTERING SPAM ACCURACY *******************
Accuracy of the model: 100.0 %
************* SPAM FILTERING NON-SPAM ACCURACY ***************
Accuracy of the model: 100.0 %