Skip to content

Commit

Permalink
3.14.0 release
Browse files Browse the repository at this point in the history
  • Loading branch information
dc3-tsd committed Jun 6, 2024
1 parent 6e1d735 commit b870702
Show file tree
Hide file tree
Showing 41 changed files with 736 additions and 367 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
matrix:
# This workflow can be matrixed against multiple Python versions if desired. eg. [3.7, 3.8, 3.9, "3.10"]
python-version: [ 3.8 ]
python-version: [ "3.11" ]

steps:
# Get the code from the repository to be linted, packaged, and pushed
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
*~
.DS_Store
.project
.vscode/

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down Expand Up @@ -64,3 +65,6 @@ target/

# PyCharm
/.idea

/scratch
TODO.txt
21 changes: 20 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
# Changelog
All notable changes to this project will be documented in this file.

## [3.14.0] - 2024-05-29

### Added
- Added `secret` and `key_derivation` fields to `EncryptionKey` metadata for storing KDF information.
- `construct` utility:
- Added `RegexSearch` and `RegexMatch` helpers.
- Added ability to embed constructs using `*` unpacking.

### Changed
- All pefileutils functions that are meant to return lists now return empty lists when empty instead of `None`.
- The temporary directory created by `FileObject.temp_path()` will not be deleted if `keep` is set to True
or the `--keep-tmp` command line flag is used. The last directory created will be symbolically
linked to `mwcp_current`.
- Relaxed `construct` version requirement to support 2.9 or 2.10

### Removed
- Dropped support for Python 3.8


## [3.13.1] - 2023-11-29

Expand Down Expand Up @@ -641,7 +659,8 @@ It is assumed if you are not updating/adding tests.
- Fixed broken markdown headings from @bryant1410


[Unreleased]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.13.1...HEAD
[Unreleased]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.14.0...HEAD
[3.14.0]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.13.1...3.14.0
[3.13.1]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.13.0...3.13.1
[3.13.0]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.12.0...3.13.0
[3.12.0]: https://github.com/dod-cyber-crime-center/DC3-MWCP/compare/3.11.0...3.12.0
Expand Down
4 changes: 4 additions & 0 deletions docs/ParserComponents.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ with self.file_object.open() as fo:
A temporary file path can be generated in a context manager using `.temp_path()`.
This can be helpful for external utilities that require a real file path.

This directory will not be deleted after processing if the environment variable `MWCP_KEEP_TMP`
is set to `true` or `1`.
The last temporary directory created will be symbolically linked to `mwcp_current`. (In Windows, "Developer Mode" must be enabled.)

