-
Notifications
You must be signed in to change notification settings - Fork 969
/
analytics.py
159 lines (136 loc) · 4.75 KB
/
analytics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import copy
import logging
import plistlib
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
ANALYTICS_DB_PATH = [
"private/var/Keychains/Analytics/*.db",
]
class Analytics(IOSExtraction):
"""This module extracts information from the
private/var/Keychains/Analytics/*.db files."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": record["artifact"],
"data": f"{record}",
}
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
for value in result.values():
if not isinstance(value, str):
continue
ioc = self.indicators.check_process(value)
if ioc:
self.log.warning(
'Found mention of a malicious process "%s" in %s file at %s',
value,
result["artifact"],
result["isodate"],
)
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc
self.detected.append(new_result)
continue
ioc = self.indicators.check_domain(value)
if ioc:
self.log.warning(
'Found mention of a malicious domain "%s" in %s file at %s',
value,
result["artifact"],
result["isodate"],
)
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc
self.detected.append(new_result)
def _extract_analytics_data(self):
artifact = self.file_path.split("/")[-1]
conn = self._open_sqlite_db(self.file_path)
cur = conn.cursor()
try:
cur.execute(
"""
SELECT
timestamp,
data
FROM hard_failures
UNION
SELECT
timestamp,
data
FROM soft_failures
UNION
SELECT
timestamp,
data
FROM all_events;
"""
)
except sqlite3.OperationalError:
cur.execute(
"""
SELECT
timestamp,
data
FROM hard_failures
UNION
SELECT
timestamp,
data
FROM soft_failures;
"""
)
for row in cur:
if row[0] and row[1]:
isodate = convert_mactime_to_iso(row[0], False)
data = plistlib.loads(row[1])
data["isodate"] = isodate
elif row[0]:
isodate = convert_mactime_to_iso(row[0], False)
data = {}
data["isodate"] = isodate
elif row[1]:
isodate = ""
data = plistlib.loads(row[1])
data["isodate"] = isodate
data["artifact"] = artifact
self.results.append(data)
cur.close()
conn.close()
def process_analytics_dbs(self):
for file_path in self._get_fs_files_from_patterns(ANALYTICS_DB_PATH):
self.file_path = file_path
self.log.info("Found Analytics database file at path: %s", file_path)
self._extract_analytics_data()
def run(self) -> None:
self.process_analytics_dbs()
self.log.info(
"Extracted %d records from analytics databases", len(self.results)
)
self.results = sorted(self.results, key=lambda entry: entry["isodate"])