Skip to content

Commit

Permalink
Removes unused utility code and moves around single use code
Browse files Browse the repository at this point in the history
- Fixes docstrings without linebreak when multiline
- Removes utilities and moves them to scripts using them
- Adds a test to iprange plugin
- Moves domain utils to certcheck plugin
  • Loading branch information
joniumGit committed May 10, 2023
1 parent 2c31543 commit fc71818
Show file tree
Hide file tree
Showing 16 changed files with 249 additions and 288 deletions.
3 changes: 2 additions & 1 deletion plugins/src/dnsmule_plugins/certcheck/certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def collect_certificates(host: str, port: int, **kwargs) -> List[Certificate]:


def resolve_domain_from_certificate(cert: Certificate) -> List[str]:
"""Returns all names from a certificate retrieved from an ip-address
"""
Returns all names from a certificate retrieved from an ip-address
Common name is the first one if available followed by any alternative names
"""
Expand Down
70 changes: 70 additions & 0 deletions plugins/src/dnsmule_plugins/certcheck/domains.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from collections import deque
from typing import Iterable, TypeVar, Hashable

T = TypeVar('T')
R = TypeVar('R')
H = TypeVar('H', bound=Hashable)


def lagging_filter(lag: int):
"""
Filters our values from an iterator that were recently seen
The memory works using a first-in-first-out (FIFO) deque
"""
values = set()
latest = deque()

if lag <= 0:
raise ValueError('Invalid lag value')

def filter_function(item):
if item not in values:
if len(values) >= lag:
values.remove(latest.popleft())
latest.append(item)
values.add(item)
return True
else:
return False

return filter_function


def filter_locally_unique(iterable: Iterable[H], *, lag: int) -> Iterable[H]:
"""
Filters iterable to produce locally unique values
This could be useful for removing duplicate values from a very long,
but relatively sorted iterable.
Requires the iterable values to be hashable.
"""
return filter(lagging_filter(lag), iterable)


def spread_domain(domain: str) -> Iterable[str]:
"""Spreads a domain into all valid super domains
"""
parts = domain.strip().split('.')
if parts[0] == '*' or parts[0] == '':
parts = parts[1:]
if len(parts) > 2:
partial_domain = '.'.join(parts[-2:])
yield partial_domain
for i in range(-3, -len(parts) - 1, -1):
partial_domain = f'{parts[i]}.{partial_domain}'
yield partial_domain
elif len(parts) == 2:
yield '.'.join(parts)


def process_domains(domains: Iterable[str], *, lag: int = 100) -> Iterable[str]:
"""Best effort de-duplicates and removes star domains from the input and creates all valid super domains
"""
return filter_locally_unique((value for domain in domains for value in spread_domain(domain)), lag=lag)


__all__ = [
'process_domains',
]
3 changes: 2 additions & 1 deletion plugins/src/dnsmule_plugins/certcheck/rule.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Callable, Collection, List, Optional

from dnsmule import Rule, Result, Record
from dnsmule.utils import process_domains, extend_set, transform_set
from dnsmule.utils import extend_set, transform_set
from . import certificates
from .domains import process_domains


class CertChecker(Rule):
Expand Down
69 changes: 69 additions & 0 deletions plugins/test/test_certcheck_domains.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest

from dnsmule_plugins.certcheck.domains import lagging_filter, filter_locally_unique, process_domains, spread_domain


@pytest.mark.parametrize('i,lag,result', [
(
['a', 'a', 'a', 'b', 'b'],
1,
['a', 'b'],
),
(
['a', 'b', 'c', 'a', 'b', 'c'],
3,
['a', 'b', 'c'],
),
(
['a', 'b', 'c', 'd', 'a', 'b', 'c', 'd'],
3,
['a', 'b', 'c', 'd', 'a', 'b', 'c', 'd'],
),
(
['a', 'b', 'c', 'd', 'a', 'b', 'c', 'd'],
2,
['a', 'b', 'c', 'd', 'a', 'b', 'c', 'd'],
),
(
['a', 'b', 'c', 'd', 'a', 'a', 'b', 'b'],
2,
['a', 'b', 'c', 'd', 'a', 'b'],
),
])
def test_lagging_filter(i, lag, result):
assert [*filter_locally_unique(i, lag=lag)] == result, 'Did not filter'


@pytest.mark.parametrize('lag', [
0,
-1,
])
def test_lagging_filter_value_error(lag):
with pytest.raises(ValueError):
lagging_filter(lag)


@pytest.mark.parametrize('value,result', [
(
['a.b', 'b.c', 'd.e', 'a.e'],
['a.b', 'b.c', 'd.e', 'a.e'],
),
(
['a.b', '*.a.b', 'a.b', 'b.c.e'],
['a.b', 'b.c.e', 'c.e'],
)
])
def test_process_domains(value, result):
assert set(process_domains(value)) == set(result), 'Unexpected result'


def test_spread_domain_skips_tld():
assert [*spread_domain('.fi')] == [], 'Failed to skip TLD'


def test_spread_domain_short():
assert [*spread_domain('a.fi')] == ['a.fi'], 'Failed to short'


def test_spread_domain():
assert [*spread_domain('a.b.c.fi')] == ['c.fi', 'b.c.fi', 'a.b.c.fi'], 'Failed to produce all domains'
10 changes: 10 additions & 0 deletions plugins/test/test_ipranges.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,13 @@ def test_provider_empty_provider_fetch_ok():
rule = IpRangeChecker(providers=[], name='test_rule')
rule.check_fetch()
assert rule._last_fetch is not None, 'Failed fetch'


def test_fetcher_for_provider(monkeypatch):
sentinel = object()
with monkeypatch.context() as m:
from dnsmule_plugins.ipranges import ranges
m.setattr(ranges, 'fetch_mock_ip_ranges', lambda: sentinel, raising=False)
checker = IpRangeChecker(providers=['mock'])
checker.fetch_provider('mock')
assert checker._provider_ranges['mock'] is sentinel, 'Failed to get provider from package or failed to call'
18 changes: 16 additions & 2 deletions scripts/dstat.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,23 @@
"""
from argparse import ArgumentParser
from typing import List
from collections import Counter
from typing import List, Tuple, Callable, Iterable, TypeVar