```python
with self.file_object.temp_path() as file_path:
_some_library_that_needs_a_path(file_path)
Expand Down
2 changes: 1 addition & 1 deletion mwcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
from mwcp.exceptions import *


__version__ = "3.13.1"
__version__ = "3.14.0"
26 changes: 23 additions & 3 deletions mwcp/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,16 @@ def _parse_parameters(params) -> dict:
"(e.g. --param aes_key:secret) "
"This flag can be provided multiple times for multiple parameters."
)
@click.option(
"--keep-tmp",
is_flag=True,
help="Keep temporary files generated by FileObject.temp_path()"
)
@click.argument("parser", required=True)
@click.argument("input", nargs=-1, type=click.Path())
def parse(
parser, input, yara_repo, recursive, format, split, output_dir, output_files, prefix, string_report,
include_filename, legacy, param
include_filename, legacy, param, keep_tmp
):
"""
Parses given input with given parser.
Expand All @@ -385,6 +390,8 @@ def parse(

if yara_repo:
mwcp.config["YARA_REPO"] = yara_repo
if keep_tmp:
mwcp.config["KEEP_TMP"] = True

# Python won't process wildcards when used through Windows command prompt.
if any("*" in path for path in input):
Expand Down Expand Up @@ -656,7 +663,11 @@ def _run_tests(tester, silent=False, show_passed=False):
help="DEPRECATED: Display test case details for passed tests as well."
"By default only failed tests are shown.",
)
@click.option("-s", "--silent", is_flag=True, help="Limit output to statement saying whether all tests passed or not.")
@click.option(
"-s", "--silent",
is_flag=True,
help="Limit output to statement saying whether all tests passed or not."
)
@click.option(
"--legacy/--no-legacy",
default=False,
Expand Down Expand Up @@ -711,11 +722,16 @@ def _run_tests(tester, silent=False, show_passed=False):
help="Whether to include code coverage information for parser files. "
"After tests are complete, reports can be generated using `coverage`. (e.g. `coverage html`)."
)
@click.option(
"--keep-tmp",
is_flag=True,
help="Keep temporary files generated by FileObject.temp_path()"
)
# Parser to process.
@click.argument("parser", nargs=-1, required=False)
def test(
testcase_dir, malware_repo, nprocs, update, add, add_filelist, delete, yes, force, last_failed, show_passed,
silent, legacy, exit_on_first, command, full_diff, yara_repo, recursive, param, cov, parser,
silent, legacy, exit_on_first, command, full_diff, yara_repo, recursive, param, cov, keep_tmp, parser,
):
"""
Testing utility to create and execute parser test cases.
Expand Down Expand Up @@ -750,6 +766,8 @@ def test(
mwcp.config["MALWARE_REPO"] = malware_repo
if yara_repo:
mwcp.config["YARA_REPO"] = yara_repo
if keep_tmp:
mwcp.config["KEEP_TMP"] = True

# Add files listed in filelist to add option.
if add_filelist:
Expand Down Expand Up @@ -893,6 +911,8 @@ def test(
pytest_args += ["--malware-repo", malware_repo]
if yara_repo:
pytest_args += ["--yara-repo", yara_repo]
if keep_tmp:
pytest_args += ["--keep-tmp"]
if exit_on_first:
pytest_args += ["-x"]

Expand Down
3 changes: 3 additions & 0 deletions mwcp/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ LOG_CONFIG_PATH: ./log_config.yml

# Directory containing yara signatures.
#YARA_REPO: ~/yara_repo

# Keep temporary directory created by FileObject.temp_path()
# KEEP_TMP: false
36 changes: 35 additions & 1 deletion mwcp/config/schema.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "https://github.com/raw/dod-cyber-crime-center/DC3-MWCP/3.13.1/mwcp/config/schema.json",
"$id": "https://github.com/raw/dod-cyber-crime-center/DC3-MWCP/3.14.0/mwcp/config/schema.json",
"title": "Report",
"description": "Defines the report of all metadata elements.",
"type": "object",
Expand Down Expand Up @@ -333,6 +333,23 @@
"type": "null"
}
]
},
"secret": {
"anyOf": [
{
"type": "string",
"contentEncoding": "base64"
},
{
"type": "null"
}
]
},
"key_derivation": {
"type": [
"string",
"null"
]
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -421,6 +438,23 @@
"type": "null"
}
]
},
"secret": {
"anyOf": [
{
"type": "string",
"contentEncoding": "base64"
},
{
"type": "null"
}
]
},
"key_derivation": {
"type": [
"string",
"null"
]
}
},
"additionalProperties": false,
Expand Down
35 changes: 28 additions & 7 deletions mwcp/file_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import pefile

from mwcp import metadata
from mwcp import metadata, config
from mwcp.utils import elffileutils, pefileutils
from mwcp.utils.stringutils import convert_to_unicode, sanitize_filename

Expand Down Expand Up @@ -369,7 +369,7 @@ def compile_time(self) -> Optional[datetime.datetime]:
return datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)

@contextlib.contextmanager
def temp_path(self):
def temp_path(self, keep=False):
"""
Context manager for creating a temporary full file path to the file object.
This is useful for when you want to use this file on libraries which require
Expand All @@ -382,9 +382,29 @@ def temp_path(self):
with file_object.temp_path() as file_path:
_some_library_that_needs_a_path(file_path)
"""
# TODO: Provide and option to change location of temporary files through the use
# of the configuration file.
with tempfile.TemporaryDirectory(prefix="mwcp_") as tmpdir:
keep = (
keep
or config.get("KEEP_TMP", False)
or os.environ.get("MWCP_KEEP_TMP", "false").lower() in ("true", "t", "yes", "y", "1")
)
if keep:
tmpdir = tempfile.mkdtemp(prefix="mwcp_")
context = contextlib.nullcontext(tmpdir)
# Warn user since this should not be left on in production code.
logger.warning(f"Temporary directory '{tmpdir}' not set for deletion.")
# Set link to current temporary directory.
try:
mwcp_current = os.path.join(tempfile.gettempdir(), "mwcp_current")
if os.path.lexists(mwcp_current):
os.unlink(mwcp_current)
os.symlink(tmpdir, mwcp_current, target_is_directory=True)
except OSError:
# We can fail to create a symlink in Windows if "Developer Mode" is not enabled.
pass
else:
context = tempfile.TemporaryDirectory(prefix="mwcp_")

with context as tmpdir:
temp_file = os.path.join(tmpdir, sanitize_filename(self.name) if self.name else self.md5)
with open(temp_file, "wb") as fo:
fo.write(self.data)
Expand Down Expand Up @@ -486,7 +506,7 @@ def output(self):
self.reporter.add(metadata.File.from_file_object(self))

@contextlib.contextmanager
def disassembly(self, disassembler: str = None, report: Report = None, **config) -> ContextManager["dragodis.Disassembler"]:
def disassembly(self, disassembler: str = None, report: Report = None, keep=False, **config) -> ContextManager["dragodis.Disassembler"]:
"""
Produces a Dragodis Disassembler object for the file.
Dragodis must be installed for this work.
Expand All @@ -504,11 +524,12 @@ def disassembly(self, disassembler: str = None, report: Report = None, **config)
:param report: Provide the Report object if you want the annotated disassembler project file to
be added after processing.
This is usually only recommended if the parser plans to annotate the disassembly. e.g. API resolution
:param keep: Whether to prevent the temporary directory from being deleted.
"""
if not dragodis:
raise RuntimeError("Please install Dragodis to use this function.")

with self.temp_path() as file_path:
with self.temp_path(keep=keep) as file_path:
with dragodis.open_program(file_path, disassembler, **config) as dis:
bit_size = dis.bit_size
yield dis
Expand Down
49 changes: 37 additions & 12 deletions mwcp/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ def as_stix(self, base_object, fixed_timestamp=None) -> STIXResult:
elif self.query:
result.add_linked(stix_extensions.ObservedString(purpose="url-query", value=self.query))
else:
warnings.warn("Skipped creation of STIX string since the parser provided no URL data")
logger.warning("Skipped creation of STIX string since the parser provided no URL data")
return result
else:
result.add_linked(stix.URL(value=self.url))
Expand Down Expand Up @@ -1726,11 +1726,20 @@ class EncryptionKey(Metadata):
mode="ecb",
iv=b"\x00\x00\x00\x00\x00\x00\x00\x01",
)
EncryptionKey(
b"\xa0u\xd1\x7f=E0s\x85?\x8188\xc1[\x80#\xb8\xc4\x87\x03\x8465O\xe5\x99\xc3\x94.\x1f\x95",
algorithm="aes",
mode="cbc",
secret=b"p@ssw0rd",
key_derivation="sha256"
)
"""
key: bytes
algorithm: str = None
mode: str = None
iv: bytes = None
secret: bytes = None
key_derivation: str = None

# Tests encodings in order by preference.
TEST_ENCODINGS = [
Expand Down Expand Up @@ -1789,18 +1798,22 @@ def _num_raw_bytes(string: str) -> int:
count += char.startswith(b"\\x") + char.startswith(b"\\u") * 2
return count

def _detect_encoding(self) -> Optional[str]:
def _detect_encoding(self, data: bytes = None) -> Optional[str]:
"""
Attempts to determine if the key can be encoded as a string.
Attempts to determine if the key (or another value) can be encoded as a string.
:returns: Best guess encoding if successful.
"""
if not data:
data = self.key
if not data:
return

# If user gave us the encoding, use that.
if self._encoding_set:
return self._encoding

# NOTE: Much of this is taken from rugosa.detect_encoding()
data = self.key
best_score = len(data) # lowest score is best
best_code_page = None
for code_page in self.TEST_ENCODINGS:
Expand All @@ -1818,24 +1831,30 @@ def _detect_encoding(self) -> Optional[str]:

return best_code_page

def as_formatted_dict(self, flat=False) -> dict:
# Convert key into hex number
key = f"0x{self.key.hex()}"

def formatted_bytes(self, data: bytes) -> str:
# Convert into hex number
value = f"0x{data.hex()}"
# Add context if encoding can be detected from key.
if encoding := self._detect_encoding():
key += f' ("{self.key.decode(encoding)}")'
if encoding := self._detect_encoding(data):
value += f' ("{data.decode(encoding)}")'
return value

def as_formatted_dict(self, flat=False) -> dict:
return {
"tags": self.tags,
"key": key,
"key": self.formatted_bytes(self.key),
"algorithm": self.algorithm,
"mode": self.mode,
"iv": f"0x{self.iv.hex()}" if self.iv else None,
"secret": self.formatted_bytes(self.secret) if self.secret else None,
"key_derivation": self.key_derivation,
}

def as_stix(self, base_object, fixed_timestamp=None) -> STIXResult:
params = {"key_hex": self.key.hex()}
params = {}

if self.key:
params["key_hex"] = self.key.hex()

if self.algorithm:
params["algorithm"] = self.algorithm
Expand All @@ -1846,6 +1865,12 @@ def as_stix(self, base_object, fixed_timestamp=None) -> STIXResult:
if self.iv:
params["iv_hex"] = self.iv.hex()

if self.secret:
params["secret_hex"] = self.secret.hex()

if self.key_derivation:
params["key_derivation"] = self.key_derivation

result = STIXResult(fixed_timestamp=fixed_timestamp)
result.add_linked(stix_extensions.SymmetricEncryption(**params))
result.create_tag_note(self, result.linked_stix[-1])
Expand Down
Loading

0 comments on commit b870702

Please sign in to comment.