-
Notifications
You must be signed in to change notification settings - Fork 0
/
websitewatcher.py
311 lines (240 loc) · 11.9 KB
/
websitewatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import os
import csv
import dns.resolver
import requests
from bs4 import BeautifulSoup
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import sys
import re
# Domains read from the user's tracked_websites.txt input file.
list_file_domains = []
# Lists for actual Elements (results from the current run; each entry is [domain, value, ...])
html_tags = []
a_records = []
mx_records = []
# Lists for previous Elements (loaded from website_changes.csv; each entry is [domain, value])
html_titles_compare = []
html_description_compare = []
a_records_compare = []
mx_records_compare = []
# SMTP settings parsed from mail_account.txt (keys: sender_address, password, recipient_address).
mail_account = {}
# Browser-like User-Agent plus no-cache headers so every poll fetches a fresh page.
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36', 'Pragma': 'no-cache', 'Cache-Control': 'no-cache'}
# Working directory ~/websitewatcher (holds 'User Input/' and website_changes.csv).
desktop = os.path.join(os.path.expanduser('~'), 'websitewatcher')
def read_tracked_websites_file():
    """Load the domains to monitor from 'User Input/tracked_websites.txt'.

    Each line is lower-cased and stripped, then appended to the module-level
    ``list_file_domains``. Exits the script if the file cannot be read.
    """
    try:
        # 'with' guarantees the handle is closed even when reading fails;
        # the original open()/close() pair leaked it on exceptions.
        with open(desktop + '/User Input/tracked_websites.txt', 'r', encoding='utf-8-sig') as file_domains:
            for my_domains in file_domains:
                domain = my_domains.replace("\n", "").lower().strip()
                list_file_domains.append(domain)
    except Exception as e:
        print('Something went wrong with reading tracked_websites.txt Input File. Please check file.', e)
        sys.exit()
def read_mail_account():
    """Load SMTP credentials from 'User Input/mail_account.txt' into ``mail_account``.

    File format is one ``key: value`` pair per line; surrounding double quotes
    on values are stripped. Exits the script if the file is unreadable or any
    of sender address, recipient address or password is missing/empty.
    """
    try:
        # 'with' guarantees the handle is closed even when parsing fails;
        # the original open()/close() pair leaked it on exceptions.
        with open(desktop + '/User Input/mail_account.txt', 'r', encoding='utf-8-sig') as file_domains:
            for my_domains in file_domains:
                # Split only on the first ':' so values may contain colons.
                (key, val) = my_domains.split(':', 1)
                mail_account[key.strip()] = val.strip().lstrip('"').rstrip('"').replace("\n", "")
    except Exception as e:
        print('Something went wrong with reading mail_account.txt Input File. Please check file.', e)
        sys.exit()
    # .get() avoids a raw KeyError traceback when an entry is missing entirely,
    # so missing keys get the same friendly message as empty values.
    if mail_account.get('sender_address', '') == '' or mail_account.get('password', '') == '' or mail_account.get('recipient_address', '') == '':
        print('Something went wrong with reading mail_account.txt Input File. Please check if sender address, recipient address or password are not empty.')
        sys.exit()
def html_tag_lookup(domain):
    """Fetch http://<domain> and scrape its <title> and meta description.

    Returns a list of up to [domain, title, description] with control
    characters removed and values lower-cased/stripped; empty values are
    filtered out. On request or parse errors a shorter (possibly empty)
    list is returned and the error is printed.
    """
    scraped = []
    url = 'http://' + domain
    try:
        # Context-managed session guarantees the pooled connections are
        # released; the original session was never closed.
        with requests.Session() as session:
            session.keep_alive = False
            response = session.get(url, headers=headers, allow_redirects=True, timeout=(5, 30))
        soup = BeautifulSoup(response.text, 'lxml')
        scraped.append(domain)
        title = soup.find('title')
        description = soup.find('meta', attrs={'name': 'description'})
        if title is not None:
            # Strip embedded control/whitespace characters before normalizing.
            title_mod = re.sub(r'[\n\r\t\b\f\v]+', '', title.get_text())
            scraped.append(title_mod.lower().strip())
        if description is not None:
            # KeyError here (missing 'content' attr) is caught below.
            description_mod = re.sub(r'[\n\r\t\b\f\v]+', '', description['content'])
            scraped.append(description_mod.lower().strip())
    except (TypeError, AttributeError, requests.exceptions.ReadTimeout, KeyError):
        print('Parsing Webpage Error. Something went wrong at scraping: ', domain)
    except (requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout, requests.exceptions.TooManyRedirects):
        print('Server Connection Error. Domain is probably not online: ', domain)
    except Exception as e:
        print('Other Error occured: ', e)
    # Drop empty strings (e.g. a blank title) before returning.
    return list(filter(None, scraped))
