Skip to content

Commit

Permalink
Added support for filtering rules and fixed issue with %s placeholder…
Browse files Browse the repository at this point in the history
…s in literal queries
  • Loading branch information
akenion committed Sep 27, 2024
1 parent b4c9ead commit 75a168c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 4 deletions.
10 changes: 10 additions & 0 deletions wordfence/cli/dbscan/dbscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,23 @@ def fetch_rules() -> DatabaseRuleSet:

return cacheable.get(self.cache)

def _filter_rules(self, rule_set: DatabaseRuleSet) -> None:
included = None
if self.config.include_rules:
included = set(self.config.include_rules)
excluded = None
if self.config.exclude_rules:
excluded = set(self.config.exclude_rules)
rule_set.filter_rules(included, excluded)

def _load_rules(self) -> DatabaseRuleSet:
rule_set = self._load_remote_rules() \
if self.config.use_remote_rules \
else DatabaseRuleSet()
if self.config.rules_file is not not_set_token:
for rules_file in self.config.rules_file:
load_database_rules(rules_file, rule_set)
self._filter_rules(rule_set)
return rule_set

def invoke(self) -> int:
Expand Down
24 changes: 24 additions & 0 deletions wordfence/cli/dbscan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,30 @@
"meta": {
"accepts_file": True
}
},
"exclude-rules": {
"short_name": "e",
"description": "Specify rule IDs to ignore when scanning. May be "
"comma-delimited and/or repeated.",
"context": "ALL",
"argument_type": "OPTION_REPEATABLE",
"default": None,
"meta": {
"separator": ",",
"value_type": int
}
},
"include-rules": {
"short_name": "i",
"description": "Specify rule IDs to include when scanning. May be "
"comma-delimited and/or repeated.",
"context": "ALL",
"argument_type": "OPTION_REPEATABLE",
"default": None,
"meta": {
"separator": ",",
"value_type": int
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion wordfence/databasescanning/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ def _scan_table(
f'{prefixed_table} WHERE '
+ ' OR '.join(conditions)
)
for result in connection.query(query):
# Using a dict as the query parameters avoids %s from being
# interpreted as a placeholder (there is apparently no way
# to escape "%s" ("%%s" doesn't work)
for result in connection.query(query, {}):
rule = self.rule_set.get_rule(result['rule_id'])
del result['rule_id']
yield DatabaseScanResult(
Expand Down
43 changes: 40 additions & 3 deletions wordfence/intel/database_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ def __init__(
self.condition = condition
self.description = description

def __hash__(self):
return hash(self.identifier)

def __eq__(self, other) -> bool:
return (
type(other) is type(self)
and other.identifier == self.identifier
)


class DatabaseRuleSet:

Expand All @@ -31,12 +40,27 @@ def add_rule(self, rule: DatabaseRule) -> None:
raise Exception('Duplicate rule ID: {rule.identifier}')
self.rules[rule.identifier] = rule
if rule.tables is None:
self.global_rules.append(rule)
self.global_rules.add(rule)
else:
for table in rule.tables:
if table not in self.table_rules:
self.table_rules[table] = []
self.table_rules[table].append(rule)
self.table_rules[table] = set()
self.table_rules[table].add(rule)

def remove_rule(self, rule_id: int) -> None:
try:
rule = self.rules.pop(rule_id)
if rule.tables is None:
self.global_rules.discard(rule)
else:
for table in rule.tables:
if table in list(self.table_rules.keys()):
table_rules = self.table_rules[table]
table_rules.discard(rule)
if len(table_rules) == 0:
del self.table_rules[table]
except KeyError:
pass # Rule doesn't exist, no need to remove

def get_rules(self, table: str) -> List[DatabaseRule]:
rules = []
Expand All @@ -53,6 +77,19 @@ def get_targeted_tables(self) -> List[str]:
def get_rule(self, identifier: int) -> DatabaseRule:
return self.rules[identifier]

def filter_rules(
self,
included: Optional[Set[int]] = None,
excluded: Optional[Set[int]] = None
):
if included is not None:
for rule_id in list(self.rules.keys()):
if rule_id not in included:
self.remove_rule(rule_id)
if excluded is not None:
for rule_id in excluded:
self.remove_rule(rule_id)


JSON_VALIDATOR = ListValidator(
DictionaryValidator({
Expand Down

0 comments on commit 75a168c

Please sign in to comment.