diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 09207f3..e8df0a8 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -8,7 +8,7 @@ jobs: strategy: matrix: # This workflow can be matrixed against multiple Python versions if desired. eg. [3.7, 3.8, 3.9, "3.10"] - python-version: [ 3.8 ] + python-version: [ "3.11" ] steps: # Get the code from the repository to be linted, packaged, and pushed diff --git a/.gitignore b/.gitignore index 8e67221..a0803f7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ *~ .DS_Store .project +.vscode/ # Byte-compiled / optimized / DLL files __pycache__/ @@ -64,3 +65,6 @@ target/ # PyCharm /.idea + +/scratch +TODO.txt \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index fd4a93d..3e2ee28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,24 @@ # Changelog All notable changes to this project will be documented in this file. +## [3.14.0] - 2024-05-29 + +### Added +- Added `secret` and `key_derivation` fields to `EncryptionKey` metadata for storing KDF information. +- `construct` utility: + - Added `RegexSearch` and `RegexMatch` helpers. + - Added ability to embed constructs using `*` unpacking. + +### Changed +- All pefileutils functions that are meant to return lists now return empty lists when empty instead of `None`. +- The temporary directory created by `FileObject.temp_path()` will not be deleted if `keep` is set to True + or the `--keep-tmp` command line flag is used. The last directory created will be symbolically + linked to `mwcp_current`. +- Relaxed `construct` version requirement to support 2.9 or 2.10 + +### Removed +- Dropped support for Python 3.8 + ## [3.13.1] - 2023-11-29 @@ -641,7 +659,8 @@ It is assumed if you are not updating/adding tests. - Fixed broken markdown headings from @bryant1410 -[Unreleased]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.13.1...HEAD +[Unreleased]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.14.0...HEAD +[3.14.0]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.13.1...3.14.0 [3.13.1]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.13.0...3.13.1 [3.13.0]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.12.0...3.13.0 [3.12.0]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.11.0...3.12.0 diff --git a/docs/ParserComponents.md b/docs/ParserComponents.md index 18476e3..2846672 100644 --- a/docs/ParserComponents.md +++ b/docs/ParserComponents.md @@ -56,6 +56,10 @@ with self.file_object.open() as fo: A temporary file path can be generated in a context manager using `.temp_path()`. This can be helpful for external utilities that require a real file path. +This directory will not be deleted after processing if the environment variable `MWCP_KEEP_TMP` +is set to `true` or `1`. +The last temporary directory created will be symbolically linked to `mwcp_current`. (In Windows, "Developer Mode" must be enabled.) + ```python with self.file_object.temp_path() as file_path: _some_library_that_needs_a_path(file_path) diff --git a/mwcp/__init__.py b/mwcp/__init__.py index f3f3694..78a46ca 100644 --- a/mwcp/__init__.py +++ b/mwcp/__init__.py @@ -24,4 +24,4 @@ from mwcp.exceptions import * -__version__ = "3.13.1" +__version__ = "3.14.0" diff --git a/mwcp/cli.py b/mwcp/cli.py index 68ab007..bb694a2 100644 --- a/mwcp/cli.py +++ b/mwcp/cli.py @@ -358,11 +358,16 @@ def _parse_parameters(params) -> dict: "(e.g. 
--param aes_key:secret) " "This flag can be provided multiple times for multiple parameters." ) +@click.option( + "--keep-tmp", + is_flag=True, + help="Keep temporary files generated by FileObject.temp_path()" +) @click.argument("parser", required=True) @click.argument("input", nargs=-1, type=click.Path()) def parse( parser, input, yara_repo, recursive, format, split, output_dir, output_files, prefix, string_report, - include_filename, legacy, param + include_filename, legacy, param, keep_tmp ): """ Parses given input with given parser. @@ -385,6 +390,8 @@ def parse( if yara_repo: mwcp.config["YARA_REPO"] = yara_repo + if keep_tmp: + mwcp.config["KEEP_TMP"] = True # Python won't process wildcards when used through Windows command prompt. if any("*" in path for path in input): @@ -656,7 +663,11 @@ def _run_tests(tester, silent=False, show_passed=False): help="DEPRECATED: Display test case details for passed tests as well." "By default only failed tests are shown.", ) -@click.option("-s", "--silent", is_flag=True, help="Limit output to statement saying whether all tests passed or not.") +@click.option( + "-s", "--silent", + is_flag=True, + help="Limit output to statement saying whether all tests passed or not." +) @click.option( "--legacy/--no-legacy", default=False, @@ -711,11 +722,16 @@ def _run_tests(tester, silent=False, show_passed=False): help="Whether to include code coverage information for parser files. " "After tests are complete, reports can be generated using `coverage`. (e.g. `coverage html`)." ) +@click.option( + "--keep-tmp", + is_flag=True, + help="Keep temporary files generated by FileObject.temp_path()" +) # Parser to process. @click.argument("parser", nargs=-1, required=False) def test( testcase_dir, malware_repo, nprocs, update, add, add_filelist, delete, yes, force, last_failed, show_passed, - silent, legacy, exit_on_first, command, full_diff, yara_repo, recursive, param, cov, parser, + silent, legacy, exit_on_first, command, full_diff, yara_repo, recursive, param, cov, keep_tmp, parser, ): """ Testing utility to create and execute parser test cases. @@ -750,6 +766,8 @@ def test( mwcp.config["MALWARE_REPO"] = malware_repo if yara_repo: mwcp.config["YARA_REPO"] = yara_repo + if keep_tmp: + mwcp.config["KEEP_TMP"] = True # Add files listed in filelist to add option. if add_filelist: @@ -893,6 +911,8 @@ def test( pytest_args += ["--malware-repo", malware_repo] if yara_repo: pytest_args += ["--yara-repo", yara_repo] + if keep_tmp: + pytest_args += ["--keep-tmp"] if exit_on_first: pytest_args += ["-x"] diff --git a/mwcp/config/config.yml b/mwcp/config/config.yml index 6b1b136..9aa78dd 100644 --- a/mwcp/config/config.yml +++ b/mwcp/config/config.yml @@ -26,3 +26,6 @@ LOG_CONFIG_PATH: ./log_config.yml # Directory containing yara signatures. 
#YARA_REPO: ~/yara_repo + +# Keep temporary directory created by FileObject.temp_path() +# KEEP_TMP: false diff --git a/mwcp/config/schema.json b/mwcp/config/schema.json index 127fa4b..e47b69b 100644 --- a/mwcp/config/schema.json +++ b/mwcp/config/schema.json @@ -1,6 +1,6 @@ { "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "https://github.com/raw/dod-cyber-crime-center/DC3-MWCP/3.13.1/mwcp/config/schema.json", + "$id": "https://github.com/raw/dod-cyber-crime-center/DC3-MWCP/3.14.0/mwcp/config/schema.json", "title": "Report", "description": "Defines the report of all metadata elements.", "type": "object", @@ -333,6 +333,23 @@ "type": "null" } ] + }, + "secret": { + "anyOf": [ + { + "type": "string", + "contentEncoding": "base64" + }, + { + "type": "null" + } + ] + }, + "key_derivation": { + "type": [ + "string", + "null" + ] } }, "additionalProperties": false, @@ -421,6 +438,23 @@ "type": "null" } ] + }, + "secret": { + "anyOf": [ + { + "type": "string", + "contentEncoding": "base64" + }, + { + "type": "null" + } + ] + }, + "key_derivation": { + "type": [ + "string", + "null" + ] } }, "additionalProperties": false, diff --git a/mwcp/file_object.py b/mwcp/file_object.py index bf5dcad..4ee9a02 100644 --- a/mwcp/file_object.py +++ b/mwcp/file_object.py @@ -19,7 +19,7 @@ import pefile -from mwcp import metadata +from mwcp import metadata, config from mwcp.utils import elffileutils, pefileutils from mwcp.utils.stringutils import convert_to_unicode, sanitize_filename @@ -369,7 +369,7 @@ def compile_time(self) -> Optional[datetime.datetime]: return datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc) @contextlib.contextmanager - def temp_path(self): + def temp_path(self, keep=False): """ Context manager for creating a temporary full file path to the file object. This is useful for when you want to use this file on libraries which require @@ -382,9 +382,29 @@ def temp_path(self): with file_object.temp_path() as file_path: _some_library_that_needs_a_path(file_path) """ - # TODO: Provide and option to change location of temporary files through the use - # of the configuration file. - with tempfile.TemporaryDirectory(prefix="mwcp_") as tmpdir: + keep = ( + keep + or config.get("KEEP_TMP", False) + or os.environ.get("MWCP_KEEP_TMP", "false").lower() in ("true", "t", "yes", "y", "1") + ) + if keep: + tmpdir = tempfile.mkdtemp(prefix="mwcp_") + context = contextlib.nullcontext(tmpdir) + # Warn user since this should not be left on in production code. + logger.warning(f"Temporary directory '{tmpdir}' not set for deletion.") + # Set link to current temporary directory. + try: + mwcp_current = os.path.join(tempfile.gettempdir(), "mwcp_current") + if os.path.lexists(mwcp_current): + os.unlink(mwcp_current) + os.symlink(tmpdir, mwcp_current, target_is_directory=True) + except OSError: + # We can fail to create a symlink in Windows if "Developer Mode" is not enabled. 
+ pass + else: + context = tempfile.TemporaryDirectory(prefix="mwcp_") + + with context as tmpdir: temp_file = os.path.join(tmpdir, sanitize_filename(self.name) if self.name else self.md5) with open(temp_file, "wb") as fo: fo.write(self.data) @@ -486,7 +506,7 @@ def output(self): self.reporter.add(metadata.File.from_file_object(self)) @contextlib.contextmanager - def disassembly(self, disassembler: str = None, report: Report = None, **config) -> ContextManager["dragodis.Disassembler"]: + def disassembly(self, disassembler: str = None, report: Report = None, keep=False, **config) -> ContextManager["dragodis.Disassembler"]: """ Produces a Dragodis Disassembler object for the file. Dragodis must be installed for this work. @@ -504,11 +524,12 @@ def disassembly(self, disassembler: str = None, report: Report = None, **config) :param report: Provide the Report object if you want the annotated disassembler project file to be added after processing. This is usually only recommended if the parser plans to annotate the disassembly. e.g. API resolution + :param keep: Whether to prevent the temporary directory from being deleted. """ if not dragodis: raise RuntimeError("Please install Dragodis to use this function.") - with self.temp_path() as file_path: + with self.temp_path(keep=keep) as file_path: with dragodis.open_program(file_path, disassembler, **config) as dis: bit_size = dis.bit_size yield dis diff --git a/mwcp/metadata.py b/mwcp/metadata.py index 3d23c42..0906715 100644 --- a/mwcp/metadata.py +++ b/mwcp/metadata.py @@ -1282,7 +1282,7 @@ def as_stix(self, base_object, fixed_timestamp=None) -> STIXResult: elif self.query: result.add_linked(stix_extensions.ObservedString(purpose="url-query", value=self.query)) else: - warnings.warn("Skipped creation of STIX string since the parser provided no URL data") + logger.warning("Skipped creation of STIX string since the parser provided no URL data") return result else: result.add_linked(stix.URL(value=self.url)) @@ -1726,11 +1726,20 @@ class EncryptionKey(Metadata): mode="ecb", iv=b"\x00\x00\x00\x00\x00\x00\x00\x01", ) + EncryptionKey( + b"\xa0u\xd1\x7f=E0s\x85?\x8188\xc1[\x80#\xb8\xc4\x87\x03\x8465O\xe5\x99\xc3\x94.\x1f\x95", + algorithm="aes", + mode="cbc", + secret=b"p@ssw0rd", + key_derivation="sha256" + ) """ key: bytes algorithm: str = None mode: str = None iv: bytes = None + secret: bytes = None + key_derivation: str = None # Tests encodings in order by preference. TEST_ENCODINGS = [ @@ -1789,18 +1798,22 @@ def _num_raw_bytes(string: str) -> int: count += char.startswith(b"\\x") + char.startswith(b"\\u") * 2 return count - def _detect_encoding(self) -> Optional[str]: + def _detect_encoding(self, data: bytes = None) -> Optional[str]: """ - Attempts to determine if the key can be encoded as a string. + Attempts to determine if the key (or another value) can be encoded as a string. :returns: Best guess encoding if successful. """ + if not data: + data = self.key + if not data: + return + # If user gave us the encoding, use that. 
if self._encoding_set: return self._encoding # NOTE: Much of this is taken from rugosa.detect_encoding() - data = self.key best_score = len(data) # lowest score is best best_code_page = None for code_page in self.TEST_ENCODINGS: @@ -1818,24 +1831,30 @@ def _detect_encoding(self) -> Optional[str]: return best_code_page - def as_formatted_dict(self, flat=False) -> dict: - # Convert key into hex number - key = f"0x{self.key.hex()}" - + def formatted_bytes(self, data: bytes) -> str: + # Convert into hex number + value = f"0x{data.hex()}" # Add context if encoding can be detected from key. - if encoding := self._detect_encoding(): - key += f' ("{self.key.decode(encoding)}")' + if encoding := self._detect_encoding(data): + value += f' ("{data.decode(encoding)}")' + return value + def as_formatted_dict(self, flat=False) -> dict: return { "tags": self.tags, - "key": key, + "key": self.formatted_bytes(self.key), "algorithm": self.algorithm, "mode": self.mode, "iv": f"0x{self.iv.hex()}" if self.iv else None, + "secret": self.formatted_bytes(self.secret) if self.secret else None, + "key_derivation": self.key_derivation, } def as_stix(self, base_object, fixed_timestamp=None) -> STIXResult: - params = {"key_hex": self.key.hex()} + params = {} + + if self.key: + params["key_hex"] = self.key.hex() if self.algorithm: params["algorithm"] = self.algorithm @@ -1846,6 +1865,12 @@ def as_stix(self, base_object, fixed_timestamp=None) -> STIXResult: if self.iv: params["iv_hex"] = self.iv.hex() + if self.secret: + params["secret_hex"] = self.secret.hex() + + if self.key_derivation: + params["key_derivation"] = self.key_derivation + result = STIXResult(fixed_timestamp=fixed_timestamp) result.add_linked(stix_extensions.SymmetricEncryption(**params)) result.create_tag_note(self, result.linked_stix[-1]) diff --git a/mwcp/parser_config.yml b/mwcp/parser_config.yml index c7c78a6..ad67210 100644 --- a/mwcp/parser_config.yml +++ b/mwcp/parser_config.yml @@ -51,7 +51,7 @@ Quarantined: - .SymantecQB - .SymantecSubSDK - .AhnLab - - .Avast + - .Avast_AVG RSA: description: RSA artifacts diff --git a/mwcp/parsers/Python.py b/mwcp/parsers/Python.py index 4efa310..e1bb84b 100644 --- a/mwcp/parsers/Python.py +++ b/mwcp/parsers/Python.py @@ -3,9 +3,12 @@ """ import os +from typing import Optional + from construct import this -from mwcp import Parser, FileObject +from mwcp import FileObject, Parser +from mwcp.metadata import Version from mwcp.utils import construct @@ -18,7 +21,7 @@ class PyInstaller(Parser): "compressed_size" / construct.Int32ub, "final_size" / construct.Int32ub, "flag" / construct.Flag, - "type" / construct.Flag, + "type" / construct.String(1), "name" / construct.String(this.entry_size - 18), "data" / construct.Pointer( this.offset, @@ -33,21 +36,24 @@ class PyInstaller(Parser): @classmethod def identify(cls, file_object): """ - Validate the MAGIC data is at the appropriate location + Validate the MAGIC data is at the appropriate location and return the correct spec to use + for parsing. 
""" magic = b'MEI\x0C\x0B\x0A\x0B\x0E' + # pyinstaller 2.0 if file_object.data[-24:-24 + len(magic)] == magic: - file_object.knowledge_base["COOKIE_SPEC"] = construct.Struct( + cookie_spec = construct.Struct( "magic" / construct.Const(magic), "package_size" / construct.Int32ub, "toc_offset" / construct.Int32ub, "toc_entries" / construct.Int32ub, "python_version" / construct.Int32ub, ) - return True + return True, cookie_spec + # pyinstaller 2.1+ elif file_object.data[-88:-88+len(magic)] == magic: - file_object.knowledge_base["COOKIE_SPEC"] = construct.Struct( + cookie_spec = construct.Struct( "magic" / construct.Const(magic), "package_size" / construct.Int32ub, "toc_offset" / construct.Int32ub, @@ -55,39 +61,46 @@ def identify(cls, file_object): "python_version" / construct.Int32ub, "python_dll" / construct.String(64), ) - return True + return True, cookie_spec return False - def extract_entry(self, name: str, data: bytes, pyver: int): + def extract_entry(self, entry, hdr: bytes) -> Optional[FileObject]: """ - Extracts table of contents entry. + Extracts file data from table entry and returns it as a FileObject. """ - ext = ".pyc" - if data[:4] != b'\x63\x00\x00\x00': - # This indicates the sample is an uncompiled python script - magic_number = b'' - header = b'' - ext = ".py" - elif pyver >= 37: # PEP 552 -- Deterministic pycs - header = b'\0' * 12 # Bitfield, Timestamp, size - magic_number = b"\x42\x0d\x0d\x0a" - elif pyver >= 33: - header = b'\0' * 8 # (Timestamp + size) - magic_number = b"\x42\x0d\x0d\x0a" - else: - header = b'\0' * 4 # Timestamp - magic_number = b"\x03\xF3\x0D\x0A" - data = magic_number + header + data - self.dispatcher.add(FileObject(data, file_name=name + ext)) + if not entry.data: + return + + name = entry.name + data = entry.data + + if entry.type in ('s', 'm', 'M'): # python script/module/package + if entry.type == 's' and entry.data[1:4] != b"\x00\x00\x00": # uncompiled python code + name += ".py" + else: + # it is a marshalled code object + # we need to add the pyc header to the data so it can be decompiled + name += ".pyc" + data = hdr + data - def run(self): + # TODO: Create a PYZ parser for extracting out individually compressed components. + # This will require determining a way to safely unmarshal data. + # (PyInstaller/loader/pyimod01_archive.py) + # case 'z': # zlib archive (pyz) + # case 'n': # symbolic link + # case 'b': # binary + # case 'Z': # zlib (pyz) - frozen Python code (zipfile) + # case 'x': # data + # case 'l': # splash resource + + return FileObject(data, file_name=name) + + def run(self, cookie_spec: construct.Struct): """ Extract the cookie information in order to extract and parse the table of contents. Identify the .manifest filename in order to obtain the name of the target script to add to the dispatcher. - """ - cookie_spec = self.file_object.knowledge_base["COOKIE_SPEC"] cookie_size = cookie_spec.sizeof() cookie = cookie_spec.parse(self.file_object.data[-cookie_size:]) @@ -98,16 +111,35 @@ def run(self): ) info = package_spec.parse(package) + python_version = str(cookie.python_version)[0] + "." + str(cookie.python_version)[1:] + self.report.add(Version(python_version).add_tag("Python")) + # Extract files base on .manifest files. 
+ pyz = None target_names = [] for entry in info.toc: - if entry.name.endswith(".manifest"): + if entry.name == "PYZ-00.pyz": + pyz = entry + elif entry.name.endswith(".manifest"): target_names.append(os.path.splitext(entry.name)[0].replace(".exe", '')) - for entry in info.toc: - if entry.name in target_names: - self.extract_entry(entry.name, entry.data, cookie.python_version) - # Extract PYZ archives. - for entry in info.toc: - if entry.data.startswith(b"PYZ\x00"): - self.dispatcher.add(FileObject(entry.data, file_name=entry.name)) + # Determine header for pyc files. + if pyz: + hdr = pyz.data[4:8] + b'\x00' * 12 + elif cookie.python_version >= 37: # PEP 552 -- Deterministic pycs + hdr = b"\x42\x0d\x0d\x0a" + b'\0' * 12 # Bitfield, Timestamp, size + elif cookie.python_version >= 33: + hdr = b"\x42\x0d\x0d\x0a" + b'\0' * 8 # (Timestamp + size) + else: + hdr = b"\x03\xF3\x0D\x0A" + b'\0' * 4 # Timestamp + + # If we had a .manifest, only extract those files. + if target_names: + for entry in info.toc: + if entry.name in target_names or entry.data.startswith(b"PYZ\x00"): + if file := self.extract_entry(entry, hdr): + self.dispatcher.add(file) + else: + for entry in info.toc: + if file := self.extract_entry(entry, hdr): + self.dispatcher.add(file) diff --git a/mwcp/parsers/Quarantined.py b/mwcp/parsers/Quarantined.py index 3da0f29..e2f110b 100644 --- a/mwcp/parsers/Quarantined.py +++ b/mwcp/parsers/Quarantined.py @@ -256,8 +256,8 @@ def run(self): self.dispatcher.add(FileObject(decrypted_data)) -class Avast(Parser): - DESCRIPTION = "Avast" +class Avast_AVG(Parser): + DESCRIPTION = "Avast or AVG" KEY = bytearray( (0x33, 0xB6, 0x59, 0x83, 0x8B, 0x43, 0x75, 0xFB, 0x35, 0xB6, 0x8A, 0x37, 0xAE, 0x29, 0x16, 0x47, 0xA2, 0x51, 0x41, 0x4F, 0x69, 0x9A, 0x07, 0xF5, 0xF1, 0x69, 0x80, 0x89, 0x60, 0x15, 0x8E, 0xF6, 0xB2, 0x3B, 0x89, 0xC4, @@ -549,16 +549,16 @@ class Avast(Parser): @classmethod def identify(cls, file_object): """ - Identify an Avast Quarantined. + Identify an Avast or AVG Quarantined File. :param file_object: FileObject object - :return: Boolean value indicating if file is an Avast Quarantine File. + :return: Boolean value indicating if file is an Avast or AVG Quarantine File. 
""" return file_object.data.startswith(b"-chest- ") def run(self): data = self.file_object.data[8:] - block_size = 0x1000 + block_size = 0x10000 decrypted_data = bytearray() for i in range(0, len(data), block_size): decrypted_data += bytearray(d ^ k for (d, k) in zip(data[i:i + block_size], cycle(self.KEY))) - self.dispatcher.add(FileObject(bytes(decrypted_data))) + self.dispatcher.add(FileObject(bytes(decrypted_data))) \ No newline at end of file diff --git a/mwcp/stix/extensions.py b/mwcp/stix/extensions.py index ca9a692..7c45e6a 100644 --- a/mwcp/stix/extensions.py +++ b/mwcp/stix/extensions.py @@ -126,7 +126,9 @@ class SymmetricEncryption(_Observable): ("id", IDProperty(_type, spec_version="2.1")), ("key_hex", HexProperty()), ("iv_hex", HexProperty()), + ("secret_hex", HexProperty()), ("algorithm", StringProperty()), + ("key_derivation", StringProperty()), ("mode", StringProperty()), ( "object_marking_refs", @@ -141,7 +143,7 @@ class SymmetricEncryption(_Observable): ] ) - _id_contributing_properties = ["key_hex", "iv_hex", "mode", "algorithm"] + _id_contributing_properties = ["key_hex", "iv_hex", "mode", "algorithm", "secret_hex", "key_derivation"] def __init__(self, *args, **kwargs) -> None: # always make sure the property extension details are populated @@ -155,7 +157,7 @@ def __init__(self, *args, **kwargs) -> None: def _check_object_constraints(self) -> None: super()._check_object_constraints() - self._check_at_least_one_property(["key_hex", "iv_hex", "mode", "algorithm"]) + self._check_at_least_one_property(self._id_contributing_properties) class TriggerComponent(_STIXBase21): diff --git a/mwcp/tests/conftest.py b/mwcp/tests/conftest.py index 6f8c7f4..2513768 100644 --- a/mwcp/tests/conftest.py +++ b/mwcp/tests/conftest.py @@ -12,9 +12,8 @@ def pytest_configure(config): """ Registers custom markers. """ - config.addinivalue_line( - "markers", "parsers: mark to only test parsers" - ) + config.addinivalue_line("markers", "parsers: mark to only test parsers") + config.addinivalue_line("markers", "framework: mark to only test framework") def pytest_addoption(parser): @@ -37,6 +36,10 @@ def pytest_addoption(parser): "--full-diff", action="store_true", help="Whether to disable the custom unified diff view and instead use pytest's default full diff." 
) + parser.addoption( + "--keep-tmp", action="store_true", + help="Whether to keep temporary files in temporary directory made by FileObject.", + ) def pytest_make_parametrize_id(config, val, argname): @@ -236,6 +239,7 @@ def metadata_items() -> List[Metadata]: metadata.Interval(3), metadata.EncryptionKey(b"hello", algorithm="rc4"), metadata.EncryptionKey(b"\xff\xff\xff\xff", algorithm="aes", mode="ecb", iv=b"\x00\x00\x00\x00"), + metadata.EncryptionKey(b"\xff\xff\xff\xff", algorithm="aes", mode="cbc", secret=b"p@ssw0rd", key_derivation="sha256"), metadata.DecodedString("GetProcess"), # Github issue #31 metadata.DecodedString( diff --git a/mwcp/tests/test_cli.py b/mwcp/tests/test_cli.py index e6ad61a..e858f09 100644 --- a/mwcp/tests/test_cli.py +++ b/mwcp/tests/test_cli.py @@ -78,12 +78,12 @@ def test_list(tmp_path, make_sample_parser): print(ret.stderr, file=sys.stderr) assert ret.exit_code == 0 - results = json.loads(ret.stdout, encoding="utf8") + results = json.loads(ret.stdout) assert len(results) > 1 for name, source_name, author, description in results: - if name == u"foo" and source_name == u"dc3": - assert author == u"DC3" - assert description == u"example parser that works on any file" + if name == "foo" and source_name == "dc3": + assert author == "DC3" + assert description == "example parser that works on any file" break else: pytest.fail("Sample parser was not listed.") @@ -101,13 +101,13 @@ def test_list(tmp_path, make_sample_parser): assert ret.exit_code == 0 # FIXME: This breaks if user has set up a PARSER_SOURCE in the configuration file. - results = json.loads(ret.stdout, encoding="utf8") + results = json.loads(ret.stdout) assert len(results) > 1 for name, source_name, author, description in results: if source_name == str(parser_dir): - assert name == u"Sample" - assert author == u"Mr. Tester" - assert description == u"A test parser" + assert name == "Sample" + assert author == "Mr. Tester" + assert description == "A test parser" break else: pytest.fail("Sample parser from parser directory was not listed.") @@ -122,9 +122,9 @@ def test_list(tmp_path, make_sample_parser): print(ret.stderr, file=sys.stderr) assert ret.exit_code == 0 - results = json.loads(ret.stdout, encoding="utf8") + results = json.loads(ret.stdout) assert results == [ - [u"Sample", str(parser_dir), u"Mr. Tester", u"A test parser"] + ["Sample", str(parser_dir), "Mr. Tester", "A test parser"] ] # Now try adding the config_file path to the __init__.py file in order to avoid having @@ -139,9 +139,9 @@ def test_list(tmp_path, make_sample_parser): print(ret.stderr, file=sys.stderr) assert ret.exit_code == 0 - results = json.loads(ret.stdout, encoding="utf8") + results = json.loads(ret.stdout) assert results == [ - [u"Sample", str(parser_dir), u"Mr. Tester", u"A test parser"] + ["Sample", str(parser_dir), "Mr. Tester", "A test parser"] ] @@ -295,7 +295,7 @@ def test_add_filelist_testcase(tmp_path): filelist.append((str(file), hashlib.md5(data).hexdigest())) filelist_txt = tmp_path / "filelist.txt" - filelist_txt.write_text(u"\n".join(file_path for file_path, _ in filelist), "utf8") + filelist_txt.write_text("\n".join(file_path for file_path, _ in filelist), "utf8") # Add a test case for our sample parser. 
ret = runner.invoke(cli.main, [ diff --git a/mwcp/tests/test_construct.py b/mwcp/tests/test_construct.py index 320e317..62e7646 100644 --- a/mwcp/tests/test_construct.py +++ b/mwcp/tests/test_construct.py @@ -9,22 +9,21 @@ from mwcp.utils import construct -@pytest.mark.xfail( - raises=ValueError, - reason="Doctest is producing a 'wrapper loop when unwrapping obj_' error" -) -def test_helpers(): +# @pytest.mark.xfail( +# raises=ValueError, +# reason="Doctest is producing a 'wrapper loop when unwrapping obj_' error" +# ) +@pytest.mark.parametrize("module", [ + construct.helpers, + construct.datetime_, + construct.network, + construct.windows_enums, + construct.windows_structures, +]) +def test_helpers(module): """Tests that the doctests for the helpers work.""" - helper_modules = [ - construct.helpers, - construct.datetime_, - construct.network, - construct.windows_enums, - construct.windows_structures - ] - for module in helper_modules: - results = doctest.testmod(module) - assert not results.failed + results = doctest.testmod(module) + assert not results.failed def test_html(): diff --git a/mwcp/tests/test_parsers.py b/mwcp/tests/test_parsers.py index ab266d7..b135641 100644 --- a/mwcp/tests/test_parsers.py +++ b/mwcp/tests/test_parsers.py @@ -32,6 +32,8 @@ def _setup(config): mwcp.config["MALWARE_REPO"] = malware_repo if yara_repo: mwcp.config["YARA_REPO"] = yara_repo + if config.option.keep_tmp: + mwcp.config["KEEP_TMP"] = True mwcp.config.validate() @@ -293,6 +295,16 @@ class MockReport(list): if item["type"] == "command": item["cwd"] = None + # Version 3.14 adds secret and key_derivation to EncryptionKey + if expected_results_version < version.parse("3.14.0"): + for item in expected_results["metadata"]: + if item["type"] == "encryption_key": + item["secret"] = None + item["key_derivation"] = None + elif item["type"] == "decoded_string" and item["encryption_key"]: + item["encryption_key"]["secret"] = None + item["encryption_key"]["key_derivation"] = None + # The order the metadata comes in doesn't matter and shouldn't fail the test. # (Using custom repr to ensure dictionary keys are sorted before repr is applied.) 
custom_repr = lambda d: repr(dict(sorted(d.items())) if isinstance(d, dict) else d) diff --git a/mwcp/tests/test_report/report.json b/mwcp/tests/test_report/report.json index 9f391da..61e4492 100644 --- a/mwcp/tests/test_report/report.json +++ b/mwcp/tests/test_report/report.json @@ -654,7 +654,9 @@ "key": "aGVsbG8=", "algorithm": "rc4", "mode": null, - "iv": null + "iv": null, + "secret": null, + "key_derivation": null }, { "type": "encryption_key", @@ -662,7 +664,19 @@ "key": "/////w==", "algorithm": "aes", "mode": "ecb", - "iv": "AAAAAA==" + "iv": "AAAAAA==", + "secret": null, + "key_derivation": null + }, + { + "type": "encryption_key", + "tags": [], + "key": "/////w==", + "algorithm": "aes", + "mode": "cbc", + "iv": null, + "secret": "cEBzc3cwcmQ=", + "key_derivation": "sha256" }, { "type": "decoded_string", @@ -680,7 +694,9 @@ "key": "//8=", "algorithm": "xor", "mode": null, - "iv": null + "iv": null, + "secret": null, + "key_derivation": null } }, { @@ -689,7 +705,9 @@ "key": "//8=", "algorithm": "xor", "mode": null, - "iv": null + "iv": null, + "secret": null, + "key_derivation": null }, { "type": "mission_id", diff --git a/mwcp/tests/test_report/report.py b/mwcp/tests/test_report/report.py index c017ea6..a8c8465 100644 --- a/mwcp/tests/test_report/report.py +++ b/mwcp/tests/test_report/report.py @@ -574,7 +574,9 @@ "algorithm": "rc4", "iv": None, "key": b"hello", + "key_derivation": None, "mode": None, + "secret": None, "tags": [], "type": "encryption_key", }, @@ -582,10 +584,22 @@ "algorithm": "aes", "iv": b"\x00\x00\x00\x00", "key": b"\xff\xff\xff\xff", + "key_derivation": None, "mode": "ecb", + "secret": None, "tags": [], "type": "encryption_key", }, + { + "type": "encryption_key", + "tags": [], + "key": b"\xff\xff\xff\xff", + "algorithm": "aes", + "mode": "cbc", + "iv": None, + "secret": b"p@ssw0rd", + "key_derivation": "sha256" + }, { "encryption_key": None, "tags": [], @@ -597,7 +611,9 @@ "algorithm": "xor", "iv": None, "key": b"\xff\xff", + "key_derivation": None, "mode": None, + "secret": None, "tags": [], "type": "encryption_key", }, @@ -609,7 +625,9 @@ "algorithm": "xor", "iv": None, "key": b"\xff\xff", + "key_derivation": None, "mode": None, + "secret": None, "tags": [], "type": "encryption_key", }, diff --git a/mwcp/tests/test_report_writer/report.html b/mwcp/tests/test_report_writer/report.html index 84fe53a..f29bcda 100644 --- a/mwcp/tests/test_report_writer/report.html +++ b/mwcp/tests/test_report_writer/report.html @@ -91,12 +91,13 @@

[report.html hunk: the HTML table markup was lost in extraction. The "Encryption Key" table gains "Secret" and "Key Derivation" header columns; the existing rows (0x68656c6c6f ("hello") / rc4, 0xffffffff / aes / ecb / 0x00000000, 0xffff / xor) only gain empty cells, and one new row is added: 0xffffffff / aes / cbc / 0x7040737377307264 ("p@ssw0rd") / sha256. The report.md and report.txt hunks below show the same change in full.]
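As context for these report fixtures, here is a minimal sketch (not part of the diff) of how a parser could populate the new `secret` and `key_derivation` fields; the constructor arguments match the `mwcp/metadata.py` hunk above, the passphrase and "sha256" label mirror the test fixtures (which use a dummy 4-byte key), and the key is actually derived here only for illustration.

```python
import hashlib

from mwcp import metadata

# Passphrase recovered from a sample; the actual AES key is derived from it.
secret = b"p@ssw0rd"
key = hashlib.sha256(secret).digest()

# Record both the derived key and how it was derived (fields added in 3.14.0).
# Inside a parser this would be handed to self.report.add(...).
item = metadata.EncryptionKey(
    key,
    algorithm="aes",
    mode="cbc",
    secret=secret,
    key_derivation="sha256",
)
```

In the fixtures, such an entry renders as "cEBzc3cwcmQ=" (base64) in report.json and as 0x7040737377307264 ("p@ssw0rd") in the markdown, text, and HTML reports.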
diff --git a/mwcp/tests/test_report_writer/report.md b/mwcp/tests/test_report_writer/report.md index b264c73..de312c0 100644 --- a/mwcp/tests/test_report_writer/report.md +++ b/mwcp/tests/test_report_writer/report.md @@ -54,11 +54,12 @@ | email@bad.com | ## Encryption Key -| Key | Algorithm | Mode | Iv | -|:-----------------------|:------------|:-------|:-----------| -| 0x68656c6c6f ("hello") | rc4 | | | -| 0xffffffff | aes | ecb | 0x00000000 | -| 0xffff | xor | | | +| Key | Algorithm | Mode | Iv | Secret | Key Derivation | +|:-----------------------|:------------|:-------|:-----------|:--------------------------------|:-----------------| +| 0x68656c6c6f ("hello") | rc4 | | | | | +| 0xffffffff | aes | ecb | 0x00000000 | | | +| 0xffffffff | aes | cbc | | 0x7040737377307264 ("p@ssw0rd") | sha256 | +| 0xffff | xor | | | | | ## Event | Value | diff --git a/mwcp/tests/test_report_writer/report.txt b/mwcp/tests/test_report_writer/report.txt index 71a7cc0..7f9574a 100644 --- a/mwcp/tests/test_report_writer/report.txt +++ b/mwcp/tests/test_report_writer/report.txt @@ -54,10 +54,11 @@ Value email@bad.com ---- Encryption Key ---- -Key Algorithm Mode Iv ----------------------- ----------- ------ ---------- +Key Algorithm Mode Iv Secret Key Derivation +---------------------- ----------- ------ ---------- ------------------------------- ---------------- 0x68656c6c6f ("hello") rc4 0xffffffff aes ecb 0x00000000 +0xffffffff aes cbc 0x7040737377307264 ("p@ssw0rd") sha256 0xffff xor ---- Event ---- diff --git a/mwcp/tests/test_stix/report.json b/mwcp/tests/test_stix/report.json index 89e06ec..6e535eb 100644 --- a/mwcp/tests/test_stix/report.json +++ b/mwcp/tests/test_stix/report.json @@ -1,6 +1,6 @@ { "type": "bundle", - "id": "bundle--00000000-0000-4006-9000-000000000253", + "id": "bundle--00000000-0000-4006-9000-000000000255", "objects": [ { "type": "file", @@ -730,6 +730,21 @@ } } }, + { + "type": "symmetric-encryption", + "spec_version": "2.1", + "id": "symmetric-encryption--edb619da-9899-5795-babd-2a4335a601d3", + "key_hex": "ffffffff", + "secret_hex": "7040737377307264", + "algorithm": "aes", + "key_derivation": "sha256", + "mode": "cbc", + "extensions": { + "extension-definition--fb989191-187f-4c11-81cd-4a699a00835d": { + "extension_type": "new-sco" + } + } + }, { "type": "observed-string", "spec_version": "2.1", @@ -769,7 +784,7 @@ { "type": "relationship", "spec_version": "2.1", - "id": "relationship--00000000-0000-4006-9000-000000000214", + "id": "relationship--00000000-0000-4006-9000-000000000216", "created": "2022-01-01T07:32:00.000Z", "modified": "2022-01-01T07:32:00.000Z", "relationship_type": "outputs", @@ -809,7 +824,7 @@ { "type": "note", "spec_version": "2.1", - "id": "note--00000000-0000-4006-9000-000000000224", + "id": "note--00000000-0000-4006-9000-000000000226", "created": "2022-01-01T07:32:00.000Z", "modified": "2022-01-01T07:32:00.000Z", "content": "MWCP Tags: something", @@ -875,7 +890,7 @@ { "type": "note", "spec_version": "2.1", - "id": "note--00000000-0000-4006-9000-000000000236", + "id": "note--00000000-0000-4006-9000-000000000238", "created": "2022-01-01T07:32:00.000Z", "modified": "2022-01-01T07:32:00.000Z", "content": "MWCP Tags: tag2", @@ -958,7 +973,7 @@ { "type": "note", "spec_version": "2.1", - "id": "note--00000000-0000-4006-9000-000000000249", + "id": "note--00000000-0000-4006-9000-000000000251", "created": "2022-01-01T07:32:00.000Z", "modified": "2022-01-01T07:32:00.000Z", "content": "Description: SuperMalware Implant\nAlphabet: 0123456789ABCDEF\nAlphabet Base: 
16\nAlphabet: ABCDEFGHIJKLMNOPQRSTUVWXYZ234567=\nAlphabet Base: 32\nAlphabet: ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\nAlphabet Base: 64\nEvent Name: MicrosoftExist\nInjects Into: svchost\nInterval: 3.0\nkeylogger: True\nmisc_integer: 432\n misc_integer Tags: tag1\nVersion: 3.1\nVersion: 403.10", @@ -969,11 +984,11 @@ { "type": "malware-analysis", "spec_version": "2.1", - "id": "malware-analysis--00000000-0000-4006-9000-000000000251", + "id": "malware-analysis--00000000-0000-4006-9000-000000000253", "created": "2022-01-01T07:32:00.000Z", "modified": "2022-01-01T07:32:00.000Z", "product": "mwcp", - "version": "3.12.0", + "version": "3.14.0", "analysis_sco_refs": [ "crypto-currency-address--e2e281c7-65af-5545-970a-8348106e4e15", "directory--a85fb8f4-0b2b-5c61-9cff-fc777cfd6286", @@ -1019,6 +1034,7 @@ "symmetric-encryption--142febd3-2359-57f8-b864-179af8a65bc0", "symmetric-encryption--89973fff-4119-5075-b47f-90b5de7e98c2", "symmetric-encryption--e221dd6b-1b29-515a-b280-50b4e42f74aa", + "symmetric-encryption--edb619da-9899-5795-babd-2a4335a601d3", "url--0bdcaa6f-f4d8-5870-b34c-3499c52f6da8", "url--4ed0914c-e437-57b8-ad1b-cb681b6147f0", "url--967a2810-a1a7-5d17-95fa-9a1d91b21f91", @@ -1045,4 +1061,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/mwcp/tests/test_string_report/strings.json b/mwcp/tests/test_string_report/strings.json index fec1632..2680f59 100644 --- a/mwcp/tests/test_string_report/strings.json +++ b/mwcp/tests/test_string_report/strings.json @@ -32,7 +32,9 @@ "key": "3q2+7w==", "algorithm": null, "mode": null, - "iv": null + "iv": null, + "secret": null, + "key_derivation": null } } ] diff --git a/mwcp/utils/construct/ARM.py b/mwcp/utils/construct/ARM.py index 303ef61..5f4b3a8 100644 --- a/mwcp/utils/construct/ARM.py +++ b/mwcp/utils/construct/ARM.py @@ -4,8 +4,8 @@ and accessible from the submodule "ARM". (e.g. construct.ARM.LDR) """ -from . import version28 as construct -from .version28 import this +from . import core as construct +from .core import this from . import helpers from mwcp.utils import elffileutils diff --git a/mwcp/utils/construct/MIPS.py b/mwcp/utils/construct/MIPS.py index ad3f7a5..86c685d 100644 --- a/mwcp/utils/construct/MIPS.py +++ b/mwcp/utils/construct/MIPS.py @@ -6,8 +6,8 @@ reference: github.com/MIPT-ILab/mipt-mips/wiki/MIPS-Instruction-Set """ -from .version28 import * -from .version28 import this +from .core import * +from .core import this _REGISTERS = { @@ -25,7 +25,7 @@ # I-type instruction _I_inst = Struct( - Embedded(BitStruct( + *BitStruct( 'opcode' / Enum( BitsInteger(6), # NOTE: Some opcode values are reserved for other instruction formats @@ -42,7 +42,7 @@ 'src_register' / _Register, 'target_register' / _Register, # 'imm_constant' / construct.BitsInteger(16) - )), + ), # Need to move immediate outside of BitStruct to create signed number. 
# (Luckly, the constant is byte aligned) 'imm_constant' / Int16sb diff --git a/mwcp/utils/construct/__init__.py b/mwcp/utils/construct/__init__.py index fd39b3d..bd8779c 100644 --- a/mwcp/utils/construct/__init__.py +++ b/mwcp/utils/construct/__init__.py @@ -3,7 +3,7 @@ # from __future__ import absolute_import # Import interface -from .version28 import * +from .core import * from .construct_html import html_hex from .helpers import * diff --git a/mwcp/utils/construct/construct_html.py b/mwcp/utils/construct/construct_html.py index 02fbe79..1489cd6 100644 --- a/mwcp/utils/construct/construct_html.py +++ b/mwcp/utils/construct/construct_html.py @@ -3,7 +3,6 @@ To use, run the html_hex with a construct and data: print html_hex(CONSTRUCT, data) """ -from __future__ import print_function import codecs import os @@ -11,17 +10,7 @@ import itertools import jinja2 import sys - - -PY3 = sys.version_info.major == 3 - - -try: - # Python 2 - from itertools import izip_longest -except ImportError: - # Python 3 - from itertools import zip_longest as izip_longest +from itertools import zip_longest COLORPALLETTE = [ @@ -62,7 +51,7 @@ def grouper(n, iterable, fillvalue=None): [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')] """ args = [iter(iterable)] * n - return izip_longest(fillvalue=fillvalue, *args) + return zip_longest(fillvalue=fillvalue, *args) def _iter_colors(data, color_map, default=None): @@ -98,7 +87,7 @@ def __init__(self, member_map, subcon): :param subcon: """ self._member_map = member_map - super(Member, self).__init__(subcon) + super().__init__(subcon) # version 2.9 doesn't perpetuate the name past one level anymore. self.name = self.subcon.name @@ -121,7 +110,7 @@ def _generate_value_str(self, value, indent=0): return tabs + '{}'.format(value) def _parse(self, stream, context, path): - obj = super(Member, self)._parse(stream, context, path) + obj = super()._parse(stream, context, path) # Store offset, data, and size information then return original object like nothing happened... if self.name and not self.name.startswith('_'): @@ -160,7 +149,7 @@ def __init__(self, subcon): # member_map is a dictionary mapping the offsets of elements to a list of elements it portrays self._member_map = {} subcon = self._wrap_subcon(subcon) - super(MemberMap, self).__init__(subcon) + super().__init__(subcon) def _wrap_subcon(self, subcon): """Recursively wraps all subconstructs with Member.""" @@ -191,7 +180,7 @@ def _wrap_subcon(self, subcon): def _parse(self, stream, context, path): # Clear the member_table from previous use. self._member_map.clear() - return super(MemberMap, self)._parse(stream, context, path) + return super()._parse(stream, context, path) def _decode(self, obj, context, path): """Returns a copy of the member map.""" @@ -305,8 +294,6 @@ def edit_member(name, value): current_color = None if byte is not None: - if not PY3: - byte = ord(byte) hex_ = '{:02X}'.format(byte) ascii = chr(byte) if 32 < byte < 127 else '.' else: @@ -336,7 +323,7 @@ def edit_member(name, value): if __name__ == '__main__': # Run an example if called directly. 
- from mwcp.utils.construct import version28 as construct + from mwcp.utils.construct import core as construct from mwcp.utils.construct.network import IP4Address from mwcp.utils.construct.helpers import HexString from construct import this diff --git a/mwcp/utils/construct/version28.py b/mwcp/utils/construct/core.py similarity index 80% rename from mwcp/utils/construct/version28.py rename to mwcp/utils/construct/core.py index 7a61613..7493aa6 100644 --- a/mwcp/utils/construct/version28.py +++ b/mwcp/utils/construct/core.py @@ -1,17 +1,18 @@ """ -Collection of patches done to bring back some of the removed features of 2.8 back into 2.9 +Collection of patches done to bring back some of the removed features of 2.8 and 2.9 as well as generally fix lingering issues with construct. To activate, replace your standard import with this: - from mwcp.utils.construct import version28 as construct + from mwcp.utils import construct Patches: - slicing mechanism ([:], [min:], [:max], etc) - allows default value for pop() in Containers - allow any encoding for string constructs. - - patch Embedded to remove hardcoded limitation of supported classes. + - (<2.10) patch Embedded to remove hardcoded limitation of supported classes. - fixes issue with sizeof used with dynamic Structs (issue #771) - patch StringEncoded to make UnicodeDecodeErrors as StringError (issue #743) + - (<2.9.51) backport stream_*() functions to optionally accept a 'path' argument. Also contains fixes for few constructs. (Use these versions to get the benefits.) - Range() - was removed in 2.9 @@ -38,13 +39,13 @@ null characters should be stripped. - Add the path to ConstructError exceptions. This will greatly help with debugging. - Add deepcopy functionality for Container classes. - - Embedding should also embed the context. - - Also, Embedded should just be a function that toggles flagembedded instead of being it's own class. - remove _io from resulting Container objects after a parse. Doesn't look to be used for anything. """ from __future__ import absolute_import +import uuid + from future.builtins import bytes, str import codecs @@ -58,6 +59,46 @@ from construct.core import * +# Version 2.9.51 added 'path' to stream_*() functions. +# Backport older versions to optionally accept a path. +# Forwardport ability to default path to None, (since it is not required for errors) +def stream_read(stream, length, path=None): + if version < (2, 9, 51): + return construct.core.stream_read(stream, length) + else: + return construct.core.stream_read(stream, length, path) + + +def stream_read_entire(stream, path=None): + if version < (2, 9, 51): + return construct.core.stream_read_entire(stream) + else: + return construct.core.stream_read_entire(stream, path) + + +def stream_write(stream, data, length=None, path=None): + if length is None: + length = len(data) + if version < (2, 9, 51): + return construct.core.stream_write(stream, data, length) + else: + return construct.core.stream_write(stream, data, length, path) + + +def stream_seek(stream, offset, whence=0, path=None): + if version < (2, 9, 51): + return construct.core.stream_seek(stream, offset, whence) + else: + return construct.core.stream_seek(stream, offset, whence, path) + + +def stream_tell(stream, path=None): + if version < (2, 9, 51): + return construct.core.stream_tell(stream) + else: + return construct.core.stream_tell(stream, path) + + class Range(Subconstruct): r""" A homogenous array of elements. The array will iterate through between ``min`` to ``max`` times. 
If an exception occurs (EOF, validation error), the repeater exits cleanly. If less than ``min`` units have been successfully parsed, a RangeError is raised. @@ -89,7 +130,7 @@ class Range(Subconstruct): __slots__ = ["min", "max"] def __init__(self, min, max, subcon): - super(Range, self).__init__(subcon) + super().__init__(subcon) self.min = min self.max = max @@ -97,7 +138,7 @@ def _parse(self, stream, context, path): min_ = evaluate(self.min, context) max_ = evaluate(self.max, context) if not 0 <= min_ <= max_ <= sys.maxsize: - raise RangeError("[{}] unsane min {} and max {}".format(path, min_, max_)) + raise RangeError(f"[{path}] unsane min {min_} and max {max_}") obj = ListContainer() try: i = 0 @@ -106,7 +147,7 @@ def _parse(self, stream, context, path): fallback = stream.tell() obj.append(self.subcon._parsereport(stream, context, path)) if stream.tell() == fallback: - raise ExplicitError("[{}] Infinite loop detected.".format(path)) + raise ExplicitError(f"[{path}] Infinite loop detected.") i += 1 except StopIteration: pass @@ -114,7 +155,7 @@ def _parse(self, stream, context, path): raise except Exception: # TODO: catch ConstructError instead? if len(obj) < min_: - raise RangeError("[{}] expected {} to {}, found {}".format(path, min_, max_, len(obj))) + raise RangeError(f"[{path}] expected {min_} to {max_}, found {len(obj)}") stream.seek(fallback) return obj @@ -122,11 +163,11 @@ def _build(self, obj, stream, context, path): min_ = evaluate(self.min, context) max_ = evaluate(self.max, context) if not 0 <= min_ <= max_ <= sys.maxsize: - raise RangeError("[{}] unsane min {} and max {}".format(path, min_, max_)) + raise RangeError(f"[{path}] unsane min {min_} and max {max_}") if not isinstance(obj, collections.abc.Sequence): - raise RangeError("[{}] expected sequence type, found {}".format(path, type(obj))) + raise RangeError(f"[{path}] expected sequence type, found {type(obj)}") if not min_ <= len(obj) <= max_: - raise RangeError("[{}] expected from {} to {} elements, found {}".format(path, min_, max_, len(obj))) + raise RangeError(f"[{path}] expected from {min_} to {max_} elements, found {len(obj)}") retlist = ListContainer() try: for i, subobj in enumerate(obj): @@ -139,7 +180,7 @@ def _build(self, obj, stream, context, path): raise except Exception: if len(obj) < min_: - raise RangeError("[{}] expected {} to {}, found {}".format(path, min_, max_, len(obj))) + raise RangeError(f"[{path}] expected {min_} to {max_}, found {len(obj)}") else: raise return retlist @@ -152,7 +193,11 @@ def _sizeof(self, context, path): except (KeyError, AttributeError): raise SizeofError("cannot calculate size, key not found in context") if min_ == max_: - return min_ * self.subcon._sizeof(context, path) + size = 0 + for i in range(min_): + context._index = i + size += self.subcon._sizeof(context, path) + return size else: raise SizeofError("cannot calculate size") @@ -238,7 +283,7 @@ def PaddedString(length, encoding='utf-8'): def GreedyString(encoding='utf-8'): - """Adds default encoding option to PaddedString().""" + """Adds default encoding option to GreedyString().""" return construct.GreedyString(encoding) @@ -279,7 +324,7 @@ class Compressed(Adapter): __slots__ = ["lib", "wrap_exception"] def __init__(self, subcon, lib, wrap_exception=True): - super(Compressed, self).__init__(subcon) + super().__init__(subcon) self.wrap_exception = wrap_exception if hasattr(lib, "compress") and hasattr(lib, "decompress"): self.lib = lib @@ -293,14 +338,14 @@ def __init__(self, subcon, lib, wrap_exception=True): 
import bz2 self.lib = bz2 else: - raise ValueError('Invalid lib parameter: {}'.format(lib)) + raise ValueError(f'Invalid lib parameter: {lib}') def _decode(self, data, context, path): try: return self.lib.decompress(data) except Exception as e: if self.wrap_exception: - raise ConstructError('Decompression failed with error: {}'.format(e)) + raise ConstructError(f'Decompression failed with error: {e}') else: raise @@ -309,7 +354,7 @@ def _encode(self, data, context, path): return self.lib.compress(data) except Exception as e: if self.wrap_exception: - raise ConstructError('Compression failed with error: {}'.format(e)) + raise ConstructError(f'Compression failed with error: {e}') else: raise @@ -325,11 +370,11 @@ def __init__(self, parsefrom_or_subcon, *subcons, **subconskw): subcons = (parsefrom_or_subcon,) + subcons else: parsefrom = parsefrom_or_subcon - super(Union, self).__init__(parsefrom, *subcons, **subconskw) + super().__init__(parsefrom, *subcons, **subconskw) # Map an integer in the inclusive range 0-255 to its string byte representation -PRINTABLE = [bytes2str(int2byte(i)) if 32 <= i < 128 else '.' for i in range(256)] +PRINTABLE = [chr(i) if 32 <= i < 128 else '.' for i in range(256)] HEXPRINT = [format(i, '02X') for i in range(256)] # Copy of construct.lib.hex.hexdump but removes the "hexundump(" string. @@ -360,8 +405,8 @@ def hexdump(data, linesize): prettylines = [] for i in range(0, len(data), linesize): line = data[i:i+linesize] - hextext = " ".join(HEXPRINT[b] for b in iterateints(line)) - rawtext = "".join(PRINTABLE[b] for b in iterateints(line)) + hextext = " ".join(HEXPRINT[b] for b in line) + rawtext = "".join(PRINTABLE[b] for b in line) prettylines.append(fmt % (i, str(hextext), str(rawtext))) return "\n".join(prettylines) @@ -376,14 +421,14 @@ class Probe(construct.Probe): """ def __init__(self, into=None, lookahead=128, name=None): self.print_name = name - super(Probe, self).__init__(into=into, lookahead=lookahead) + super().__init__(into=into, lookahead=lookahead) def printout(self, stream, context, path): print("--------------------------------------------------") - print("Probe {}".format(self.print_name or '')) - print("Path: {}".format(path)) + print(f"Probe {self.print_name or ''}") + print(f"Path: {path}") if self.into: - print("Into: {!r}".format(self.into)) + print(f"Into: {self.into!r}") if self.lookahead and stream is not None: fallback = stream.tell() @@ -485,28 +530,30 @@ class Mapping(construct.Mapping): """ def __init__(self, subcon, dec_mapping, enc_mapping=None): - super(Mapping, self).__init__(subcon, {}) + super().__init__(subcon, {}) self.decmapping = dec_mapping self.encmapping = enc_mapping or {v: k for k, v in sorted(dec_mapping.items(), reverse=True)} - def _patch_pop(): """ Patches the pop() function in Container to allow for a default value. """ - def pop(self, key, *default): - try: - val = dict.pop(self, key, *default) - self.__keys_order__.remove(key) - return val - except ValueError: - if default: - return default[0] - else: - raise KeyError + # Only need to patch the older version of Container that implemented its own version of pop. 
+ # (Updated in version 2.10.58) + if "__keys_order__" in Container.__slots__: + def pop(self, key, *default): + try: + val = dict.pop(self, key, *default) + self.__keys_order__.remove(key) + return val + except ValueError: + if default: + return default[0] + else: + raise KeyError - Container.pop = pop + Container.pop = pop def _patch_slice(): @@ -538,7 +585,7 @@ def _decode(self, obj, context, path): try: return orig_decode(self, obj, context, path) except UnicodeDecodeError as e: - raise StringError("[{}] string decoding failed: {}".format(path, e)) + raise StringError(f"[{path}] string decoding failed: {e}") construct.StringEncoded._decode = _decode @@ -556,7 +603,8 @@ def _sizeof(self, context, path): def isStruct(sc): return isStruct(sc.subcon) if isinstance(sc, Renamed) else isinstance(sc, Struct) def nest(context, sc): - if isStruct(sc) and not sc.flagembedded and sc.name in context: + # flagembedded was removed in 2.10 + if isStruct(sc) and not getattr(sc, "flagembedded", False) and sc.name in context: context2 = context[sc.name] context2["_"] = context return context2 @@ -621,8 +669,6 @@ def _patch_mergefields(): Patches the mergefields() function to remove the hardcoded list of embeddable classes. This fixes the issue of trying to wrap Embedded around a Bitwise component. - - Fixes: github.com/construct/construct/issues/TODO """ def mergefields(*subcons): def select(sc): @@ -645,14 +691,69 @@ def select(sc): construct.core.mergefields = mergefields +def _patch_embed(): + r""" + Patches in a way to embed a struct within another struct through the use of * unpacking. + + NOTE: This is just a fancy wrapper for allowing the automatic creation of 'Computed' constructs. + Therefore, it only is for report purposes, and will not be helpful for building. + + NOTE: This only works for structs built with positional arguments. The keyword argument method won't work. + + e.g.: + nested = construct.BitStruct( + "bit1" / construct.Bit, + "nibble" / construct.Nibble, + "bit3" / construct.BitsInteger(3), + ) + + construct.Struct( + 'a' / construct.Int32ul, + *nested, + 'b' / construct.Int32ul, + ) + """ + def _obtain_subcons(subcon): + while isinstance(subcon, Subconstruct): + subcon = subcon.subcon + return getattr(subcon, "subcons", []) + + def _embed(subcon): + # Peel back any layers to reveal the nested subcons. + subcons = _obtain_subcons(subcon) + + # Ignore and just yield the original subcon if we don't find any nested subcons. + if not subcons: + yield subcon + return + + # Discover name. + if subcon.name: + name = subcon.name + else: + name = f"_embed_{uuid.uuid4().hex}" + subcon = name / subcon + + yield subcon + for nested in subcons: + if nested.name: + yield nested.name / Computed(lambda ctx, root_name=name, nested_name=nested.name: ctx[root_name][nested_name]) + + def __iter__(self): + yield from _embed(self) + construct.core.Construct.__iter__ = __iter__ + + def _patch(): - """Patches 2.9 with 2.8 features and other general fixes.""" + """Patches construct with old features and other general fixes.""" _patch_pop() _patch_slice() _patch_StringEncoded() _patch_sizeof() _patch_encodingunit() - _patch_mergefields() + _patch_embed() + if version < (2, 10, 0): + _patch_mergefields() _patch() diff --git a/mwcp/utils/construct/datetime_.py b/mwcp/utils/construct/datetime_.py index 996bbc0..18a81ff 100644 --- a/mwcp/utils/construct/datetime_.py +++ b/mwcp/utils/construct/datetime_.py @@ -6,7 +6,7 @@ import datetime -from .version28 import * +from .core import * # TODO: Implement _encode. 
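Returning to the `_patch_embed()` addition in `core.py` above: a short parse-only sketch of the new `*`-unpacking embedding. The spec mirrors the patch's own docstring example; the input bytes are arbitrary, and (as the patch notes) the embedded fields are exposed through generated `Computed` constructs, so this helps with parsing/reporting but not building.

```python
from mwcp.utils import construct

nested = construct.BitStruct(
    "bit1" / construct.Bit,
    "nibble" / construct.Nibble,
    "bit3" / construct.BitsInteger(3),
)

spec = construct.Struct(
    "a" / construct.Int32ul,
    *nested,   # embedded via generated Computed fields
    "b" / construct.Int32ul,
)

result = spec.parse(b"\x01\x00\x00\x00" + b"\xb5" + b"\x02\x00\x00\x00")
# The embedded names are surfaced at the top level alongside "a" and "b".
print(result.a, result.bit1, result.nibble, result.bit3, result.b)
```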
@@ -16,7 +16,7 @@ class _DateTimeDateDataAdapter(Adapter): property to format datetime. - >>> _DateTimeDateDataAdapter(Int64sl).parse('\x80\xb4N3\xd1\xd4\xd1H') + >>> _DateTimeDateDataAdapter(Int64sl).parse(b'\x80\xb4N3\xd1\xd4\xd1H') '2014-11-23 01:09:01 UTC' """ def _decode(self, obj, context, path): @@ -49,7 +49,7 @@ def __init__(self, subcon, tz=None): :param tz: Optional timezone object, default is localtime :param subcon: subcon to parse EpochTime. """ - super(EpochTimeAdapter, self).__init__(subcon) + super().__init__(subcon) self._tz = tz def _decode(self, obj, context, path): diff --git a/mwcp/utils/construct/dotnet.py b/mwcp/utils/construct/dotnet.py index 0697f3f..6f900e8 100644 --- a/mwcp/utils/construct/dotnet.py +++ b/mwcp/utils/construct/dotnet.py @@ -1,6 +1,6 @@ """Construct helpers for .NET""" -from .version28 import * +from .core import * class _DotNetUInt(Construct): diff --git a/mwcp/utils/construct/helpers.py b/mwcp/utils/construct/helpers.py index a322a53..8957e7f 100644 --- a/mwcp/utils/construct/helpers.py +++ b/mwcp/utils/construct/helpers.py @@ -6,13 +6,14 @@ import string import uuid import zlib +from typing import Iterable from mwcp.utils import custombase64, elffileutils, pefileutils # Patch with version 2.8 changes. -from mwcp.utils.construct import version28 as construct -from .version28 import * +from mwcp.utils.construct import core as construct +from .core import * BYTE = Byte @@ -67,12 +68,11 @@ class ErrorMessage(Construct): >>> d.parse(b"\xff\x05") Traceback (most recent call last): ... - ExplicitError: Failed if statement + construct.core.ExplicitError: Failed if statement """ - __slots__ = ['message'] def __init__(self, message="Error field was activated."): - super(self.__class__, self).__init__() + super().__init__() self.message = message def _parse(self, stream, context, path): @@ -89,11 +89,11 @@ def String16(length): Creates UTF-16 (little endian) encoded string. >>> String16(10).build(u'hello') - 'h\x00e\x00l\x00l\x00o\x00' + b'h\x00e\x00l\x00l\x00o\x00' >>> String16(10).parse(b'h\x00e\x00l\x00l\x00o\x00') - u'hello' + 'hello' >>> String16(16).parse(b'h\x00e\x00l\x00l\x00o\x00\x00\x00\x00\x00\x00\x00') - u'hello' + 'hello' """ return String(length, encoding='utf-16-le') @@ -103,9 +103,9 @@ def String32(length): Creates UTF-32 (little endian) encoded string. >>> String32(20).build(u'hello') - 'h\x00\x00\x00e\x00\x00\x00l\x00\x00\x00l\x00\x00\x00o\x00\x00\x00' + b'h\x00\x00\x00e\x00\x00\x00l\x00\x00\x00l\x00\x00\x00o\x00\x00\x00' >>> String32(20).parse(b'h\x00\x00\x00e\x00\x00\x00l\x00\x00\x00l\x00\x00\x00o\x00\x00\x00') - u'hello' + 'hello' """ return String(length, encoding='utf-32-le') @@ -117,23 +117,25 @@ class Printable(Validator): NOTE: A ValidationError is a type of ConstructError and will be cause if catching ConstructError. >>> Printable(String(5)).parse(b'hello') - u'hello' + 'hello' >>> Printable(String(5)).parse(b'he\x11o!') Traceback (most recent call last): ... - ValidationError: object failed validation: heo! + construct.core.ValidationError: Error in path (parsing) + object failed validation: heo! >>> Printable(Bytes(3)).parse(b'\x01NO') Traceback (most recent call last): ... 
- ValidationError: object failed validation: NO + construct.core.ValidationError: Error in path (parsing) + object failed validation: b'\x01NO' >>> Printable(Bytes(3)).parse(b'YES') - 'YES' + b'YES' """ def _validate(self, obj, context, path): if isinstance(obj, bytes): return all(chr(byte) in string.printable for byte in obj) - return isinstance(obj, stringtypes) and all(char in string.printable for char in obj) + return isinstance(obj, str) and all(char in string.printable for char in obj) # Continuously parses until it hits the first non-zero byte. @@ -151,7 +153,7 @@ class BytesTerminated(NullTerminated): See the NullTerminated documentation for the remainder of the functionality and options. >>> BytesTerminated(GreedyBytes, term=b'TERM').parse(b'helloTERM') - 'hello' + b'hello' """ # The only method we need to override is _parse. Everything else from NullTerminated works as-is. @@ -162,15 +164,15 @@ def _parse(self, stream, context, path): raise PaddingError("BytesTerminated term must be at least 1 byte") data = b'' while True: - pos = stream_tell(stream) + pos = stream_tell(stream, path) try: b = stream_read(stream, term_len) - stream_seek(stream, pos, 0) + stream_seek(stream, pos, 0, path) except StreamError: if self.require: raise else: - stream_seek(stream, pos, 0) + stream_seek(stream, pos, 0, path) data += stream_read_entire(stream) break @@ -178,10 +180,10 @@ def _parse(self, stream, context, path): if self.include: data += b if self.consume: - stream_read(stream, term_len) + stream_read(stream, term_len, path) break else: - data += stream_read(stream, 1) + data += stream_read(stream, 1, path) if self.subcon is GreedyBytes: return data if type(self.subcon) is GreedyString: @@ -201,33 +203,34 @@ class Stripped(Adapter): :param pad: The character/bytes to use for stripping. Defaults to null character. >>> Stripped(GreedyBytes).parse(b'hello\x00\x00\x00') - 'hello' + b'hello' >>> Stripped(Bytes(10)).parse(b'hello\x00\x00\x00\x00\x00') - 'hello' + b'hello' >>> Stripped(Bytes(14), pad=b'PAD').parse(b'helloPADPADPAD') - 'hello' + b'hello' >>> Stripped(Bytes(14), pad=b'PAD').build(b'hello') - 'helloPADPADPAD' + b'helloPADPADPAD' >>> Stripped(CString(), pad=u'PAD').parse(b'helloPADPAD\x00') - u'hello' + 'hello' >>> Stripped(String(14), pad=u'PAD').parse(b'helloPADPAD\x00\x00\x00') - u'hello' + 'hello' # WARNING: If padding doesn't fit in the perscribed data it will not strip it! >>> Stripped(Bytes(13), pad=b'PAD').parse(b'helloPADPADPA') - 'helloPADPADPA' + b'helloPADPADPA' >>> Stripped(Bytes(13), pad=b'PAD').build(b'hello') Traceback (most recent call last): ... - StreamError: bytes object of wrong length, expected 13, found 5 + construct.core.StreamError: Error in path (building) + bytes object of wrong length, expected 13, found 5 - # If the wrapped subconstruct's size can't be determine, if defaults to not providing a pad. + # If the wrapped subconstruct's size can't be determined, if defaults to not providing a pad. 
>>> Stripped(CString(), pad=u'PAD').build(u'hello') - 'hello\x00' + b'hello\x00' """ def __init__(self, subcon, pad=None): - super(Stripped, self).__init__(subcon) + super().__init__(subcon) self.pad = pad def _decode(self, obj, context, path): @@ -237,7 +240,7 @@ def _decode(self, obj, context, path): pad = u'\0' if isinstance(obj, unicodestringtype) else b'\x00' if not isinstance(pad, type(obj)): - raise PaddingError("NullStripped pad must be of the same type: {} vs {}".format(type(pad), type(obj))) + raise PaddingError(f"NullStripped pad must be of the same type: {type(pad)} vs {type(obj)}") unit = len(pad) if unit < 1: @@ -284,7 +287,7 @@ class HexString(Adapter): e.g. >>> HexString(Int32ul).build('0x123') - '#\x01\x00\x00' + b'#\x01\x00\x00' >>> HexString(Int32ul).parse(b'\x20\x01\x00\x00') '0x120' >>> HexString(Int16ub).parse(b'\x12\x34') @@ -314,31 +317,30 @@ class Base64(Adapter): e.g. >>> Base64(GreedyString()).build(b'hello') - 'aGVsbG8=' + b'aGVsbG8=' >>> Base64(GreedyString()).parse(b'aGVsbG8=') - 'hello' + b'hello' >>> Base64(GreedyBytes).build(b'\x01\x02\x03\x04') - 'AQIDBA==' + b'AQIDBA==' >>> Base64(GreedyBytes).parse(b'AQIDBA==') - '\x01\x02\x03\x04' + b'\x01\x02\x03\x04' NOTE: String size is based on the encoded version. - >>> Base64(String(16)).build(u'hello world') - 'aGVsbG8gd29ybGQ=' + >>> Base64(String(16)).build('hello world') + b'aGVsbG8gd29ybGQ=' >>> Base64(String(16)).parse(b'aGVsbG8gd29ybGQ=') - 'hello world' + b'hello world' Supplying a custom alphabet is also supported. - >>> spec = Base64(String(16), custom_alpha='EFGHQRSTUVWefghijklmnopIJKLMNOPABCDqrstuvwxyXYZabcdz0123456789+/=') - >>> spec.build(u'hello world') - 'LSoXMS8BO29dMSj=' + >>> spec = Base64(String(16), custom_alpha=b'EFGHQRSTUVWefghijklmnopIJKLMNOPABCDqrstuvwxyXYZabcdz0123456789+/=') + >>> spec.build('hello world') + b'LSoXMS8BO29dMSj=' >>> spec.parse(b'LSoXMS8BO29dMSj=') - 'hello world' + b'hello world' """ - __slots__ = ['subcon', 'custom_alpha'] def __init__(self, subcon, custom_alpha=None): - super(Base64, self).__init__(subcon) + super().__init__(subcon) self.custom_alpha = custom_alpha def _encode(self, obj, context, path): @@ -351,7 +353,10 @@ def _encode(self, obj, context, path): def _decode(self, obj, context, path): if isinstance(obj, str): obj = obj.encode('utf-8') - return custombase64.b64decode(obj, alphabet=self.custom_alpha) + try: + return custombase64.b64decode(obj, alphabet=self.custom_alpha) + except binascii.Error as e: + raise ConstructError(f"[{path}] {e}") class ZLIB(Adapter): @@ -364,16 +369,15 @@ class ZLIB(Adapter): :param int bufsize: The initial output buffer size >>> ZLIB(Bytes(12)).build(b'data') - 'x\x9cKI,I\x04\x00\x04\x00\x01\x9b' + b'x\x9cKI,I\x04\x00\x04\x00\x01\x9b' >>> ZLIB(GreedyBytes, level=0).build(b'data') - 'x\x01\x01\x04\x00\xfb\xffdata\x04\x00\x01\x9b' + b'x\x01\x01\x04\x00\xfb\xffdata\x04\x00\x01\x9b' >>> ZLIB(GreedyBytes).parse(b'x^KI,I\x04\x00\x04\x00\x01\x9b') - 'data' + b'data' """ - __slots__ = ["subcon", "wbits", "bufsize", "level"] def __init__(self, subcon, wbits=None, bufsize=None, level=None): - super(ZLIB, self).__init__(subcon) + super().__init__(subcon) self.wbits = wbits self.bufsize = bufsize self.level = level @@ -409,16 +413,15 @@ class UUIDAdapter(Adapter): e.g. 
>>> UUIDAdapter(Bytes(16)).build('{12345678-1234-5678-1234-567812345678}') - 'xV4\x124\x12xV\x124Vx\x124Vx' + b'xV4\x124\x12xV\x124Vx\x124Vx' >>> UUIDAdapter(Bytes(16), le=False).build('{12345678-1234-5678-1234-567812345678}') - '\x124Vx\x124Vx\x124Vx\x124Vx' + b'\x124Vx\x124Vx\x124Vx\x124Vx' >>> UUIDAdapter(Bytes(16)).parse(b'xV4\x124\x12xV\x124Vx\x124Vx') '{12345678-1234-5678-1234-567812345678}' """ - __slots__ = ['subcon', 'le'] def __init__(self, subcon, le=True): - super(UUIDAdapter, self).__init__(subcon) + super().__init__(subcon) self.le = le def _encode(self, obj, context, path): @@ -443,9 +446,9 @@ def UUID(le=True): e.g. >>> UUID().build('{12345678-1234-5678-1234-567812345678}') - 'xV4\x124\x12xV\x124Vx\x124Vx' + b'xV4\x124\x12xV\x124Vx\x124Vx' >>> UUID(le=False).build('{12345678-1234-5678-1234-567812345678}') - '\x124Vx\x124Vx\x124Vx\x124Vx' + b'\x124Vx\x124Vx\x124Vx\x124Vx' >>> UUID().parse(b'xV4\x124\x12xV\x124Vx\x124Vx') '{12345678-1234-5678-1234-567812345678}' >>> UUID(le=False).parse(b'\x124Vx\x124Vx\x124Vx\x124Vx') @@ -511,7 +514,7 @@ def __init__(self, subcon, pe=None): :param pe: Optional PE file object. (if not supplied here, this must be supplied during parse()/build() :param subcon: subcon to parse memory offset. """ - super(PEPhysicalAddress, self).__init__(subcon) + super().__init__(subcon) self._pe = pe def _encode(self, obj, context, path): @@ -617,7 +620,7 @@ class Delimited(Construct): >>> spec.parse(b'Hello\x00\x00|\x01\x00\x00\x00|world!!\x01\x02|\xff') Container(first=u'Hello', second=1, third=b'world!!\x01\x02', fourth=255) >>> spec.build(dict(first=u'Hello', second=1, third=b'world!!\x01\x02', fourth=255)) - 'Hello\x00|\x01\x00\x00\x00|world!!\x01\x02|\xff' + b'Hello\x00|\x01\x00\x00\x00|world!!\x01\x02|\xff' If you don't care about a particular element, you can leave it nameless just like in Structs. # NOTE: You can't build unless you have supplied every attribute. @@ -652,7 +655,7 @@ class Delimited(Construct): >>> spec.parse(b'Hello\x00\x00YOYO\x01\x00\x00\x00YOYOworld!!YO!!\x01\x02YOYO\xff') Container(first=u'Hello', second=1, third=b'world!!YO!!\x01\x02', fourth=255) >>> spec.build(dict(first=u'Hello', second=1, third=b'world!!YO!!\x01\x02', fourth=255)) - 'Hello\x00YOYO\x01\x00\x00\x00YOYOworld!!YO!!\x01\x02YOYO\xff' + b'Hello\x00YOYO\x01\x00\x00\x00YOYOworld!!YO!!\x01\x02YOYO\xff' # TODO: Add support for using a single construct for parsing an unknown number of times # (or within a min, max, or exact) @@ -667,18 +670,16 @@ class Delimited(Construct): # ['hello'] """ - __slots__ = ['delimiter', 'subcons'] - def __init__(self, delimiter, *subcons): """ - :param delimiter: single charactor or a function that takes context and returns the delimiter + :param delimiter: single character or a function that takes context and returns the delimiter :param subcons: constructs to use to parse each element. NOTE: The number of constructs will be the number of elements delimited. (ie. len(subcons) == number of delimiters + 1) :raises ValueError: If no subcons are defined. 
""" - super(Delimited, self).__init__() + super().__init__() self.delimiter = delimiter self.subcons = subcons if len(subcons) < 2: @@ -700,14 +701,14 @@ def _find_delimiter(self, stream, delimiter): return delimiter_offset else: stream.seek(delimiter_offset + 1) - raise ConstructError('Unable to find delimiter: {}'.format(delimiter)) + raise ConstructError(f'Unable to find delimiter: {delimiter}') finally: stream.seek(fallback) def _parse_subcon(self, subcon, stream, obj, context, path): """Parses and fills obj and context.""" subobj = subcon._parsereport(stream, context, path) - if subcon.flagembedded: + if getattr(subcon, "flagembedded", False): if subobj is not None: obj.update(subobj.items()) context.update(subobj.items()) @@ -733,7 +734,7 @@ def _parse(self, stream, context, path): delimiter_offset = self._find_delimiter(stream, delimiter) - # Temporaily fake the read() so that we can force EOF before delimiter. + # Temporarily fake the read() so that we can force EOF before delimiter. orig_read = stream.read def new_read(size=None): max_size = delimiter_offset - stream.tell() @@ -764,7 +765,7 @@ def _build(self, obj, stream, context, path): context = Container(_=context) context.update(obj) for i, sc in enumerate(self.subcons): - if sc.flagembedded: + if getattr(sc, "flagembedded", False): subobj = obj elif sc.flagbuildnone: subobj = obj.get(sc.name, None) @@ -772,7 +773,7 @@ def _build(self, obj, stream, context, path): subobj = obj[sc.name] buildret = sc._build(subobj, stream, context, path) if buildret is not None: - if sc.flagembedded: + if getattr(sc, "flagembedded", False): context.update(buildret) if sc.name is not None: context[sc.name] = buildret @@ -798,10 +799,10 @@ class Regex(Construct): The seek position is left at the end of the successful match (match.end()). - >>> regex = re.compile('\x01\x02(?P.{4})\x03\x04(?P[A-Za-z].*\x00)', re.DOTALL) - >>> data = 'GARBAGE!\x01\x02\x0A\x00\x00\x00\x03\x04C:\Windows\x00MORE GARBAGE!' + >>> regex = re.compile(b'\x01\x02(?P.{4})\x03\x04(?P[A-Za-z].*\x00)', re.DOTALL) + >>> data = b'GARBAGE!\x01\x02\x0A\x00\x00\x00\x03\x04C:\Windows\x00MORE GARBAGE!' >>> r = Regex(regex, size=Int32ul, path=CString()).parse(data) - >>> r == Container(path=u'C:\\Windows', size=10) + >>> r == Container(path='C:\\Windows', size=10) True >>> r = Regex(regex).parse(data) >>> r == Container(path=b'C:\\Windows\x00', size=b'\n\x00\x00\x00') @@ -811,50 +812,49 @@ class Regex(Construct): ... 'after_re' / Tell, ... 'garbage' / GreedyBytes ... ).parse(data) - >>> r == Container(re=Container(path=u'C:\\Windows', size=10), after_re=27L, garbage=b'MORE GARBAGE!') + >>> r == Container(re=Container(path='C:\\Windows', size=10), after_re=27, garbage=b'MORE GARBAGE!') True - # TODO: Unfortunately Embedded() no longer works with the update to 2.9 - # >>> Struct( - # ... Embedded(Regex(regex, size=Int32ul, path=CString())), - # ... 'after_re' / Tell, - # ... 'garbage' / GreedyBytes - # ... ).parse(data) - # Container(path=u'C:\\Windows', size=10, after_re=27L, garbage=b'MORE GARBAGE!') + >>> Struct( + ... *Regex(regex, size=Int32ul, path=CString()), + ... 'after_re' / Tell, + ... 'garbage' / GreedyBytes + ... ).parse(data) + Container(size=10, path=u'C:\\Windows', after_re=27, garbage=b'MORE GARBAGE!') You can use Regex as a trigger to find a particular piece of data before you start parsing. >>> Struct( - ... Regex('TRIGGER'), + ... Regex(b'TRIGGER'), ... 'greeting' / CString() - ... ).parse('\x01\x02\x04GARBAGE\x05TRIGGERhello world\x00') + ... 
).parse(b'\x01\x02\x04GARBAGE\x05TRIGGERhello world\x00') Container(greeting=u'hello world') If no data is captured, the associated subcon will received a stream with the position set at the location of that captured group. Thus, allowing you to use it as an anchor point. - >>> r = Regex('hello (?P)world(?P.*)', anchor=Tell).parse('hello world!!!!') - >>> r == Container(extra_data=b'!!!!', anchor=6L) + >>> r = Regex(b'hello (?P)world(?P.*)', anchor=Tell).parse(b'hello world!!!!') + >>> r == Container(extra_data=b'!!!!', anchor=6) True If no named capture groups are used, you can instead parse the entire matched string by supplying a subconstruct as a positional argument. (If no subcon is provided, the raw bytes are returned instead. - >>> Regex('hello world\x00', CString()).parse('GARBAGE\x01\x03hello world\x00\x04') - u'hello world' - >>> Regex('hello world\x00').parse('GARBAGE\x01\x03hello world\x00\x04') - 'hello world\x00' + >>> Regex(b'hello world\x00', CString()).parse(b'GARBAGE\x01\x03hello world\x00\x04') + 'hello world' + >>> Regex(b'hello world\x00').parse(b'GARBAGE\x01\x03hello world\x00\x04') + b'hello world\x00' You can also set the regular expression to match in-place (instead of searching the data) by setting the keyword argument _match to True. >>> Regex('hello', _match=True).parse(b'hello world!') - 'hello' + b'hello' >>> Regex('hello').parse(b'bogus hello world') - 'hello' + b'hello' >>> Regex('hello', _match=True).parse(b'bogus hello world') Traceback (most recent call last): ... - ConstructError: [(parsing)] regex did not match + construct.core.ConstructError: [(parsing)] regex did not match """ - __slots__ = ['regex', 'subcon', 'group_subcons', 'match'] + __slots__ = ['regex', 'subcon', 'subcons', 'match'] def __init__(self, regex, *subcon, **group_subcons): """ @@ -872,22 +872,26 @@ def __init__(self, regex, *subcon, **group_subcons): :raises ValueError: If arguments are invalid. """ - super(Regex, self).__init__() + super().__init__() if isinstance(regex, str): regex = regex.encode() # force byte strings if isinstance(regex, bytestringtype): regex = re.compile(regex, re.DOTALL) self.regex = regex - # TODO: This feature seems backwards, perhaps make a _search keyword instead and default to match functionality. - # Alternatively, we could have RegexSearch and RegexMatch constructs instead. self.match = group_subcons.pop('_match', False) - self.group_subcons = group_subcons + self.subcons = [Renamed(sc, name) for name, sc in group_subcons.items()] + self._subcons = Container((sc.name, sc) for sc in self.subcons) if subcon and len(subcon) > 1: raise ValueError('Only one subcon can be supplied for the entire match.') if subcon and group_subcons: raise ValueError('subcon and group_subcons arguments cannot be used at the same time.') self.subcon = subcon[0] if subcon else None + def __getattr__(self, name): + if name in self._subcons: + return self._subcons[name] + raise AttributeError + def _parse(self, stream, context, path): start = stream.tell() # NOTE: we are going to have to read the entire stream due to regex requirements. 
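# ----------------------------------------------------------------------------
# Editor's note: illustrative sketch only, not part of the patch above.
# A small usage example of the Regex construct with named capture groups,
# mirroring the doctests in the docstring; it assumes the patched module is
# imported as `from mwcp.utils import construct`. The pattern, group names,
# and sample bytes come from that docstring.
import re

from mwcp.utils import construct

regex = re.compile(b"\x01\x02(?P<size>.{4})\x03\x04(?P<path>[A-Za-z].*\x00)", re.DOTALL)
data = b"GARBAGE!\x01\x02\x0A\x00\x00\x00\x03\x04C:\\Windows\x00MORE GARBAGE!"

# Each named group is parsed with the subcon supplied for it.
spec = construct.Regex(regex, size=construct.Int32ul, path=construct.CString())
result = spec.parse(data)
# Expected: result.size == 10 and result.path == "C:\\Windows"

# Without group subcons, the raw captured bytes are returned instead.
raw = construct.Regex(regex).parse(data)
# Expected: raw.size == b"\x0a\x00\x00\x00" and raw.path == b"C:\\Windows\x00"
# ----------------------------------------------------------------------------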
@@ -897,7 +901,7 @@ def _parse(self, stream, context, path): else: match = self.regex.search(stream.read()) if not match: - raise ConstructError('[{}] regex did not match'.format(path)) + raise ConstructError(f'[{path}] regex did not match') try: group_dict = match.groupdict() @@ -915,7 +919,7 @@ def _parse(self, stream, context, path): obj._io = stream context = Container(_=context, _params=context._params, _root=None, _parsing=context._parsing, - _building=context._building, _sizing=context._sizing, _subcons=self.group_subcons, + _building=context._building, _sizing=context._sizing, _subcons=self.subcons, _io=stream, _index=context.get("_index", None)) context._root = context._.get("_root", context) @@ -924,7 +928,8 @@ def _parse(self, stream, context, path): context.update(group_dict) # Parse groups using supplied constructs. - for name, subcon in self.group_subcons.items(): + for subcon in self.subcons: + name = subcon.name try: data = match.group(name) except IndexError: @@ -970,6 +975,16 @@ def _emitseq(self, ksy, bitwise): raise NotImplementedError +def RegexSearch(regex, *subcon, **group_subcons) -> Regex: + """Performs search of given regex pattern starting at current stream position and then parses match groups.""" + return Regex(regex, *subcon, _match=False, **group_subcons) + + +def RegexMatch(regex, *subcon, **group_subcons) -> Regex: + """Peforms match of given regex pattern at current stream position and then parses match groups.""" + return Regex(regex, *subcon, _match=True, **group_subcons) + + class IterError(ConstructError): pass @@ -989,11 +1004,11 @@ class Iter(Construct): ... default=construct.Pass ... ) ... ) - >>> spec.parse('\x01\x02\x09\x03\x03\x03\x03\x06\x06') + >>> spec.parse(b'\x01\x02\x09\x03\x03\x03\x03\x06\x06') Container(types=ListContainer([1, 2, 9]), entries=ListContainer([50529027, 1542, None])) >>> C = _ >>> spec.build(C) - '\x01\x02\t\x03\x03\x03\x03\x06\x06' + b'\x01\x02\t\x03\x03\x03\x03\x06\x06' >>> spec.sizeof(**C) 9 @@ -1001,30 +1016,30 @@ class Iter(Construct): ... 'sizes' / Int16ul[4], ... 'entries' / Iter(this.sizes, Bytes) # equivalent to Iter(this.sizes, lambda size: Bytes(size)) ... ) - >>> spec.parse('\x01\x00\x03\x00\x00\x00\x05\x00abbbddddd') - Container(sizes=ListContainer([1, 3, 0, 5]), entries=ListContainer(['a', 'bbb', '', 'ddddd'])) + >>> spec.parse(b'\x01\x00\x03\x00\x00\x00\x05\x00abbbddddd') + Container(sizes=ListContainer([1, 3, 0, 5]), entries=ListContainer([b'a', b'bbb', b'', b'ddddd'])) >>> C = _ >>> spec.build(C) - '\x01\x00\x03\x00\x00\x00\x05\x00abbbddddd' + b'\x01\x00\x03\x00\x00\x00\x05\x00abbbddddd' >>> Iter(this.sizes, Bytes).sizeof(sizes=[1,2,3,0]) 6 >>> spec.sizeof(**C) 17 :param iterable: iterable items to act upon - :param cases: A dictionary of cases or a function that takes takes a key and returns a construct spec. + :param cases: A dictionary of cases or a function that takes a key and returns a construct spec. 
:param default: The default case (only if cases is a dict) """ - __slots__ = ['iterable', 'cases', 'default'] - + def __init__(self, iterable, cases, default=None): - super(Iter, self).__init__() + super().__init__() self.iterable = iterable self.cases = cases self.default = default or Pass if not callable(cases): self.flagbuildnone = all(sc.flagbuildnone for sc in cases.values()) - self.flagembedded = all(sc.flagembedded for sc in cases.values()) + if hasattr(self, "flagembedded"): + self.flagembedded = all(sc.flagembedded for sc in cases.values()) def _parse(self, stream, context, path): iterator = iter(self.iterable(context)) if callable(self.iterable) else iter(self.iterable) @@ -1071,7 +1086,7 @@ def find_constructs(struct, data): ... 'int' / Int16ul, ... 'string' / CString()) >>> list(find_constructs(struct, b'\x01\x02\x03MZ\x0A\x00hello\x00\x03\x04MZ\x0B\x00world\x00\x00')) - [(3L, Container(int=10, string=u'hello')), (15L, Container(int=11, string=u'world'))] + [(3, Container(int=10, string=u'hello')), (15, Container(int=11, string=u'world'))] >>> list(find_constructs(struct, b'nope')) [] @@ -1111,7 +1126,7 @@ class Backwards(Subconstruct): e.g. >>> (Bytes(14) >> Backwards(Int32ul) >> Tell).parse(b'junk stuff\x01\x02\x00\x00') - ListContainer(['junk stuff\x01\x02\x00\x00', 513, 10L]) + ListContainer([b'junk stuff\x01\x02\x00\x00', 513, 10]) >>> spec = Struct(Seek(0, os.SEEK_END), 'name' / Backwards(String(9)), 'number' / Backwards(Int32ul)) >>> spec.parse(b'A BUNCH OF JUNK DATA\x01\x00\x00\x00joe shmoe') Container(name=u'joe shmoe', number=1) @@ -1121,7 +1136,8 @@ class Backwards(Subconstruct): >>> spec.parse(b'A BUNCH OF JUNK DATA\x01\x00\x00\x00joe shmoe\x00') Traceback (most recent call last): ... - SizeofError + construct.core.SizeofError: Error in path (parsing) -> name + However, GreedyBytes and GreedyString are allowed. >>> spec = Struct(Seek(0, os.SEEK_END), 'name' / Backwards(String(9)), 'rest' / Backwards(GreedyBytes)) @@ -1132,15 +1148,14 @@ class Backwards(Subconstruct): Container(name=u'joe shmoe', rest=u'hello') WARNING: This will also break if you read more data that is behind the current position. - >>> (Seek(0, os.SEEK_END) >> Backwards(String(10))).parse('yo') + >>> (Seek(0, os.SEEK_END) >> Backwards(String(10))).parse(b'yo') Traceback (most recent call last): ... - FormatFieldError: could not read enough bytes, expected 10, found 2 + construct.core.FormatFieldError: could not read enough bytes, expected 10, found 2 """ - __slots__ = ['greedy'] def __init__(self, subcon): - super(Backwards, self).__init__(subcon) + super().__init__(subcon) # GreedyBytes and GreedyString are allowed special cases. self.greedy = self.subcon is GreedyBytes or ( isinstance(self.subcon, construct.StringEncoded) and self.subcon.subcon is GreedyBytes) @@ -1181,8 +1196,8 @@ def _parse(self, stream, context, path): if offset1 > offset2: offset1, offset2 = offset2, offset1 fallback = stream.tell() - stream_seek(stream, offset1) - data = stream_read(stream, offset2-offset1) + stream_seek(stream, offset1, 0, path) + data = stream_read(stream, offset2-offset1, path) stream.seek(fallback) return Container(data=data, value=obj, offset1=offset1, offset2=offset2, length=(offset2-offset1)) @@ -1196,7 +1211,7 @@ def FocusLast(*subcons, **kw): parse a bunch of subconstructs and then grab the last element. >>> FocusLast(Byte, Byte, String(2)).parse(b'\x01\x02hi') - u'hi' + 'hi' >>> spec = FocusLast( ... 'a' / Byte, @@ -1204,9 +1219,9 @@ def FocusLast(*subcons, **kw): ... String(this.a + this.b), ... 
) >>> spec.parse(b'\x01\x02hi!') - u'hi!' + 'hi!' >>> spec.build(u'hi!', a=1, b=2) - '\x01\x02hi!' + b'\x01\x02hi!' e.g.: diff --git a/mwcp/utils/construct/network.py b/mwcp/utils/construct/network.py index b616c02..623e2ef 100644 --- a/mwcp/utils/construct/network.py +++ b/mwcp/utils/construct/network.py @@ -2,7 +2,7 @@ Network constructs """ -from .version28 import * +from .core import * class _MACAddressAdapter(Adapter): diff --git a/mwcp/utils/construct/windows_enums.py b/mwcp/utils/construct/windows_enums.py index 5227361..151c935 100644 --- a/mwcp/utils/construct/windows_enums.py +++ b/mwcp/utils/construct/windows_enums.py @@ -3,7 +3,7 @@ This module will be imported along with 'from mwcp.utils import construct' """ -from .version28 import * +from .core import * # Visible interface. Add the classes and functions you would like to be available for users of construct # library here. @@ -26,8 +26,8 @@ def RegHive(subcon): Converts an integer to registry hive enum. >>> RegHive(Int32ul).build("HKCU") - '\x01\x00\x00\x80' - >>> str(RegHive(Int32ul).parse('\x01\x00\x00\x80')) + b'\x01\x00\x00\x80' + >>> str(RegHive(Int32ul).parse(b'\x01\x00\x00\x80')) 'HKCU' """ return Enum(subcon, **REGHIVES) @@ -46,8 +46,8 @@ def LanguageIdentifier(subcon): Converts an integer to language identifer enum >>> LanguageIdentifier(Int32ul).build("English (United States)") - '\t\x04\x00\x00' - >>> str(LanguageIdentifier(Int32ul).parse("\x04\x08\x00\x00")) + b'\t\x04\x00\x00' + >>> str(LanguageIdentifier(Int32ul).parse(b"\x04\x08\x00\x00")) 'Chinese (PRC)' """ return Enum(subcon, **LANGUAGEIDENTIFIERS) @@ -107,8 +107,8 @@ def KnownFolderID(subcon): Converts an integer to a CSIDL (KNownFolderID) value >>> KnownFolderID(Int32ul).build("CSIDL_SYSTEM") - '%\x00\x00\x00' - >>> str(KnownFolderID(Int32ul).parse("\x18\x00\x00\x00")) + b'%\x00\x00\x00' + >>> str(KnownFolderID(Int32ul).parse(b"\x18\x00\x00\x00")) 'CSIDL_COMMON_STARTUP' """ return Enum(subcon, **CSIDL) @@ -169,9 +169,9 @@ def AlgorithmID(subcon): r""" Converts an integer to an AlgorithmID value - >>> str(AlgorithmID(Int16ul).parse("\x00\xa4")) + >>> str(AlgorithmID(Int16ul).parse(b"\x00\xa4")) 'CALG_RSA_KEYX' >>> AlgorithmID(Int16ul).build("CALG_RC4") - '\x01h' + b'\x01h' """ return Enum(subcon, **ALGIDS) \ No newline at end of file diff --git a/mwcp/utils/construct/windows_structures.py b/mwcp/utils/construct/windows_structures.py index 36b50b7..36d6304 100644 --- a/mwcp/utils/construct/windows_structures.py +++ b/mwcp/utils/construct/windows_structures.py @@ -7,8 +7,8 @@ import datetime -from . import version28 as construct -from .version28 import this, len_ +from . import core as construct +from .core import this, len_ from . 
import network, datetime_, windows_enums from .windows_constants import * @@ -310,14 +310,14 @@ class SystemTimeAdapter(construct.Adapter): >>> SystemTimeAdapter(SYSTEMTIME).parse(b'\xdd\x07\t\x00\x03\x00\x12\x00\t\x00.\x00\x15\x00\xf2\x02') '2013-09-18T09:46:21.754000' >>> SystemTimeAdapter(SYSTEMTIME, tzinfo=datetime.timezone.utc).parse(b'\xdd\x07\t\x00\x03\x00\x12\x00\t\x00.\x00\x15\x00\xf2\x02') - '2013-09-18T09:46:21.754000+00:00 + '2013-09-18T09:46:21.754000+00:00' """ def __init__(self, subcon, tzinfo=None): """ :param tzinfo: Optional timezone object, default is localtime :param subcon: subcon to parse SystemTime """ - super(SystemTimeAdapter, self).__init__(subcon) + super().__init__(subcon) self._tzinfo = tzinfo def _decode(self, obj, context, path): @@ -346,7 +346,7 @@ class FileTimeAdapter(construct.Adapter): Technically FILETIME is two 32-bit integers as dwLowDateTime and dwHighDateTime, but there is no need to do that >>> FileTimeAdapter(construct.Int64ul).parse(b'\x00\x93\xcc\x11\xa7\x88\xd0\x01') - '2015-05-07T05:20:33' + '2015-05-07T05:20:33.328000' >>> FileTimeAdapter(construct.Int64ul, tz=datetime.timezone.utc).parse(b'\x00\x93\xcc\x11\xa7\x88\xd0\x01') '2015-05-07T09:20:33.328000+00:00' """ @@ -355,7 +355,7 @@ def __init__(self, subcon, tz=None): :param tz: Optional timezone object, default is localtime :param subcon: subcon to parse FileTime """ - super(FileTimeAdapter, self).__init__(subcon) + super().__init__(subcon) self._tz = tz def _decode(self, obj, context, path): diff --git a/mwcp/utils/pefileutils.py b/mwcp/utils/pefileutils.py index 68cb241..a626f95 100644 --- a/mwcp/utils/pefileutils.py +++ b/mwcp/utils/pefileutils.py @@ -5,7 +5,7 @@ import logging from pathlib import Path -from typing import List, Optional +from typing import List, Optional, Dict import pefile @@ -174,7 +174,7 @@ def obtain_exports_list(pe=None, file_data=None) -> List[bytes]: :param pe: pefile.PE object :param file_data: Input file data - :return: A list of export names, or None. + :return: A list of export names or an empty list. """ if file_data: pe = obtain_pe(file_data) @@ -203,14 +203,14 @@ def check_export(export_name, pe=None, file_data=None): return export_name in exports -def obtain_imported_dlls(pe=None, file_data=None) -> Optional[List[bytes]]: +def obtain_imported_dlls(pe=None, file_data=None) -> List[bytes]: """ Obtain a list of imported DLL names for the input PE file. 
:param pe: pefile.PE object, or None by default :param file_data: file data from which to create a pefile.PE object, or None by default - :return: List of imported DLLs, or None + :return: List of imported DLLs or an empty list """ if file_data: pe = obtain_pe(file_data) @@ -218,9 +218,9 @@ def obtain_imported_dlls(pe=None, file_data=None) -> Optional[List[bytes]]: try: return [imp.dll for imp in pe.DIRECTORY_ENTRY_IMPORT] except AttributeError: - return None + return [] else: - return None + return [] def obtain_imports_list(dll_name, pe=None, file_data=None): @@ -231,7 +231,7 @@ def obtain_imports_list(dll_name, pe=None, file_data=None): :param pe: pefile.PE object, or None by default :param file_data: file data from which to create a pefile.PE object, or None by default - :return: List of imports from the specified DLL, or None + :return: List of imports from the specified DLL or an empty list """ if file_data: pe = obtain_pe(file_data) @@ -241,9 +241,9 @@ def obtain_imports_list(dll_name, pe=None, file_data=None): if imp.dll.lower() == dll_name.lower(): return [imp_func.name for imp_func in imp.imports] except AttributeError: - return None + return [] else: - return None + return [] def is_imported(dll_name, func_name, pe=None, file_data=None): @@ -638,6 +638,24 @@ def offset(self): self._offset = self._pe.get_physical_by_rva(rva) return self._offset + @property + def strings_dict(self) -> Dict[int, str]: + """ + Dictionary of resource strings, mapped by their ID. Only applicable for RT_STRING resources. + """ + if hasattr(self._entry, "directory"): + directory = self._entry.directory + if hasattr(directory, "strings"): + return directory.strings + return {} + + @property + def strings(self) -> List[str]: + """ + List of strings found within the resource. Only applicable for RT_STRING resources. + """ + return list(self.strings_dict.values()) + def iter_rsrc(pe, dirtype=None): """ @@ -675,7 +693,7 @@ def extract_all_rsrc(pe=None, file_data=None): :param pe: pefile.PE object :param file_data: Input file data - :return: List of pefileutils.Resource objects, or an empty list. + :return: List of pefileutils.Resource objects or an empty list. """ if file_data: pe = obtain_pe(file_data) @@ -693,7 +711,7 @@ def extract_rsrc_dir(dirtype, pe=None, file_data=None): :param pe: pefile.PE object :param file_data: Input file data - :return: List of pefileutils.Resource objects matching the dirtype, or an empty list. + :return: List of pefileutils.Resource objects matching the dirtype or an empty list. """ if file_data: pe = obtain_pe(file_data) diff --git a/mwcp/utils/stringutils.py b/mwcp/utils/stringutils.py index 528b84c..045804c 100644 --- a/mwcp/utils/stringutils.py +++ b/mwcp/utils/stringutils.py @@ -3,6 +3,7 @@ """ import string +import sys import unicodedata @@ -18,9 +19,16 @@ def convert_to_unicode(input_value): VALID_FILENAME_CHARS = "-_.() {}{}".format(string.ascii_letters, string.digits).encode("ascii") -def sanitize_filename(filename): - """Convert given filename to sanitized version.""" +def sanitize_filename(filename: str) -> str: + """ + Convert given filename to sanitized version that is safe to be used to write to the file system. 
+ """ filename = convert_to_unicode(filename) filename = unicodedata.normalize("NFKD", filename) # convert accented characters + filename = convert_to_unicode(bytes(c for c in filename.encode("ascii", "ignore") if c in VALID_FILENAME_CHARS)) - return convert_to_unicode(bytes(c for c in filename.encode("ascii", "ignore") if c in VALID_FILENAME_CHARS)) + # If in Windows, remove any `.lnk` extension to prevent issues with the file explorer. + if sys.platform == "win32" and filename.lower().endswith(".lnk"): + filename = filename[:-len(".lnk")] + "_lnk" + + return filename diff --git a/noxfile.py b/noxfile.py index 156a570..15cdae0 100644 --- a/noxfile.py +++ b/noxfile.py @@ -9,14 +9,14 @@ import nox -@nox.session(python="3.8") +@nox.session(python="3.10") def test(session): """Run pytests""" session.install("-e", ".[testing]") session.run("pytest") -@nox.session(python="3.8") +@nox.session(python="3.10") def build(session): """Build source and wheel distribution""" session.run("python", "setup.py", "sdist") diff --git a/setup.cfg b/setup.cfg index 2701380..73e8190 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,8 +1,8 @@ [metadata] version = attr:mwcp.__version__ description = A framework for malware configuration parsers. -long-description-content-type = text/markdown -long-description = file:README.md +long_description_content_type = text/markdown +long_description = file:README.md [tool:pytest] testpaths = mwcp/tests diff --git a/setup.py b/setup.py index cd41968..c9941fa 100755 --- a/setup.py +++ b/setup.py @@ -3,15 +3,15 @@ A framework for malware configuration parsers. """ -from setuptools import setup, find_packages +from setuptools import setup, find_namespace_packages setup( name="mwcp", author="DC3", - author_email="dcci@dc3.mil", + author_email="dc3.tsd@us.af.mil", keywords="malware", - url="http://github.com/Defense-Cyber-Crime-Center/DC3-MWCP/", - packages=find_packages(), + url="https://github.com/dod-cyber-crime-center/DC3-MWCP/", + packages=find_namespace_packages(), include_package_data=True, license='MIT', classifiers=[ @@ -19,8 +19,12 @@ 'Intended Audience :: Developers', 'License :: OSI Approved :: MIT License', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', ], + python_requires=">=3.9", entry_points={ 'console_scripts': [ 'mwcp = mwcp.cli:main', @@ -38,7 +42,7 @@ 'bitarray', 'cattrs', 'click>=8.0.1', - 'construct==2.9.45', # pin because parsers are very dependent on this. + 'construct >=2.9.45, <2.11', 'defusedxml', 'future', 'isodate',