Skip to content

Commit

Permalink
Merge pull request #55 from SimHeb/main
Browse files Browse the repository at this point in the history
added folder lookup
  • Loading branch information
tylerezimmerman authored Jul 3, 2024
2 parents 51e4005 + 6cdbdd2 commit 4056e1c
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ export TSS_TENANT=mytenant
export TSS_SECRET_ID=42
export TSS_SECRET_PATH=\Test Secrets\SecretName
export TSS_FOLDER_ID=1
export TSS_FOLDER_PATH=\Test Secrets
```

The tests assume that the user associated with the specified `TSS_USERNAME` and `TSS_PASSWORD` can read the secret to be fetched, and that the Secret itself contains `username` and `password` fields.
Expand Down
1 change: 1 addition & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def env_vars():
"secret_id": os.getenv("TSS_SECRET_ID"),
"secret_path": os.getenv("TSS_SECRET_PATH"),
"folder_id": os.getenv("TSS_FOLDER_ID"),
"folder_path": os.getenv("TSS_FOLDER_PATH")
}


Expand Down
190 changes: 190 additions & 0 deletions delinea/secrets/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,57 @@ def __init__(self, **kwargs):
}


@dataclass
class ServerFolder:
# Based on https://gist.github.com/jaytaylor/3660565
@staticmethod
def snake_case(camel_cased):
"""Transform to snake case
Transforms the keys of the given map from camelCase to snake_case.
"""
return [
(
re.compile("([a-z0-9])([A-Z])")
.sub(r"\1_\2", re.compile(r"(.)([A-Z][a-z]+)").sub(r"\1_\2", k))
.lower(),
v,
)
for (k, v) in camel_cased.items()
]

@dataclass
class Field:
item_id: int
value: str
slug: str

def __init__(self, **kwargs):
# The REST API returns attributes with camelCase names which we
# replace with snake_case per Python conventions
for k, v in ServerSecret.snake_case(kwargs):
if k == "item_value":
k = "value"
setattr(self, k, v)

id: int
folder_name: str
folder_path: str
parent_folder_id: int
folder_type_id: int
secret_policy_id: int
inherit_secret_policy: bool
inherit_permissions: bool
child_folders: list
secret_templates: list

def __init__(self, **kwargs):
# The REST API returns attributes with camelCase names which we replace
# with snake_case per Python conventions
for k, v in self.snake_case(kwargs):
setattr(self, k, v)


class SecretServerError(Exception):
"""An Exception that includes a message and the server response"""

Expand Down Expand Up @@ -311,6 +362,36 @@ def get_secret_json(self, id, query_params=None):
timeout=60)
).text

def get_folder_json(self, id, query_params=None, get_all_children=True):
"""Gets a Folder from Secret Server
:param id: the id of the folder
:type id: int
:param query_params: query parameters to pass to the endpoint
:type query_params: dict
:return: a JSON formatted string representation of the folder
:rtype: ``str``
:raise: :class:`SecretServerAccessError` when the caller does not have
permission to access the folder
:raise: :class:`SecretServerError` when the REST API call fails for
any other reason
"""
endpoint_url = f"{self.api_url}/folders/{id}"

if get_all_children:
query_params["getAllChildren"] = "true"

if query_params is None:
return self.process(requests.get(endpoint_url, headers=self.headers())).text
else:
return self.process(
requests.get(
endpoint_url,
params=query_params,
headers=self.headers(),
)
).text

def get_secret(self, id, fetch_file_attachments=True, query_params=None):
"""Gets a secret
Expand Down Expand Up @@ -355,6 +436,34 @@ def get_secret(self, id, fetch_file_attachments=True, query_params=None):
)
return secret

def get_folder(self, id, query_params=None, get_all_children=False):
"""Gets a folder
:param id: the id of the folder
:type id: int
:param getAllChildren: Whether to retrieve all child folders of the requested folder
:type fetch_file_attachments: bool
:param query_params: query parameters to pass to the endpoint
:type query_params: dict
:return: a ``dict`` representation of the folder
:rtype: ``dict``
:raise: :class:`SecretServerAccessError` when the caller does not have
permission to access the folder
:raise: :class:`SecretServerError` when the REST API call fails for
any other reason
"""

response = self.get_folder_json(
id, query_params=query_params, get_all_children=get_all_children
)

try:
folder = json.loads(response)
except json.JSONDecodeError:
raise SecretServerError(response)

return folder

def get_secret_by_path(self, secret_path, fetch_file_attachments=True):
"""Gets a secret by path
Expand All @@ -376,6 +485,23 @@ def get_secret_by_path(self, secret_path, fetch_file_attachments=True):
query_params=params,
)

def get_folder_by_path(self, folder_path, get_all_children=True):
"""Gets a folder by path
:param folder_path: full path of the folder
:type folder_path: str
:return: a ``dict`` representation of the folder
:rtype: ``dict``
"""
path = "\\" + re.sub(r"[\\/]+", r"\\", folder_path).lstrip("\\").rstrip("\\")

params = {"folderPath": path}
return self.get_folder(
id=0,
get_all_children=get_all_children,
query_params=params,
)

def search_secrets(self, query_params=None):
"""Get Secrets from Secret Server
Expand All @@ -401,6 +527,31 @@ def search_secrets(self, query_params=None):
timeout=60)
).text

def lookup_folders(self, query_params=None):
"""Lookup Folders from Secret Server
:param query_params: query parameters to pass to the endpoint
:type query_params: dict
:return: a JSON formatted string representation of the folders, containing only id and name
:rtype: ``str``
:raise: :class:`SecretServerAccessError` when the caller does not have
permission to access the secret
:raise: :class:`SecretServerError` when the REST API call fails for
any other reason
"""
endpoint_url = f"{self.api_url}/folders/lookup"

if query_params is None:
return self.process(requests.get(endpoint_url, headers=self.headers())).text
else:
return self.process(
requests.get(
endpoint_url,
params=query_params,
headers=self.headers(),
)
).text

def get_secret_ids_by_folderid(self, folder_id):
"""Gets a list of secrets ids by folder_id
Expand Down Expand Up @@ -432,6 +583,45 @@ def get_secret_ids_by_folderid(self, folder_id):

return secret_ids

def get_child_folder_ids_by_folderid(self, folder_id):
"""Gets a list of child folder ids by folder_id
:param folder_id: the id of the folder
:type id: int
:return: a ``list`` of the child folder id's
:rtype: ``list``
:raise: :class:`SecretServerAccessError` when the caller does not have
permission to access the secret
:raise: :class:`SecretServerError` when the REST API call fails for
any other reason
"""

params = {
"filter.parentFolderId": folder_id,
"filter.limitToDirectDescendents": True,
}
params["take"] = 1
endpoint_url = f"{self.api_url}/folders/lookup"

params["take"] = self.process(
requests.get(endpoint_url, params=params, headers=self.headers())
).json()["total"]
# Handle result of zero child folders
if params["take"] != 0:
response = self.lookup_folders(query_params=params)

try:
response = json.loads(response)
except json.JSONDecodeError:
raise SecretServerError(response)

child_folder_ids = []
for childFolder in response["records"]:
child_folder_ids.append(childFolder["id"])

return child_folder_ids
else:
return []


class SecretServerV0(SecretServer):
"""A class that uses an *OAuth2 Bearer Token* to access the Secret Server
Expand Down
20 changes: 19 additions & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
SecretServerClientError,
SecretServerError,
ServerSecret,
ServerFolder,
)


Expand All @@ -31,7 +32,6 @@ def test_api_url(secret_server, env_vars):
== f"https://{env_vars['tenant']}.secretservercloud.com/api/v1"
)


def test_access_token_authorizer(env_vars, authorizer):
assert SecretServer(
f"https://{env_vars['tenant']}.secretservercloud.com/",
Expand All @@ -51,10 +51,28 @@ def test_server_secret_by_path(env_vars, secret_server):
).id == int(env_vars["secret_id"])


def test_server_folder_by_path(env_vars, secret_server):
assert ServerFolder(
**secret_server.get_folder_by_path(env_vars["folder_path"])
).id == int(env_vars["folder_id"])


def test_nonexistent_secret(secret_server):
with pytest.raises(SecretServerClientError):
secret_server.get_secret(1000)


def test_nonexistent_folder(secret_server):
with pytest.raises(SecretServerClientError):
secret_server.get_folder(1000)


def test_server_secret_ids_by_folderid(env_vars, secret_server):
assert type(secret_server.get_secret_ids_by_folderid(env_vars["folder_id"])) is list


def test_server_child_folder_ids_by_folderid(env_vars, secret_server):
assert (
type(secret_server.get_child_folder_ids_by_folderid(env_vars["folder_id"]))
is list
)

0 comments on commit 4056e1c

Please sign in to comment.