def mx_record(domain):
    """Resolve the MX records of *domain* via Google DNS (8.8.8.8).

    Returns ``[domain, sorted list of 'priority host' strings]`` with the
    trailing root dot removed, or just ``[domain]`` when the lookup fails
    (the error is printed, not raised).
    """
    resolver = dns.resolver.Resolver()
    resolver.timeout = 2
    resolver.lifetime = 2
    resolver.nameservers = ['8.8.8.8']
    mx_temp = [domain]
    try:
        answers = resolver.resolve(domain, 'MX')
        # Build one entry per answer directly. The original concatenated all
        # answers into one string and re-split on '. ', a fragile round trip.
        records = sorted(str(answer).rstrip(".") for answer in answers)
        if records:
            mx_temp.append(records)
    except Exception as e:
        print(f'MX-Record lookup Error. Something went wrong by DNS lookup for domain {domain}', e)
    # filter(None, ...) also drops an empty domain string if one slipped in.
    return list(filter(None, mx_temp))
def a_record(domain):
    """Resolve the A records (IPv4 addresses) of *domain* via Google DNS.

    Returns ``[domain, sorted list of IP strings]``, or just ``[domain]``
    when the lookup fails (the error is printed, not raised).
    """
    resolver = dns.resolver.Resolver()
    resolver.timeout = 2
    resolver.lifetime = 2
    resolver.nameservers = ['8.8.8.8']
    a_temp = [domain]
    try:
        answers = resolver.resolve(domain, 'A')
        # One entry per answer; the original joined everything into a single
        # space-separated string and split it again for no benefit.
        records = sorted(str(answer) for answer in answers)
        if records:
            a_temp.append(records)
    except Exception as e:
        print(f'A-Record lookup Error. Something went wrong by DNS lookup for domain {domain}', e)
    return list(filter(None, a_temp))
def html_tag_threading(n):
    """Scrape title/description for every tracked domain using *n* worker threads.

    Results that yielded at least one attribute besides the domain itself
    are collected into the module-level ``html_tags`` list, which is also
    returned for convenience.
    """
    # Snapshot the domain list; plain list() beats a copy-comprehension.
    domains = list(list_file_domains)
    print(len(domains), 'Domains detected from file tracked_websites.txt\n')
    with ThreadPoolExecutor(n) as executor:
        for result in executor.map(html_tag_lookup, domains):
            # len > 1 means scraping produced more than just the domain name.
            if result is not None and len(result) > 1:
                html_tags.append(result)
    return html_tags
def a_record_threading(n):
    """Resolve A records for all tracked domains using *n* worker threads.

    Successful lookups ([domain, [ips]]) are collected into the module-level
    ``a_records`` list, which is also returned.
    """
    # Snapshot the domain list; plain list() beats a copy-comprehension.
    domains = list(list_file_domains)
    with ThreadPoolExecutor(n) as executor:
        for result in executor.map(a_record, domains):
            # len > 1 means the lookup actually produced records.
            if result is not None and len(result) > 1:
                a_records.append(result)
    return a_records
def mx_record_threading(n):
    """Resolve MX records for all tracked domains using *n* worker threads.

    Successful lookups ([domain, [records]]) are collected into the
    module-level ``mx_records`` list, which is also returned.
    """
    # Snapshot the domain list; plain list() beats a copy-comprehension.
    domains = list(list_file_domains)
    with ThreadPoolExecutor(n) as executor:
        for result in executor.map(mx_record, domains):
            # len > 1 means the lookup actually produced records.
            if result is not None and len(result) > 1:
                mx_records.append(result)
    return mx_records
