-
Notifications
You must be signed in to change notification settings - Fork 969
/
whatsapp.py
112 lines (88 loc) · 3.36 KB
/
whatsapp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import base64
import logging
import os
import sqlite3
from typing import Optional, Union
from mvt.common.utils import check_for_links, convert_unix_to_iso
from .base import AndroidExtraction
# Location of the WhatsApp message database, written without a leading
# slash; Whatsapp.run() joins it onto "/" before handing it to the ADB helper.
WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db"
class Whatsapp(AndroidExtraction):
    """This module extracts all WhatsApp messages containing links.

    Messages whose text is made up only of whitespace are kept as well
    (see ``_parse_db``).
    """

    def __init__(
        self,
        file_path: Optional[str] = None,
        target_path: Optional[str] = None,
        results_path: Optional[str] = None,
        module_options: Optional[dict] = None,
        log: logging.Logger = logging.getLogger(__name__),
        results: Optional[list] = None,
    ) -> None:
        """Forward every argument unchanged to the base class constructor."""
        super().__init__(
            file_path=file_path,
            target_path=target_path,
            results_path=results_path,
            module_options=module_options,
            log=log,
            results=results,
        )

    def serialize(self, record: dict) -> Union[dict, list]:
        """Convert one extracted message into an event record.

        :param record: Message dict; the keys ``data``, ``isodate`` and
            ``direction`` are read.
        :returns: Dict with the keys ``timestamp``, ``module``, ``event``
            and ``data``.
        """
        # Escape newlines so the message text stays on a single line.
        text = record["data"].replace("\n", "\\n")
        return {
            "timestamp": record["isodate"],
            "module": self.__class__.__name__,
            "event": f"whatsapp_msg_{record['direction']}",
            "data": f'"{text}"',
        }

    def check_indicators(self) -> None:
        """Add to ``self.detected`` each result whose links match an indicator.

        Does nothing when ``self.indicators`` is falsy. Results that have no
        ``data`` key are skipped.
        """
        if not self.indicators:
            return

        for message in self.results:
            if "data" not in message:
                continue

            message_links = check_for_links(message["data"])
            if self.indicators.check_domains(message_links):
                self.detected.append(message)

    def _parse_db(self, db_path: str) -> None:
        """Parse an Android msgstore.db WhatsApp database file.

        Reads every row of the ``messages`` table and keeps the rows whose
        ``data`` column is non-empty and either contains links or consists
        only of whitespace. The kept rows replace ``self.results``.

        The cursor and the connection are closed even when the query or the
        row processing raises; the exception still propagates to the caller
        and ``self.results`` is left untouched in that case.

        :param db_path: Path to the Android WhatsApp database file to process
        """
        messages = []

        conn = sqlite3.connect(db_path)
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    """
                    SELECT * FROM messages;
                    """
                )
                names = [description[0] for description in cur.description]

                for item in cur:
                    # Map column names onto the row values.
                    message = dict(zip(names, item))

                    if not message["data"]:
                        continue

                    message["direction"] = (
                        "send" if message["key_from_me"] == 1 else "received"
                    )
                    # NOTE(review): confirm the unit of the "timestamp" column
                    # matches what convert_unix_to_iso expects.
                    message["isodate"] = convert_unix_to_iso(message["timestamp"])

                    # Keep the message if it contains links or if its text is
                    # nothing but whitespace.
                    if (
                        check_for_links(message["data"])
                        or message["data"].strip() == ""
                    ):
                        if message.get("thumb_image"):
                            # b64encode returns bytes, not str.
                            message["thumb_image"] = base64.b64encode(
                                message["thumb_image"]
                            )

                        messages.append(message)
            finally:
                cur.close()
        finally:
            conn.close()

        self.log.info(
            "Extracted a total of %d WhatsApp messages containing links", len(messages)
        )
        self.results = messages

    def run(self) -> None:
        """Process the device's WhatsApp database through the ADB helpers.

        Passes the absolute database path and ``_parse_db`` as the callback to
        ``_adb_process_file``. Any ``Exception`` raised is logged rather than
        propagated, and ``_adb_disconnect`` is always called once the
        connection has been set up.
        """
        self._adb_connect()

        try:
            self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db)
        except Exception as exc:
            self.log.error(exc)
        finally:
            # Disconnect on every exit path, not only after a handled error.
            self._adb_disconnect()