+import os
+
+import requests
+
+from cirun.utils import _print_error, _print_error_data
+
+API_ENDPOINT = "https://api.cirun.io/api/v1"
+GITHUB_API = "https://api.github.com"
+GH_TOKEN_ENV_VAR = "GITHUB_TOKEN"
+
+
+class CirunAPIException(Exception):
+ pass
+
+
+
+
[docs]
+
class Cirun:
+
"""Cirun Client to interact to cirun's API"""
+
def __init__(self, token=None):
+
"""
+
:param token: cirun's API client token
+
"""
+
self.token = token
+
self._get_credentials()
+
self.api_endpoint = os.environ.get('CIRUN_API_ENDPOINT', API_ENDPOINT)
+
+
def _get_credentials(self):
+
if not self.token:
+
try:
+
token = os.environ['CIRUN_API_KEY']
+
self.token = token
+
except KeyError:
+
msg = "Could not find CIRUN_API_KEY in environment variables"
+
_print_error_data(msg)
+
raise KeyError(msg)
+
+
def _headers(self):
+
return {
+
"Content-Type": "application/json",
+
"Authorization": f"Bearer {self.token}"
+
}
+
+
def _get(self, path, *args, **kwargs):
+
return requests.get(f"{self.api_endpoint}/{path}", headers=self._headers(), *args, **kwargs)
+
+
def _post(self, path, *args, **kwargs):
+
return requests.post(f"{self.api_endpoint}/{path}", headers=self._headers(), *args, **kwargs)
+
+
def _put(self, path, *args, **kwargs):
+
return requests.put(f"{self.api_endpoint}/{path}", headers=self._headers(), *args, **kwargs)
+
+
+
[docs]
+
def get_repos(self, print_error=False):
+
"""Get all the repositories connected to cirun."""
+
response = self._get("repo")
+
if response.status_code not in [200, 201]:
+
if print_error:
+
return _print_error(response)
+
return response.json()
+
+
+
+
[docs]
+
def set_repo(
+
self,
+
name,
+
active=True,
+
print_error=False,
+
installation_id=None
+
):
+
"""
+
Activate repository for Cirun
+
+
Parameters
+
----------
+
name: str
+
Repository name
+
active: bool
+
``True`` to activate, ``False`` otherwise. Default is ``True``
+
installation_id: int
+
Cirun App's Installation ID for the Organization
+
+
Returns
+
-------
+
dict
+
"""
+
data = {
+
"repository": name,
+
"active": active
+
}
+
gh_response_json = {}
+
if installation_id:
+
gh_response_json = self.install_github_app(name, installation_id)
+
response = self._post("repo", json=data)
+
if response.status_code not in [200, 201]:
+
if print_error:
+
_print_error(response)
+
response.raise_for_status()
+
response = response.json()
+
if gh_response_json:
+
response = {
+
**response,
+
"github_installation": gh_response_json
+
}
+
return response
+
+
+
def _get_github_repo_id(self, owner, repo):
+
url = f"{GITHUB_API}/repos/{owner}/{repo}"
+
response = requests.get(url)
+
response.raise_for_status()
+
response_json = response.json()
+
return response_json["id"]
+
+
def install_github_app(self, name, installation_id):
+
owner, repo = name.split("/")
+
repository_id = self._get_github_repo_id(owner=owner, repo=repo)
+
url = f"{GITHUB_API}/user/installations/{installation_id}/repositories/{repository_id}"
+
if not os.environ.get(GH_TOKEN_ENV_VAR):
+
_print_error_data(f"ERROR: Environment variable: '{GH_TOKEN_ENV_VAR}'"
+
f" not found. Unable to install Cirun GitHub App on {name}")
+
return
+
gh_token = os.environ[GH_TOKEN_ENV_VAR]
+
headers = {
+
"Authorization": f"Bearer {gh_token}",
+
"Accept": "application/vnd.github+json",
+
}
+
response = requests.put(url, headers=headers)
+
if response.status_code not in [204, 304]:
+
_print_error(response)
+
response.raise_for_status()
+
response = {
+
"message": f"GitHub Installation done",
+
"status_code": response.status_code
+
}
+
return response
+
+
def update_access_control(self, org, repository_resource_access):
+
json = {
+
"org": org,
+
"repository_resource_access": repository_resource_access
+
}
+
response = self._put("access-control", json=json)
+
if response.status_code not in [200, 201]:
+
_print_error(response)
+
response.raise_for_status()
+
return response
+
+
def get_access_control(self, org):
+
response = self._get("access-control", json={"org": org})
+
if response.status_code != 200:
+
return
+
return response.json()
+
+
def _create_access_control_repo_resource_data(
+
self, repo,
+
resources,
+
action="add",
+
teams=None,
+
roles=None,
+
users=None,
+
users_from_json=None,
+
policy_args=None,
+
):
+
repository_resource_access = {
+
"repository": repo,
+
"resources": resources,
+
"action": action,
+
"policy_args": policy_args
+
}
+
repository_resource_access = {
+
**repository_resource_access,
+
"teams": teams,
+
"users": users,
+
"roles": roles,
+
"users_from_json": users_from_json,
+
}
+
return repository_resource_access
+
+
+
[docs]
+
def remove_repo_from_resources(self, org, repo, resources):
+
"""
+
Removes the access to the resource for the repository.
+
+
Parameters
+
----------
+
org: str
+
GitHub Organization
+
repo: str
+
GitHub Repository
+
resources: List[str]
+
List of resources
+
+
Returns
+
-------
+
requests.Response
+
"""
+
repository_resource_access = self._create_access_control_repo_resource_data(
+
repo, resources, action="remove",
+
)
+
return self.update_access_control(org, [repository_resource_access])
+
+
+
+
[docs]
+
def add_repo_to_resources(
+
self,
+
org,
+
repo,
+
resources,
+
teams=None,
+
roles=None,
+
users=None,
+
users_from_json=None,
+
policy_args=None,
+
):
+
"""
+
Grants access to the resource for the repository
+
+
Parameters
+
----------
+
org: str
+
GitHub Organization
+
repo: str
+
GitHub Repository
+
resources: List[str]
+
List of resources
+
teams: List[str]
+
List of teams
+
roles: List[str]
+
List of roles
+
users: List[str]
+
List of users
+
users_from_json: List[str]
+
List of users from a json url
+
policy_args: Optional[Dict[str, Any]]
+
Policy arguments, this is a dictionary of key values, currently the only
+
supported argument is ``{"pull_request": True}`` or ``{"pull_request": False}``
+
+
Returns
+
-------
+
requests.Response
+
"""
+
repository_resource_access = self._create_access_control_repo_resource_data(
+
repo, resources, action="add", teams=teams, roles=roles,
+
users=users, users_from_json=users_from_json, policy_args=policy_args
+
)
+
return self.update_access_control(org, [repository_resource_access])
+
+
+
def _get_repo_policy(self, access_yml, repo):
+
for policy in access_yml["policies"]:
+
if policy['repo'] == repo:
+
return policy['id']
+
+
def get_repo_resources(self, org, repo):
+
access_control = self.get_access_control(org)
+
if not access_control:
+
return
+
access_yml = access_control["access_yml"]
+
policy_id = self._get_repo_policy(access_yml, repo)
+
repo_resources = []
+
for access_item in access_yml["access_control"]:
+
if policy_id in access_item["policies"]:
+
repo_resources.append(access_item["resource"])
+
return repo_resources
+
+
+
[docs]
+
def clouds(self, print_error=False):
+
"""Returns all the connected cloud"""
+
response = self._get("cloud-connect")
+
if response.status_code not in [200, 201]:
+
if print_error:
+
return _print_error(response)
+
return response.json()
+
+
+
+
[docs]
+
def cloud_connect(self, name, credentials, print_error=False):
+
"""
+
Connect a cloud provider to cirun.
+
+
Parameters
+
----------
+
name: str
+
Name of cloud provider
+
credentials: str
+
Cloud Credentials
+
+
Returns
+
-------
+
dict:
+
Response json
+
"""
+
+
data = {
+
"cloud": name,
+
"credentials": credentials
+
}
+
response = self._post("cloud-connect", json=data)
+
if response.status_code not in [200, 201]:
+
if print_error:
+
_print_error(response)
+
return response.json()
+
return response.json()
+
+
+
+