def data_to_csv(input_data_csv, attribute_list):
    """Return the attribute value stored for domain *input_data_csv*.

    ``attribute_list`` holds (domain, value) pairs. For the HTML title and
    description lists the value is returned as-is; for DNS-record lists the
    value (a list) is joined into one comma-separated string. Returns None
    when the domain is not present.
    """
    # Hoisted out of the loop: the original rebuilt BOTH lists on every
    # iteration just to test which kind of attribute_list was passed in.
    title_pairs = separate_into_html_title()
    description_pairs = separate_into_html_description()
    for entry in attribute_list:
        if entry[0] != input_data_csv:
            continue
        if attribute_list == title_pairs or attribute_list == description_pairs:
            # HTML attributes are already plain strings.
            return entry[1]
        # DNS-record lists: flatten into a single CSV cell.
        return ','.join(str(elem) for elem in entry[1])
def postprocessing_outputfile():
    """Rewrite website_changes.csv with the freshly gathered attributes."""
    csv_path = f'{desktop}/website_changes.csv'
    df = pd.read_csv(csv_path, delimiter=',', encoding='utf-8-sig')
    # Fill each column by looking up the row's domain in the current results.
    df['MX-Record(s)'] = df.apply(lambda row: data_to_csv(row['Domains'], mx_records), axis=1)
    #df['A-Record(s)'] = df.apply(lambda x: data_to_csv(x['Domains'], a_records), axis=1)
    df['HTML-Title'] = df.apply(lambda row: data_to_csv(row['Domains'], separate_into_html_title()), axis=1)
    df['HTML-Description'] = df.apply(lambda row: data_to_csv(row['Domains'], separate_into_html_description()), axis=1)
    df.to_csv(csv_path, index=False, encoding='utf-8-sig')
def model_csv_file():
    """Create website_changes.csv or sync its rows with the tracked domains.

    On first run, writes the header and one bare row per tracked domain.
    Afterwards, domains newly added to tracked_websites.txt are appended and
    domains removed from it are dropped from the CSV.
    """
    console_file_path = f'{desktop}/website_changes.csv'
    if not os.path.exists(console_file_path):
        # First run: header plus one row per domain (attribute columns empty).
        with open(console_file_path, mode='w', newline='', encoding='utf-8-sig') as f:
            header = ['Domains', 'MX-Record(s)', 'HTML-Title', 'HTML-Description']
            writer = csv.DictWriter(f, fieldnames=header)
            writer.writeheader()
            for domain in list_file_domains:
                writer.writerow({'Domains': domain})
    else:
        # 'with' closes the handle even on error; the original open()/close()
        # pairs leaked handles when an exception occurred in between.
        with open(console_file_path, mode='r', encoding='utf-8-sig') as csv_file:
            csvreader = csv.DictReader(csv_file, delimiter=',')
            csv_domains = [row['Domains'] for row in csvreader]
        added_input_domain = [k for k in list_file_domains if k not in csv_domains]
        deleted_input_domains = [h for h in csv_domains if h not in list_file_domains]
        print('New Website(s) added to tracked_websites.txt file: ', added_input_domain)
        print('Old Website(s) deleted from tracked_websites.txt file: ', deleted_input_domains)
        if len(added_input_domain) > 0:
            with open(console_file_path, mode='a', newline='', encoding='utf-8-sig') as append_file:
                writer = csv.writer(append_file, delimiter=',')
                for k in added_input_domain:
                    writer.writerow([k])
        if len(deleted_input_domains) > 0:
            df = pd.read_csv(console_file_path, delimiter=',', encoding='utf-8-sig')
            # Drop all removed domains in one vectorized pass.
            df = df[~df['Domains'].isin(deleted_input_domains)]
            df.to_csv(console_file_path, index=False, encoding='utf-8-sig')
def load_old_attributes():
    """Load the previous run's attributes from website_changes.csv into the *_compare lists."""
    with open(f'{desktop}/website_changes.csv', 'r', encoding='utf-8-sig') as csv_file:
        for row in csv.DictReader(csv_file, delimiter=','):
            domain = row['Domains']
            mx_records_compare.append([domain, row['MX-Record(s)']])
            #a_records_compare.append([domain, row['A-Record(s)']])
            html_titles_compare.append([domain, row['HTML-Title']])
            html_description_compare.append([domain, row['HTML-Description']])
def group_tuples_first_value(input):
    """Merge sequences that share the same first element into one tuple each.

    Example: [('a', 1), ('a', 2), ('b', 3)] -> [('a', 1, 2), ('b', 3)].
    First-seen key order is preserved.
    """
    grouped = {}
    for elem in input:
        key = elem[0]
        if key in grouped:
            # Key already seen: append only the tail values.
            grouped[key].extend(elem[1:])
        else:
            # First occurrence keeps the key itself as the leading value.
            grouped[key] = list(elem)
    return [tuple(values) for values in grouped.values()]