from dnsmule.utils import count_by, load_data
from dnsmule.utils import load_data

T = TypeVar('T')
R = TypeVar('R')


def count_by(
iterable: Iterable[T],
f: Callable[[T], R],
n: int = None,
) -> Iterable[Tuple[R, int]]:
"""Counts values in an iterable using a given transformation
"""
return iter(Counter(map(f, iterable)).most_common(n=n))


def main(file_lines: List[str], n: int = 10):
Expand Down
3 changes: 2 additions & 1 deletion src/dnsmule/backends/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
class NOOPBackend(Backend):

def _query(self, target: Domain, *types: RRType) -> Iterable[Record]:
"""No-op"""
"""No-op
"""
yield from empty()


Expand Down
4 changes: 2 additions & 2 deletions src/dnsmule/definitions/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .domain import Domain
from .rrtype import RRType
from .tag import Tag
from ..utils import lmerge, Comparable
from ..utils import left_merge, Comparable

ResultData = Dict[str, Union['ResultData', List, str, int, float]]

Expand Down Expand Up @@ -38,7 +38,7 @@ def __add__(self, other: 'Result') -> 'Result':
else:
self.type.update(other.type)
self.tags.update(other.tags)
lmerge(self.data, other.data)
left_merge(self.data, other.data)
return self

def __bool__(self):
Expand Down
6 changes: 4 additions & 2 deletions src/dnsmule/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@


def import_class(import_string: str, parent: Type[T], relative: bool = True, package: str = None) -> Type[T]:
"""Imports a module and gets the last part as attribute from it
"""
Imports a module and gets the last part as attribute from it
**Note:** This allows arbitrary imports and code execution
"""
Expand Down Expand Up @@ -64,7 +65,8 @@ def load_and_append_rule(
rule_type: str,
config: Dict[str, Any],
) -> None:
"""Creates a rule from rule definition
"""
Creates a rule from rule definition
Initializes any dynamic rules created and passes the add_rule callback to them
"""
Expand Down
6 changes: 4 additions & 2 deletions src/dnsmule/mule.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def __init__(
raise ValueError('Missing required attributes (%s)' % ','.join(missing))

def append(self, config: Union[str, Path]) -> 'DNSMule':
"""Loads rules and plugins from a yaml configuration and adds them to this mule
"""
Loads rules and plugins from a yaml configuration and adds them to this mule
:raises ValueError: On duplicate rules
"""
Expand All @@ -77,7 +78,8 @@ def append(self, config: Union[str, Path]) -> 'DNSMule':
return self

def scan(self, *domains: Union[Iterable[Union[str, Domain]], Union[str, Domain]]) -> None:
"""Scans a domain with included rules and stores te result
"""
Scans a domain with included rules and stores te result
Can be called with either a single argument that is an iterable of domains
or multiple single domains.
Expand Down
2 changes: 0 additions & 2 deletions src/dnsmule/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
from .baseclasses import *
from .misc import *
from .domains import *
from .iterables import *
30 changes: 0 additions & 30 deletions src/dnsmule/utils/domains.py

This file was deleted.

65 changes: 0 additions & 65 deletions src/dnsmule/utils/iterables.py

This file was deleted.

Loading

0 comments on commit fc71818

Please sign in to comment.