Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): inventory search and saved queries from cli #637

Merged
merged 11 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions censys/asm/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,11 @@ def _get_logbook_page(
args = {"cursor": res["nextCursor"]}

yield from res["events"]

def get_workspace_id(self) -> str:
grace-murphy marked this conversation as resolved.
Show resolved Hide resolved
"""Get the workspace ID.

Returns:
str: The workspace ID.
"""
return self._get("/integrations/v1/account")["workspaceId"]
27 changes: 21 additions & 6 deletions censys/asm/inventory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Interact with the Censys Inventory Search API."""
import warnings
from typing import List, Optional

from .api import CensysAsmAPI
Expand All @@ -11,7 +12,7 @@ class InventorySearch(CensysAsmAPI):

def search(
self,
workspaces: List[str],
workspaces: Optional[List[str]] = None,
query: Optional[str] = None,
page_size: Optional[int] = None,
cursor: Optional[str] = None,
Expand All @@ -21,7 +22,7 @@ def search(
"""Search inventory data.

Args:
workspaces (List[str]): List of workspace IDs to search.
workspaces (List[str], optional): List of workspace IDs to search. Deprecated. The workspace associated with `CENSYS-API-KEY` will be used automatically.
query (str, optional): Query string.
page_size (int, optional): Number of results to return. Defaults to 50.
cursor (str, optional): Cursor to start search from.
Expand All @@ -31,17 +32,31 @@ def search(
Returns:
dict: Inventory search results.
"""
if workspaces is None:
workspaces = [self.get_workspace_id()]
else:
warnings.warn(
"The field 'workspaces' is being deprecated. The workspace associated with `CENSYS-API-KEY` will be used automatically.",
category=DeprecationWarning,
stacklevel=2,
)
if page_size is None:
page_size = 50

args = {
"workspaces": workspaces,
"query": query,
"pageSize": page_size,
"cursor": cursor,
"sort": sort,
"fields": fields,
}

if query:
args["query"] = query
if cursor:
args["cursor"] = cursor
if sort:
args["sort"] = sort
if fields:
args["fields"] = fields

return self._get(self.base_path, args=args)

def aggregate(
Expand Down
192 changes: 192 additions & 0 deletions censys/cli/commands/asm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from rich.progress import Progress, TaskID
from rich.prompt import Confirm, Prompt

from censys.asm.api import CensysAsmAPI
from censys.asm.inventory import InventorySearch
from censys.asm.saved_queries import SavedQueries
from censys.asm.seeds import SEED_TYPES, Seeds
from censys.cli.utils import console
Expand Down Expand Up @@ -40,6 +42,14 @@ def cli_asm_config(_: argparse.Namespace): # pragma: no cover

api_key = Prompt.ask(api_key_prompt, console=console) or api_key

try:
c = CensysAsmAPI(api_key)
w = c.get_workspace_id()
console.print(f"Successfully authenticated to workspace {w}")
except CensysUnauthorizedException:
console.print("Failed to authenticate")
sys.exit(1)

if not api_key:
console.print("Please enter valid credentials")
sys.exit(1)
Expand Down Expand Up @@ -530,6 +540,75 @@ def cli_delete_saved_query_by_id(args: argparse.Namespace):
sys.exit(1)


def cli_execute_saved_query_by_name(args: argparse.Namespace):
"""Execute saved query by name subcommand.

Args:
args (Namespace): Argparse Namespace.
"""
s = InventorySearch(args.api_key)
q = SavedQueries(args.api_key)
# get query from name
queries = q.get_saved_queries(args.query_name, 1, 1)
results = queries.get("results")
if not results:
console.print("No saved query found with that name.")
sys.exit(1)
query = results[0]["query"]

try:
res = s.search(None, query, args.page_size, None, args.sort, args.fields)
console.print_json(json.dumps(res))
thehappydinoa marked this conversation as resolved.
Show resolved Hide resolved
except CensysAsmException:
console.print("Failed to execute saved query.")
sys.exit(1)


def cli_execute_saved_query_by_id(args: argparse.Namespace):
"""Execute saved query by id subcommand.

Args:
args (Namespace): Argparse Namespace.
"""
s = InventorySearch(args.api_key)
q = SavedQueries(args.api_key)
try:
query_json = q.get_saved_query_by_id(args.query_id)
query = query_json["result"]["query"]
except (KeyError, CensysAsmException):
console.print("No saved query found with that ID.")
sys.exit(1)
try:
res = s.search(None, query, args.page_size, None, args.sort, args.fields)
console.print_json(json.dumps(res))
except CensysAsmException:
console.print("Failed to execute saved query.")
sys.exit(1)


def cli_search(args: argparse.Namespace):
"""Inventory search subcommand.

Args:
args (Namespace): Argparse Namespace.
"""
s = InventorySearch(args.api_key)

try:
res = s.search(
args.workspaces,
args.query,
args.page_size,
args.cursor,
args.sort,
args.fields,
)
console.print_json(json.dumps(res))
except CensysAsmException:
console.print("Failed to execute query.")
sys.exit(1)


def include(parent_parser: argparse._SubParsersAction, parents: dict):
"""Include this subcommand into the parent parser.

Expand Down Expand Up @@ -775,3 +854,116 @@ def add_verbose(parser):
)
add_verbose(delete_saved_query_by_id_parser)
delete_saved_query_by_id_parser.set_defaults(func=cli_delete_saved_query_by_id)

execute_saved_query_by_name_parser = asm_subparser.add_parser(
"execute-saved-query-by-name",
description="Execute a saved query by name in inventory search",
help="execute saved query by name",
parents=[parents["asm_auth"]],
)
execute_saved_query_by_name_parser.add_argument(
"--query-name",
help="Query name",
type=str,
required=True,
)
execute_saved_query_by_name_parser.add_argument(
"--page-size",
help="Number of results to return. Defaults to 50.",
type=int,
default=50,
)
execute_saved_query_by_name_parser.add_argument(
"--sort",
help="Sort order for results",
type=List[str],
default=[],
)
execute_saved_query_by_name_parser.add_argument(
"--fields",
help="Fields to include in results",
type=List[str],
default=[],
)
add_verbose(execute_saved_query_by_name_parser)
execute_saved_query_by_name_parser.set_defaults(
func=cli_execute_saved_query_by_name
)

execute_saved_query_by_id_parser = asm_subparser.add_parser(
"execute-saved-query-by-id",
description="Execute a saved query by id in inventory search",
help="execute saved query by id",
parents=[parents["asm_auth"]],
)
execute_saved_query_by_id_parser.add_argument(
"--query-id",
help="Query ID",
type=str,
required=True,
)
execute_saved_query_by_id_parser.add_argument(
"--page-size",
help="Number of results to return. Defaults to 50.",
type=int,
default=50,
)
execute_saved_query_by_id_parser.add_argument(
"--sort",
help="Sort order for results",
type=List[str],
default=[],
)
execute_saved_query_by_id_parser.add_argument(
"--fields",
help="Fields to include in results",
type=List[str],
default=[],
)
add_verbose(execute_saved_query_by_id_parser)
execute_saved_query_by_id_parser.set_defaults(func=cli_execute_saved_query_by_id)

search_parser = asm_subparser.add_parser(
"search",
description="Execute a query in inventory search",
help="execute query in inventory search",
parents=[parents["asm_auth"]],
)
search_parser.add_argument(
"--query",
help="Query string",
type=str,
required=True,
)
search_parser.add_argument(
"--page-size",
help="Number of results to return. Defaults to 50.",
type=int,
default=50,
)
search_parser.add_argument(
"--cursor",
help="Cursor to use for pagination",
type=str,
default="",
)
search_parser.add_argument(
"--sort",
help="Sort order for results",
type=List[str],
default=[],
)
search_parser.add_argument(
"--fields",
help="Fields to include in results",
type=List[str],
default=[],
)
search_parser.add_argument(
"--workspaces",
help="Workspace IDs to search. Deprecated. The workspace associated with `CENSYS-API-KEY` will be used automatically.",
type=str,
required=False,
)
add_verbose(search_parser)
search_parser.set_defaults(func=cli_search)
2 changes: 2 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store", "**tests**"]

suppress_warnings = ["autosectionlabel.usage-cli"]


# -- Options for HTML output -------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
sphinx==7.2.6
sphinxcontrib-autoprogram==0.1.8
sphinxcontrib-autoprogram==0.1.9
sphinx-rtd-theme==1.3.0
sphinx-prompt==1.8.0
sphinx-tabs==3.4.1
Expand Down
33 changes: 33 additions & 0 deletions docs/usage-cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,36 @@ Below we show an example of deleting a saved query by ID from the CLI.
.. prompt:: bash

censys asm delete-saved-query-by-id --query-id 'Some ID'

``execute-saved-query-by-name``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

See CLI command :ref:`asm execute-saved-query-by-name<cli:censys asm execute-saved-query-by-name>` for detail documentation of parameters.

Below we show an example of executing a saved query by name from the CLI.

.. prompt:: bash

censys asm execute-saved-query-by-name --query-name 'Some query name'

``execute-saved-query-by-id``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

See CLI command :ref:`asm execute-saved-query-by-id<cli:censys asm execute-saved-query-by-id>` for detail documentation of parameters.

Below we show an example of executing a saved query by ID from the CLI.

.. prompt:: bash

censys asm execute-saved-query-by-id --query-id 'Some query ID'

``search``
^^^^^^^^^^

See CLI command :ref:`asm search<cli:censys asm search>` for detail documentation of parameters.

Below we show an example of executing an inventory search query from the CLI.

.. prompt:: bash

censys asm search --query 'Some query'
11 changes: 11 additions & 0 deletions examples/asm/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Execute a saved query by ID."""
from censys.asm import InventorySearch

# Create an instance of the SavedQueries class
inventory_search = InventorySearch()

# Delete the saved query by ID
query = "host.services.service_name: HTTP"
res = inventory_search.search(query=query)

print(res)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "censys"
version = "2.2.12"
version = "2.2.13"
description = "An easy-to-use and lightweight API wrapper for Censys APIs (censys.io)."
authors = ["Censys, Inc. <support@censys.io>"]
license = "Apache-2.0"
Expand Down
14 changes: 14 additions & 0 deletions tests/asm/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,17 @@ def keyword_callback(request):
res = list(self.api._get_page(f"/{keyword}"))
# Assertions
assert res == page_json[keyword] + second_page

def test_get_workspace_id(self):
self.responses.add(
responses.GET,
f"{self.base_url}/integrations/v1/account",
status=200,
json={"workspaceId": "test-workspace-id"},
)

# Actual call
res = self.api.get_workspace_id()

# Assertions
assert res == "test-workspace-id"
Loading