def separate_into_html_title():
    """Return (domain, title) pairs extracted from the scraped html_tags entries."""
    return [(entry[0], entry[1]) for entry in html_tags]
def separate_into_html_description():
    """Return (domain, description) pairs; entries scraped without a description are skipped."""
    return [(entry[0], entry[2]) for entry in html_tags if len(entry) > 2]
def compare_changes():
    """Diff the current scrape/DNS results against the previous run.

    Builds (domain, message) tuples for every changed MX record, HTML title
    and HTML description, groups them per domain, and returns them as one
    printable string separated by blank lines — or None when nothing changed.
    """
    #a_record_changes = [(i[0], 'A-Record has been changed or added. New Record: "{}"'.format(x)) for k in a_records_compare for i in a_records if k[0] == i[0] and k[1] is not None and k[1].split(',') != i[1] for x in i[1] if x not in k[1].split(',')]
    # k = [domain, old CSV string from the file]; i = [domain, [new records]].
    # One message is emitted per new record missing from the old comma-split list.
    mx_record_changes = [(i[0], 'MX-Record has been changed or added. New Record: "{}"'.format(x)) for k in mx_records_compare for i in mx_records if k[0] == i[0] and k[1] is not None and k[1].split(',') != i[1] for x in i[1] if x not in k[1].split(',')]
    # For HTML attributes i comes from html_tags ([domain, title, description?]);
    # a change is any mismatch between the stored value and the fresh scrape.
    html_title_changes = [(i[0], 'Webpage Content has been changed or added. New Website Title: "{}"'.format(i[1])) for k in html_titles_compare for i in html_tags if k[0] == i[0] and k[1] != i[1]]
    # len(i) > 2 guards entries that were scraped without a description.
    html_description_changes = [(i[0], 'Webpage Content has been changed or added. New Website Description: "{}"'.format(i[2])) for k in html_description_compare for i in html_tags if len(i) > 2 if k[0] == i[0] and k[1] != i[2]]
    sum_changes = mx_record_changes + html_title_changes + html_description_changes
    if len(sum_changes) > 0:
        # Group all messages per domain, then render one block per domain.
        output_changes = group_tuples_first_value(sum_changes)
        new_comprehension = "\n\n".join(str(row) for row in output_changes)
        return new_comprehension
    # Implicitly returns None when no changes were detected.
def send_email_fct():
    """Email the detected website changes to the configured recipient via Gmail SMTP."""
    sender = mail_account['sender_address']
    password = mail_account['password']
    recipient = mail_account['recipient_address']
    message = MIMEMultipart()
    message['From'] = sender
    message['To'] = recipient
    message['Subject'] = "Notification Website Watcher - Changes were detected"
    body_email = f"This Mail was automatically generated and provide information about detected changes of monitored websites in file tracked_websites.txt.\n\n" \
                 f"Quantity of current Websites to track changes: {len(list_file_domains)} Websites.\n\n" \
                 f"Following events have been changed or added: \n\n{compare_changes()}"
    message.attach(MIMEText(body_email, 'plain'))
    try:
        smtp_connection = smtplib.SMTP('smtp.gmail.com', 587)
        smtp_connection.starttls()
        smtp_connection.login(sender, password)
        try:
            smtp_connection.sendmail(sender, recipient, message.as_string())
            print('Email Sent')
        finally:
            # Always close the SMTP session, even when sending fails.
            smtp_connection.quit()
    except Exception as E:
        print('Mail failed: {}'.format(str(E)))
if __name__=='__main__':
    # 1. Load user input: tracked domains and mail credentials.
    read_tracked_websites_file()
    read_mail_account()
    # 2. Create/sync the CSV state file and load the previous run's attributes.
    model_csv_file()
    load_old_attributes()
    # 3. Gather the current attributes with 50 worker threads each.
    html_tag_threading(50)
    #a_record_threading(50)
    mx_record_threading(50)
    # 4. Persist the fresh attributes back into website_changes.csv.
    postprocessing_outputfile()
    # NOTE(review): compare_changes() is computed here AND again inside
    # send_email_fct()'s body text, so the diff work runs twice.
    if compare_changes() is not None:
        send_email_fct()