diff --git a/changelog.d/20240624_175518_aurelien.gateau_sarif.md b/changelog.d/20240624_175518_aurelien.gateau_sarif.md new file mode 100644 index 0000000000..589f76b41a --- /dev/null +++ b/changelog.d/20240624_175518_aurelien.gateau_sarif.md @@ -0,0 +1,3 @@ +### Added + +- `ggshield secret scan` commands can now output results in [SARIF format](https://sarifweb.azurewebsites.net/), using the new `--format sarif` option (#869). diff --git a/changelog.d/20240626_173927_aurelien.gateau_sarif.md b/changelog.d/20240626_173927_aurelien.gateau_sarif.md new file mode 100644 index 0000000000..92d534ce19 --- /dev/null +++ b/changelog.d/20240626_173927_aurelien.gateau_sarif.md @@ -0,0 +1,3 @@ +### Added + +- `ggshield secret scan` commands can now produce [SARIF](https://sarifweb.azurewebsites.net/) output, using the new `--format sarif` option. diff --git a/ggshield/cmd/hmsl/check.py b/ggshield/cmd/hmsl/check.py index c7ca5f81eb..f834d6a2d4 100644 --- a/ggshield/cmd/hmsl/check.py +++ b/ggshield/cmd/hmsl/check.py @@ -10,7 +10,11 @@ naming_strategy_option, ) from ggshield.cmd.hmsl.hmsl_utils import check_secrets -from ggshield.cmd.utils.common_options import add_common_options, json_option +from ggshield.cmd.utils.common_options import ( + add_common_options, + json_option, + text_json_format_option, +) from ggshield.core.text_utils import display_info from ggshield.verticals.hmsl.collection import ( InputType, @@ -26,6 +30,7 @@ @click.command() @click.pass_context @add_common_options() +@text_json_format_option @json_option @full_hashes_option @naming_strategy_option @@ -37,7 +42,6 @@ def check_cmd( full_hashes: bool, naming_strategy: NamingStrategy, input_type: InputType, - json_output: bool, **kwargs: Any, ) -> int: """ @@ -58,7 +62,6 @@ def check_cmd( check_secrets( ctx=ctx, prepared_secrets=prepared_data, - json_output=json_output, full_hashes=full_hashes, ) diff --git a/ggshield/cmd/hmsl/check_secret_manager/hashicorp_vault.py 
b/ggshield/cmd/hmsl/check_secret_manager/hashicorp_vault.py index 70023bc780..7e84a3bb5b 100644 --- a/ggshield/cmd/hmsl/check_secret_manager/hashicorp_vault.py +++ b/ggshield/cmd/hmsl/check_secret_manager/hashicorp_vault.py @@ -8,7 +8,11 @@ naming_strategy_option, ) from ggshield.cmd.hmsl.hmsl_utils import check_secrets -from ggshield.cmd.utils.common_options import add_common_options, json_option +from ggshield.cmd.utils.common_options import ( + add_common_options, + json_option, + text_json_format_option, +) from ggshield.cmd.utils.context_obj import ContextObj from ggshield.core.errors import UnexpectedError from ggshield.core.text_utils import display_error, display_info, pluralize @@ -88,6 +92,7 @@ def _split_vault_mount_and_path(initial_path: str) -> Tuple[str, str]: type=str, ) @add_common_options() +@text_json_format_option @json_option @full_hashes_option @naming_strategy_option @@ -100,7 +105,6 @@ def check_hashicorp_vault_cmd( vault_path: str, full_hashes: bool, naming_strategy: NamingStrategy, - json_output: bool, **kwargs: Any, ) -> int: """ @@ -168,7 +172,6 @@ def check_hashicorp_vault_cmd( check_secrets( ctx=ctx, prepared_secrets=prepared_secrets, - json_output=json_output, full_hashes=full_hashes, ) diff --git a/ggshield/cmd/hmsl/decrypt.py b/ggshield/cmd/hmsl/decrypt.py index db620f3e52..2ef0b3b73d 100644 --- a/ggshield/cmd/hmsl/decrypt.py +++ b/ggshield/cmd/hmsl/decrypt.py @@ -4,7 +4,12 @@ import click from ggshield.cmd.hmsl.hmsl_common_options import input_arg -from ggshield.cmd.utils.common_options import add_common_options, json_option +from ggshield.cmd.utils.common_options import ( + add_common_options, + json_option, + text_json_format_option, +) +from ggshield.cmd.utils.context_obj import ContextObj from ggshield.core.errors import ParseError from ggshield.verticals.hmsl import Match, Secret from ggshield.verticals.hmsl.crypto import make_hint @@ -12,6 +17,7 @@ @click.command() +@click.pass_context @add_common_options() @click.option( 
"--mapping", @@ -22,12 +28,15 @@ show_default=True, help="File containing the hashes and their names.", ) +@text_json_format_option @json_option @input_arg -def decrypt_cmd(path: str, mapping_file: TextIO, json_output: bool, **_: Any) -> int: +def decrypt_cmd(ctx: click.Context, path: str, mapping_file: TextIO, **_: Any) -> int: """ Decrypt `query`'s output and show secrets information. """ + ctx_obj = ContextObj.get(ctx) + # Opens the file or stdin input = cast(TextIO, click.open_file(path, "r")) mapping: Dict[str, str] = load_mapping(mapping_file) @@ -36,7 +45,7 @@ def decrypt_cmd(path: str, mapping_file: TextIO, json_output: bool, **_: Any) -> try: secrets = decrypt(input, mapping) # Display the secrets - show_results(secrets, mapping, json_output) + show_results(secrets, mapping, ctx_obj.use_json) except (json.JSONDecodeError, TypeError): raise ParseError("Invalid format in input file.") diff --git a/ggshield/cmd/hmsl/hmsl_utils.py b/ggshield/cmd/hmsl/hmsl_utils.py index 828a2278ef..22ace7c012 100644 --- a/ggshield/cmd/hmsl/hmsl_utils.py +++ b/ggshield/cmd/hmsl/hmsl_utils.py @@ -14,7 +14,6 @@ def check_secrets( ctx: click.Context, prepared_secrets: PreparedSecrets, - json_output: bool, full_hashes: bool, ): """ @@ -22,8 +21,8 @@ def check_secrets( """ # Query the API display_info("Querying HasMySecretLeaked...") - config = ContextObj.get(ctx).config - client = get_client(config, hmsl_command_path=ctx.command_path) + ctx_obj = ContextObj.get(ctx) + client = get_client(ctx_obj.config, hmsl_command_path=ctx.command_path) found: Iterable[Secret] = [] error: Optional[Exception] = None try: @@ -35,6 +34,8 @@ def check_secrets( ) # Display results and error - show_results(found, prepared_secrets.mapping, json_output, error) + show_results( + found, prepared_secrets.mapping, json_output=ctx_obj.use_json, error=error + ) if error: raise UnexpectedError(str(error)) diff --git a/ggshield/cmd/iac/scan/iac_scan_common_options.py 
b/ggshield/cmd/iac/scan/iac_scan_common_options.py index 5059922d03..0f554f5910 100644 --- a/ggshield/cmd/iac/scan/iac_scan_common_options.py +++ b/ggshield/cmd/iac/scan/iac_scan_common_options.py @@ -21,6 +21,7 @@ ignore_path_option, json_option, minimum_severity_option, + text_json_format_option, ) from ggshield.cmd.utils.context_obj import ContextObj from ggshield.core.client import create_client_from_config @@ -62,6 +63,7 @@ def decorator(cmd: AnyFunction) -> AnyFunction: _ignore_policy_option(cmd) ignore_path_option(cmd) json_option(cmd) + text_json_format_option(cmd) return cmd return decorator diff --git a/ggshield/cmd/quota.py b/ggshield/cmd/quota.py index 7e2efa3601..6eac3f8883 100644 --- a/ggshield/cmd/quota.py +++ b/ggshield/cmd/quota.py @@ -5,7 +5,11 @@ from pygitguardian import GGClient from pygitguardian.models import Detail, Quota, QuotaResponse -from ggshield.cmd.utils.common_options import add_common_options, json_option +from ggshield.cmd.utils.common_options import ( + add_common_options, + json_option, + text_json_format_option, +) from ggshield.cmd.utils.context_obj import ContextObj from ggshield.cmd.utils.quota import format_quota_color from ggshield.core.client import create_client_from_config @@ -13,6 +17,7 @@ @click.command() +@text_json_format_option @json_option @add_common_options() @click.pass_context diff --git a/ggshield/cmd/sca/scan/scan_common_options.py b/ggshield/cmd/sca/scan/scan_common_options.py index 59c03a5393..910e145a0b 100644 --- a/ggshield/cmd/sca/scan/scan_common_options.py +++ b/ggshield/cmd/sca/scan/scan_common_options.py @@ -22,6 +22,7 @@ ignore_path_option, json_option, minimum_severity_option, + text_json_format_option, ) from ggshield.cmd.utils.context_obj import ContextObj from ggshield.core.client import create_client_from_config @@ -51,6 +52,7 @@ def decorator(cmd: AnyFunction) -> AnyFunction: minimum_severity_option(cmd) ignore_path_option(cmd) json_option(cmd) + text_json_format_option(cmd) 
ignore_fixable(cmd) ignore_not_fixable(cmd) return cmd diff --git a/ggshield/cmd/secret/scan/path.py b/ggshield/cmd/secret/scan/path.py index 75f88219b8..c1195b3978 100644 --- a/ggshield/cmd/secret/scan/path.py +++ b/ggshield/cmd/secret/scan/path.py @@ -18,9 +18,7 @@ @click.command() -@click.argument( - "paths", nargs=-1, type=RealPath(exists=True, resolve_path=True), required=True -) +@click.argument("paths", nargs=-1, type=RealPath(exists=True), required=True) @click.option("--recursive", "-r", is_flag=True, help="Scan directory recursively.") @click.option("--yes", "-y", is_flag=True, help="Confirm recursive scan.") @click.option( diff --git a/ggshield/cmd/secret/scan/secret_scan_common_options.py b/ggshield/cmd/secret/scan/secret_scan_common_options.py index 2849530637..63b11cefdf 100644 --- a/ggshield/cmd/secret/scan/secret_scan_common_options.py +++ b/ggshield/cmd/secret/scan/secret_scan_common_options.py @@ -10,14 +10,17 @@ exit_zero_option, get_config_from_context, json_option, + text_json_sarif_format_option, ) from ggshield.cmd.utils.context_obj import ContextObj +from ggshield.cmd.utils.output_format import OutputFormat from ggshield.core.config.user_config import SecretConfig from ggshield.core.filter import init_exclusion_regexes from ggshield.utils.click import RealPath from ggshield.verticals.secret.output import ( SecretJSONOutputHandler, SecretOutputHandler, + SecretSARIFOutputHandler, SecretTextOutputHandler, ) @@ -122,6 +125,7 @@ def add_secret_scan_common_options() -> Callable[[AnyFunction], AnyFunction]: def decorator(cmd: AnyFunction) -> AnyFunction: add_common_options()(cmd) json_option(cmd) + text_json_sarif_format_option(cmd) _output_option(cmd) _show_secrets_option(cmd) exit_zero_option(cmd) @@ -133,13 +137,18 @@ def decorator(cmd: AnyFunction) -> AnyFunction: return decorator +OUTPUT_HANDLER_CLASSES = { + OutputFormat.TEXT: SecretTextOutputHandler, + OutputFormat.JSON: SecretJSONOutputHandler, + OutputFormat.SARIF: 
SecretSARIFOutputHandler, +} + + def create_output_handler(ctx: click.Context) -> SecretOutputHandler: """Read objects defined in ctx.obj and create the appropriate OutputHandler instance""" ctx_obj = ContextObj.get(ctx) - output_handler_cls = ( - SecretJSONOutputHandler if ctx_obj.use_json else SecretTextOutputHandler - ) + output_handler_cls = OUTPUT_HANDLER_CLASSES[ctx_obj.output_format] config = ctx_obj.config return output_handler_cls( show_secrets=config.user_config.secret.show_secrets, diff --git a/ggshield/cmd/status.py b/ggshield/cmd/status.py index 16c23c3559..6bc2b0db2b 100644 --- a/ggshield/cmd/status.py +++ b/ggshield/cmd/status.py @@ -4,7 +4,11 @@ import click from pygitguardian.models import HealthCheckResponse -from ggshield.cmd.utils.common_options import add_common_options, json_option +from ggshield.cmd.utils.common_options import ( + add_common_options, + json_option, + text_json_format_option, +) from ggshield.cmd.utils.context_obj import ContextObj from ggshield.core.client import create_client_from_config from ggshield.core.errors import UnexpectedError @@ -12,6 +16,7 @@ @click.command() +@text_json_format_option @json_option @add_common_options() @click.pass_context diff --git a/ggshield/cmd/utils/common_options.py b/ggshield/cmd/utils/common_options.py index 5fd703f89d..0de7a772e5 100644 --- a/ggshield/cmd/utils/common_options.py +++ b/ggshield/cmd/utils/common_options.py @@ -12,12 +12,13 @@ """ from pathlib import Path -from typing import Any, Callable, Optional, TypeVar +from typing import Any, Callable, List, Optional, TypeVar import click from ggshield.cmd.utils.context_obj import ContextObj from ggshield.cmd.utils.debug_logs import setup_debug_logs +from ggshield.cmd.utils.output_format import OutputFormat from ggshield.core.config.user_config import UserConfig @@ -187,13 +188,52 @@ def decorator(cmd: AnyFunction) -> AnyFunction: return decorator +def _set_json_output_format( + ctx: click.Context, param: click.Parameter, value: 
Optional[bool] +) -> Optional[bool]: + if value: + ctx_obj = ContextObj.get(ctx) + ctx_obj.output_format = OutputFormat.JSON + return value + + json_option = click.option( "--json", "json_output", is_flag=True, default=None, - help="Use JSON output.", - callback=create_ctx_callback("use_json"), + help="Shorthand for `--format json`.", + callback=_set_json_output_format, +) + + +def _set_output_format( + ctx: click.Context, param: click.Parameter, value: Optional[str] +) -> Optional[str]: + if value: + ctx_obj = ContextObj.get(ctx) + ctx_obj.output_format = OutputFormat(value) + return value + + +def _create_format_option( + formats: List[OutputFormat], +) -> Callable[[click.decorators.FC], click.decorators.FC]: + return click.option( + "--format", + type=click.Choice([x.value for x in formats]), + help="Format to use for the output.", + callback=_set_output_format, + ) + + +# If a command only supports text and json formats, it should use this option +text_json_format_option = _create_format_option([OutputFormat.TEXT, OutputFormat.JSON]) + + +# If a command supports text, sarif and json formats, it should use this option +text_json_sarif_format_option = _create_format_option( + [OutputFormat.TEXT, OutputFormat.JSON, OutputFormat.SARIF] ) diff --git a/ggshield/cmd/utils/context_obj.py b/ggshield/cmd/utils/context_obj.py index 0e25ddfeca..4f97d2a3ad 100644 --- a/ggshield/cmd/utils/context_obj.py +++ b/ggshield/cmd/utils/context_obj.py @@ -4,6 +4,7 @@ import click from pygitguardian import GGClient +from ggshield.cmd.utils.output_format import OutputFormat from ggshield.core.cache import Cache from ggshield.core.config import Config from ggshield.core.ui.ggshield_ui import GGShieldUI @@ -42,12 +43,15 @@ def __init__(self): # Set to false by the --no-check-for-updates option self.check_for_updates = True - # Set by the --json option - self.use_json = False + self.output_format = OutputFormat.TEXT # Set by the --output option self.output: Optional[Path] = None + 
@property + def use_json(self) -> bool: + return self.output_format == OutputFormat.JSON + @property def config(self) -> Config: assert self._config diff --git a/ggshield/cmd/utils/output_format.py b/ggshield/cmd/utils/output_format.py new file mode 100644 index 0000000000..295ff34fea --- /dev/null +++ b/ggshield/cmd/utils/output_format.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class OutputFormat(Enum): + """The output format used by the various commands.""" + + TEXT = "text" + JSON = "json" + SARIF = "sarif" diff --git a/ggshield/core/scan/commit.py b/ggshield/core/scan/commit.py index 526b36710c..3c7e6c050c 100644 --- a/ggshield/core/scan/commit.py +++ b/ggshield/core/scan/commit.py @@ -94,12 +94,13 @@ def parser(commit: "Commit") -> Iterable[Scannable]: def from_patch( patch: str, exclusion_regexes: Optional[Set[Pattern[str]]] = None, + sha: str = PATCH_PREFIX, ) -> "Commit": """This one is for tests""" info = CommitInformation.from_patch_header(patch) def parser(commit: "Commit") -> Iterable[Scannable]: - yield from parse_patch(PATCH_PREFIX, patch, exclusion_regexes) + yield from parse_patch(sha, patch, exclusion_regexes) return Commit(sha=None, patch_parser=parser, info=info) diff --git a/ggshield/core/scan/file.py b/ggshield/core/scan/file.py index b0f6e97b8c..2898cf264a 100644 --- a/ggshield/core/scan/file.py +++ b/ggshield/core/scan/file.py @@ -8,6 +8,7 @@ UnexpectedDirectoryError, get_filepaths, is_path_binary, + url_for_path, ) from .scannable import Scannable @@ -22,7 +23,7 @@ def __init__(self, path: Union[str, Path]): @property def url(self) -> str: - return f"file://{self._path.absolute().as_posix()}" + return url_for_path(self._path) @property def filename(self) -> str: diff --git a/ggshield/utils/files.py b/ggshield/utils/files.py index 8b29044f74..c057b6d6e0 100644 --- a/ggshield/utils/files.py +++ b/ggshield/utils/files.py @@ -1,6 +1,7 @@ from enum import Enum, auto -from pathlib import Path, PurePosixPath +from pathlib import Path, PurePath, 
PurePosixPath from typing import List, Pattern, Set, Union +from urllib.parse import quote from ggshield.utils._binary_extensions import BINARY_EXTENSIONS from ggshield.utils.git_shell import ( @@ -92,3 +93,16 @@ def is_path_binary(path: Union[str, Path]) -> bool: ext = Path(path).suffix # `[1:]` because `ext` starts with a "." but extensions in `BINARY_EXTENSIONS` do not return ext[1:] in BINARY_EXTENSIONS + + +def url_for_path(path: PurePath) -> str: + if not path.is_absolute(): + return quote(path.as_posix()) + + # Allow ':'. This is required to represent the Windows drive in a URL. + path_str = quote(path.as_posix(), safe="/:") + if path_str[0] == "/": + return f"file://{path_str}" + else: + # This happens for Windows paths: `path_str` is something like "c:/foo/bar" + return f"file:///{path_str}" diff --git a/ggshield/verticals/secret/output/__init__.py b/ggshield/verticals/secret/output/__init__.py index 1dc53e8fab..45655ad387 100644 --- a/ggshield/verticals/secret/output/__init__.py +++ b/ggshield/verticals/secret/output/__init__.py @@ -1,12 +1,14 @@ from .secret_gitlab_webui_output_handler import SecretGitLabWebUIOutputHandler from .secret_json_output_handler import SecretJSONOutputHandler from .secret_output_handler import SecretOutputHandler +from .secret_sarif_output_handler import SecretSARIFOutputHandler from .secret_text_output_handler import SecretTextOutputHandler __all__ = [ "SecretOutputHandler", "SecretJSONOutputHandler", + "SecretSARIFOutputHandler", "SecretTextOutputHandler", "SecretGitLabWebUIOutputHandler", ] diff --git a/ggshield/verticals/secret/output/secret_sarif_output_handler.py b/ggshield/verticals/secret/output/secret_sarif_output_handler.py new file mode 100644 index 0000000000..5a44c17346 --- /dev/null +++ b/ggshield/verticals/secret/output/secret_sarif_output_handler.py @@ -0,0 +1,139 @@ +import json +from typing import Any, Dict, Iterable, List, cast + +from pygitguardian.client import VERSIONS +from pygitguardian.models import 
PolicyBreak + +from ggshield import __version__ as ggshield_version +from ggshield.core.filter import get_ignore_sha +from ggshield.core.match_span import MatchSpan + +from ..extended_match import ExtendedMatch +from ..secret_scan_collection import Result, SecretScanCollection +from .secret_output_handler import SecretOutputHandler + + +SCHEMA_URL = "https://docs.oasis-open.org/sarif/sarif/v2.1.0/errata01/os/schemas/sarif-schema-2.1.0.json" + + +class SecretSARIFOutputHandler(SecretOutputHandler): + + def _process_scan_impl(self, scan: SecretScanCollection) -> str: + dct = { + "version": "2.1.0", + "$schema": SCHEMA_URL, + "runs": [ + { + "tool": { + "driver": { + "organization": "GitGuardian", + "name": "ggshield", + "informationUri": "https://github.com/GitGuardian/ggshield", + "version": ggshield_version, + }, + "extensions": [ + { + "name": "secret", + "version": VERSIONS.secrets_engine_version, + } + ], + }, + "results": list(_create_sarif_results(scan.get_all_results())), + } + ], + } + return json.dumps(dct) + + +def _create_sarif_results(results: Iterable[Result]) -> Iterable[Dict[str, Any]]: + """ + Creates SARIF result dicts for our Result instances. Creates one SARIF result dict + per policy break. 
+ """ + for result in results: + for policy_break in result.scan.policy_breaks: + yield _create_sarif_result_dict(result.url, policy_break) + + +def _create_sarif_result_dict( + url: str, + policy_break: PolicyBreak, +) -> Dict[str, Any]: + # Prepare message with links to the related location for each match + matches_str = ", ".join( + f"[{m.match_type}]({id})" for id, m in enumerate(policy_break.matches) + ) + matches_li = "\n".join( + f"- [{m.match_type}]({id})" for id, m in enumerate(policy_break.matches) + ) + extended_matches = cast(List[ExtendedMatch], policy_break.matches) + message = f"Secret detected: {policy_break.break_type}.\nMatches: {matches_str}" + markdown_message = ( + f"Secret detected: {policy_break.break_type}\nMatches:\n{matches_li}" + ) + + # Create dict + dct = { + "ruleId": policy_break.break_type, + "level": "error", + "message": { + "text": message, + "markdown": markdown_message, + }, + "locations": [ + _create_location_dict(url, [m.span for m in extended_matches]), + ], + "relatedLocations": [ + _create_related_location_dict(url, id, m) + for id, m in enumerate(extended_matches) + ], + "partialFingerprints": { + "secret/v1": get_ignore_sha(policy_break), + }, + } + if policy_break.incident_url: + dct["hostedViewerUri"] = policy_break.incident_url + return dct + + +def _create_location_dict( + url: str, + match_spans: List[MatchSpan], +) -> Dict[str, Any]: + # Create a span from the start of the first match to the end of the last match + start_pos = min((x.line_index_start, x.column_index_start) for x in match_spans) + end_pos = max((x.line_index_end, x.column_index_end) for x in match_spans) + span = MatchSpan( + line_index_start=start_pos[0], + line_index_end=end_pos[0], + column_index_start=start_pos[1], + column_index_end=end_pos[1], + ) + + return {"physicalLocation": _create_physical_location_dict(url, span)} + + +def _create_related_location_dict( + url: str, + id: int, + match: ExtendedMatch, +) -> Dict[str, Any]: + return { + 
"id": id, + "physicalLocation": _create_physical_location_dict(url, match.span), + "message": {"text": match.match_type}, + } + + +def _create_physical_location_dict(url: str, match_span: MatchSpan) -> Dict[str, Any]: + return { + "artifactLocation": { + "uri": url, + }, + "region": { + "startLine": match_span.line_index_start + 1, + "startColumn": match_span.column_index_start + 1, + "endLine": match_span.line_index_end + 1, + "endColumn": match_span.column_index_end + 1, + }, + } diff --git a/tests/unit/core/scan/test_file.py b/tests/unit/core/scan/test_file.py index 5a45ad8060..58341bad79 100644 --- a/tests/unit/core/scan/test_file.py +++ b/tests/unit/core/scan/test_file.py @@ -190,7 +190,7 @@ def test_file_repr(): """ if is_windows(): str_path = r"c:\Windows" - expected_url = "file://c:/Windows" + expected_url = "file:///c:/Windows" else: str_path = "/usr" expected_url = "file:///usr" diff --git a/tests/unit/utils/test_files.py b/tests/unit/utils/test_files.py index 695298454d..094e4c8d2a 100644 --- a/tests/unit/utils/test_files.py +++ b/tests/unit/utils/test_files.py @@ -2,13 +2,18 @@ import sys import tarfile from io import BytesIO -from pathlib import Path +from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath from typing import Set, Union import pytest from ggshield.core.tar_utils import get_empty_tar -from ggshield.utils.files import ListFilesMode, get_filepaths, is_path_excluded +from ggshield.utils.files import ( + ListFilesMode, + get_filepaths, + is_path_excluded, + url_for_path, +) from tests.repository import Repository @@ -98,3 +103,19 @@ def test_get_filepaths_git_repo(tmp_path: Path): list_files_mode=ListFilesMode.ALL_BUT_GITIGNORED, ) ) == {committed_file, staged_file, unstaged_file, gitignore} + + +@pytest.mark.parametrize( + "path,expected_url", + [ + (PurePosixPath("/simple/path"), "file:///simple/path"), + (PureWindowsPath(r"c:\Windows"), "file:///c:/Windows"), + (PurePosixPath("relative/path"), "relative/path"), + 
(PureWindowsPath(r"relative\win\path"), "relative/win/path"), + (PurePosixPath("/path/with spaces"), "file:///path/with%20spaces"), + (PurePosixPath("/étoile"), "file:///%C3%A9toile"), + ], +) +def test_url_for_path(path: PurePath, expected_url: str): + url = url_for_path(path) + assert url == expected_url diff --git a/tests/unit/verticals/secret/output/test_sarif_output.py b/tests/unit/verticals/secret/output/test_sarif_output.py new file mode 100644 index 0000000000..44f331fd3f --- /dev/null +++ b/tests/unit/verticals/secret/output/test_sarif_output.py @@ -0,0 +1,291 @@ +import json +from typing import Any, Dict, TypedDict +from unittest import mock + +import pytest +from pygitguardian.models import PolicyBreak, ScanResult +from pytest_voluptuous import S +from voluptuous import Optional as VOptional +from voluptuous import validators + +from ggshield.core.scan import Commit +from ggshield.verticals.secret import Result, Results, SecretScanCollection +from ggshield.verticals.secret.output import SecretSARIFOutputHandler +from ggshield.verticals.secret.output.secret_sarif_output_handler import SCHEMA_URL +from tests.unit.conftest import ( + _MULTI_SECRET_ONE_LINE_FULL_PATCH, + _MULTI_SECRET_ONE_LINE_PATCH_SCAN_RESULT, + _MULTIPLE_SECRETS_PATCH, + _MULTIPLE_SECRETS_SCAN_RESULT, + _ONE_LINE_AND_MULTILINE_PATCH, + _ONE_LINE_AND_MULTILINE_PATCH_SCAN_RESULT, +) + + +VERSION_VALIDATOR = validators.Match(r"\d+\.\d+\.\d+") + + +TOOL_SCHEMA = S( + { + "driver": { + "organization": "GitGuardian", + "name": "ggshield", + "informationUri": str, + "version": VERSION_VALIDATOR, + }, + "extensions": [ + { + "name": str, + "version": VERSION_VALIDATOR, + } + ], + } +) + +EMPTY_RESULT_SCHEMA = S( + { + "version": "2.1.0", + "$schema": SCHEMA_URL, + "runs": [{"tool": TOOL_SCHEMA, "results": []}], + } +) + +MIN_1_INT = validators.Range(min=1) + +SARIF_PHYSICAL_LOCATION_DICT_SCHEMA = S( + { + "artifactLocation": { + "uri": str, + }, + "region": { + "startLine": MIN_1_INT, + 
"startColumn": MIN_1_INT, + "endLine": MIN_1_INT, + "endColumn": MIN_1_INT, + }, + } +) + +SARIF_RESULT_DICT_SCHEMA = S( + { + "ruleId": str, + "level": "error", + "message": { + "text": str, + "markdown": str, + }, + "locations": [ + { + "physicalLocation": SARIF_PHYSICAL_LOCATION_DICT_SCHEMA, + } + ], + "relatedLocations": [ + { + "id": int, + "physicalLocation": SARIF_PHYSICAL_LOCATION_DICT_SCHEMA, + "message": {"text": str}, + } + ], + "partialFingerprints": {"secret/v1": str}, + VOptional("hostedViewerUri"): str, + } +) + +SCHEMA_WITH_INCIDENTS = S( + { + "version": "2.1.0", + "$schema": SCHEMA_URL, + "runs": [{"tool": TOOL_SCHEMA, "results": [SARIF_RESULT_DICT_SCHEMA]}], + } +) + + +@pytest.fixture() +def init_secrets_engine_version(): + # Init secrets engine version: it's not set if we don't make an API call + with mock.patch( + "ggshield.verticals.secret.output.secret_sarif_output_handler.VERSIONS" + ) as versions: + versions.secrets_engine_version = "3.14.1" + yield + + +def test_sarif_output_no_secrets(init_secrets_engine_version): + """ + GIVEN an empty scan collection + WHEN SecretSARIFOutputHandler runs on it + THEN it outputs an empty SARIF document + """ + scan = SecretScanCollection(id="path", type="test", results=Results()) + handler = SecretSARIFOutputHandler(verbose=True, show_secrets=False) + output = handler._process_scan_impl(scan) + dct = json.loads(output) + + assert EMPTY_RESULT_SCHEMA == dct + + +@pytest.mark.parametrize( + "patch,scan_result", + [ + pytest.param( + _MULTIPLE_SECRETS_PATCH, + _MULTIPLE_SECRETS_SCAN_RESULT, + id="_MULTIPLE_SECRETS_PATCH", + ), + pytest.param( + _MULTI_SECRET_ONE_LINE_FULL_PATCH, + _MULTI_SECRET_ONE_LINE_PATCH_SCAN_RESULT, + id="_MULTI_SECRET_ONE_LINE_FULL_PATCH", + ), + pytest.param( + _ONE_LINE_AND_MULTILINE_PATCH, + _ONE_LINE_AND_MULTILINE_PATCH_SCAN_RESULT, + id="_ONE_LINE_AND_MULTILINE_PATCH", + ), + ], +) +def test_sarif_output_for_flat_scan_with_secrets( + init_secrets_engine_version, patch: str, 
scan_result: ScanResult +): + """ + GIVEN a patch containing secrets and a scan result + WHEN SecretSARIFOutputHandler runs on it + THEN it outputs a SARIF document pointing to the secrets + """ + handler = SecretSARIFOutputHandler(verbose=True, show_secrets=False) + + commit = Commit.from_patch(patch) + scannable = next(commit.get_files()) + + result = Result(file=scannable, scan=scan_result) + results = Results(results=[result]) + scan = SecretScanCollection(id="path", type="test", results=results) + + output = handler._process_scan_impl(scan) + json_dict = json.loads(output) + + assert SCHEMA_WITH_INCIDENTS == json_dict + + sarif_results = json_dict["runs"][0]["results"] + + # Check each found secret is correctly represented + for sarif_result, policy_break in zip(sarif_results, scan_result.policy_breaks): + check_sarif_result(sarif_result, scannable.content, policy_break) + + assert len(sarif_results) == len(scan_result.policy_breaks) + + +PATCHES_AND_RESULTS = [ + ( + _MULTIPLE_SECRETS_PATCH, + _MULTIPLE_SECRETS_SCAN_RESULT, + ), + ( + _MULTI_SECRET_ONE_LINE_FULL_PATCH, + _MULTI_SECRET_ONE_LINE_PATCH_SCAN_RESULT, + ), + ( + _ONE_LINE_AND_MULTILINE_PATCH, + _ONE_LINE_AND_MULTILINE_PATCH_SCAN_RESULT, + ), +] + + +def test_sarif_output_for_nested_scan(init_secrets_engine_version): + """ + GIVEN scan results for 3 patches containing secrets + WHEN SecretSARIFOutputHandler runs on it + THEN it outputs a SARIF document pointing to the secrets + """ + handler = SecretSARIFOutputHandler(verbose=True, show_secrets=False) + + nested_scans = [] + contents = [] + for idx, (patch, scan_result) in enumerate(PATCHES_AND_RESULTS): + commit = Commit.from_patch(patch, sha=f"abcd{idx}") + scannable = next(commit.get_files()) + contents.append(scannable.content) + + result = Result(file=scannable, scan=scan_result) + results = Results(results=[result]) + scan = SecretScanCollection(id=f"nested{idx}", type="test", results=results) + nested_scans.append(scan) + + scan = 
SecretScanCollection(id="scan", type="test", scans=nested_scans) + + output = handler._process_scan_impl(scan) + json_dict = json.loads(output) + + assert SCHEMA_WITH_INCIDENTS == json_dict + + # Create a flat list of policy breaks + policy_breaks = sum( + (s.results.results[0].scan.policy_breaks for s in scan.scans), [] + ) + + # Check each found secret is correctly represented + sarif_results = json_dict["runs"][0]["results"] + for content, sarif_result, policy_break in zip( + contents, sarif_results, policy_breaks + ): + check_sarif_result(sarif_result, content, policy_break) + + assert len(sarif_results) == len(policy_breaks) + + +def check_sarif_result( + sarif_result: Dict[str, Any], content: str, policy_break: PolicyBreak +): + """Check sarif_result contains a representation of policy_break, applied to content""" + + # Check the secret name + secret_name = sarif_result["ruleId"] + assert secret_name == policy_break.break_type + + # Check the matches point to the right part of the content. `expected_matches` + # and `actual_matches` are dicts of match_name => matched_text. 
+ expected_matches = { + m.match_type: content[m.index_start : m.index_end + 1] + for m in policy_break.matches + } + + actual_matches = {} + for location in sarif_result["relatedLocations"]: + match_name = location["message"]["text"] + region = location["physicalLocation"]["region"] + matched_text = get_content_from_region(content, region) + actual_matches[match_name] = matched_text + + assert actual_matches == expected_matches + + +class RegionDict(TypedDict): + startLine: int + startColumn: int + endLine: int + endColumn: int + + +def get_content_from_region(content: str, region: RegionDict) -> str: + # Convert region values into 0-based indices + # Make end values point *after* the last element + start_line = region["startLine"] - 1 + + # endLine is 1-based but points to the line containing the end, so it does not need + # to be decreased by 1 + end_line = region["endLine"] + + start_column = region["startColumn"] - 1 + + # endColumn is 1-based and points to the character after the match, so it needs to + # be decreased by 1 + end_column = region["endColumn"] - 1 + + lines = content.splitlines()[start_line:end_line] + + # Cut start and end. Do the end first because if we cut the start first then + # `end_column` will be invalid for 1-line regions + lines[-1] = lines[-1][:end_column] + lines[0] = lines[0][start_column:] + + return "\n".join(lines)