From 75a168cdd9d289b79f048fa1c8d048c1287cdba2 Mon Sep 17 00:00:00 2001 From: Alex Kenion Date: Fri, 27 Sep 2024 11:45:49 -0400 Subject: [PATCH] Added support for filtering rules and fixed issue with %s placeholders in literal queries --- wordfence/cli/dbscan/dbscan.py | 10 +++++++ wordfence/cli/dbscan/definition.py | 24 +++++++++++++++ wordfence/databasescanning/scanner.py | 5 +++- wordfence/intel/database_rules.py | 43 +++++++++++++++++++++++++-- 4 files changed, 78 insertions(+), 4 deletions(-) diff --git a/wordfence/cli/dbscan/dbscan.py b/wordfence/cli/dbscan/dbscan.py index b53b5c6..c2910e1 100644 --- a/wordfence/cli/dbscan/dbscan.py +++ b/wordfence/cli/dbscan/dbscan.py @@ -161,6 +161,15 @@ def fetch_rules() -> DatabaseRuleSet: return cacheable.get(self.cache) + def _filter_rules(self, rule_set: DatabaseRuleSet) -> None: + included = None + if self.config.include_rules: + included = set(self.config.include_rules) + excluded = None + if self.config.exclude_rules: + excluded = set(self.config.exclude_rules) + rule_set.filter_rules(included, excluded) + def _load_rules(self) -> DatabaseRuleSet: rule_set = self._load_remote_rules() \ if self.config.use_remote_rules \ @@ -168,6 +177,7 @@ def _load_rules(self) -> DatabaseRuleSet: if self.config.rules_file is not not_set_token: for rules_file in self.config.rules_file: load_database_rules(rules_file, rule_set) + self._filter_rules(rule_set) return rule_set def invoke(self) -> int: diff --git a/wordfence/cli/dbscan/definition.py b/wordfence/cli/dbscan/definition.py index f833ae9..683a8e0 100644 --- a/wordfence/cli/dbscan/definition.py +++ b/wordfence/cli/dbscan/definition.py @@ -151,6 +151,30 @@ "meta": { "accepts_file": True } + }, + "exclude-rules": { + "short_name": "e", + "description": "Specify rule IDs to ignore when scanning. May be " + "comma-delimited and/or repeated.", + "context": "ALL", + "argument_type": "OPTION_REPEATABLE", + "default": None, + "meta": { + "separator": ",", + "value_type": int + } + }, + "include-rules": { + "short_name": "i", + "description": "Specify rule IDs to include when scanning. May be " + "comma-delimited and/or repeated.", + "context": "ALL", + "argument_type": "OPTION_REPEATABLE", + "default": None, + "meta": { + "separator": ",", + "value_type": int + } } } diff --git a/wordfence/databasescanning/scanner.py b/wordfence/databasescanning/scanner.py index fd820a1..4d86599 100644 --- a/wordfence/databasescanning/scanner.py +++ b/wordfence/databasescanning/scanner.py @@ -63,7 +63,10 @@ def _scan_table( f'{prefixed_table} WHERE ' + ' OR '.join(conditions) ) - for result in connection.query(query): + # Using a dict as the query parameters avoids %s from being + # interpreted as a placeholder (there is apparently no way + # to escape "%s" ("%%s" doesn't work) + for result in connection.query(query, {}): rule = self.rule_set.get_rule(result['rule_id']) del result['rule_id'] yield DatabaseScanResult( diff --git a/wordfence/intel/database_rules.py b/wordfence/intel/database_rules.py index 557aa5b..89fdcc9 100644 --- a/wordfence/intel/database_rules.py +++ b/wordfence/intel/database_rules.py @@ -18,6 +18,15 @@ def __init__( self.condition = condition self.description = description + def __hash__(self): + return hash(self.identifier) + + def __eq__(self, other) -> bool: + return ( + type(other) is type(self) + and other.identifier == self.identifier + ) + class DatabaseRuleSet: @@ -31,12 +40,27 @@ def add_rule(self, rule: DatabaseRule) -> None: raise Exception('Duplicate rule ID: {rule.identifier}') self.rules[rule.identifier] = rule if rule.tables is None: - self.global_rules.append(rule) + self.global_rules.add(rule) else: for table in rule.tables: if table not in self.table_rules: - self.table_rules[table] = [] - self.table_rules[table].append(rule) + self.table_rules[table] = set() + self.table_rules[table].add(rule) + + def remove_rule(self, rule_id: int) -> None: + try: + rule = self.rules.pop(rule_id) + if rule.tables is None: + self.global_rules.discard(rule) + else: + for table in rule.tables: + if table in list(self.table_rules.keys()): + table_rules = self.table_rules[table] + table_rules.discard(rule) + if len(table_rules) == 0: + del self.table_rules[table] + except KeyError: + pass # Rule doesn't exist, no need to remove def get_rules(self, table: str) -> List[DatabaseRule]: rules = [] @@ -53,6 +77,19 @@ def get_targeted_tables(self) -> List[str]: def get_rule(self, identifier: int) -> DatabaseRule: return self.rules[identifier] + def filter_rules( + self, + included: Optional[Set[int]] = None, + excluded: Optional[Set[int]] = None + ): + if included is not None: + for rule_id in list(self.rules.keys()): + if rule_id not in included: + self.remove_rule(rule_id) + if excluded is not None: + for rule_id in excluded: + self.remove_rule(rule_id) + JSON_VALIDATOR = ListValidator( DictionaryValidator({