-
Notifications
You must be signed in to change notification settings - Fork 65
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,35 @@ | ||
import os | ||
import subprocess | ||
import time | ||
import click | ||
from pathlib import Path | ||
from urllib.parse import urlparse | ||
from slugify import slugify | ||
from ctfcli.utils.config import generate_session | ||
|
||
from ctfcli.utils.images import build_image, export_image, get_exposed_ports | ||
from ctfcli.utils.images import ( | ||
build_image, | ||
export_image, | ||
get_exposed_ports, | ||
push_image, | ||
login_registry, | ||
) | ||
|
||
|
||
def ssh(challenge, host): | ||
def format_connection_info(protocol, hostname, tcp_hostname, tcp_port): | ||
if protocol is None: | ||
connection_info = hostname | ||
elif protocol.startswith("http"): | ||
connection_info = f"{protocol}://{hostname}" | ||
elif protocol == "tcp": | ||
connection_info = f"nc {tcp_hostname} {tcp_port}" | ||
else: | ||
connection_info = hostname | ||
|
||
return connection_info | ||
|
||
|
||
def ssh(challenge, host, protocol): | ||
# Build image | ||
image_name = build_image(challenge=challenge) | ||
print(f"Built {image_name}") | ||
|
@@ -39,17 +62,111 @@ def ssh(challenge, host): | |
os.remove(image_path) | ||
print(f"Cleaned up {image_path}") | ||
|
||
return True, domain, exposed_port | ||
status = True | ||
domain = domain | ||
port = exposed_port | ||
connect_info = format_connection_info( | ||
protocol=protocol, hostname=domain, tcp_hostname=domain, tcp_port=port, | ||
) | ||
return status, domain, port, connect_info | ||
|
||
|
||
def registry(challenge, host): | ||
def registry(challenge, host, protocol): | ||
# Build image | ||
image_name = build_image(challenge=challenge) | ||
print(f"Built {image_name}") | ||
url = urlparse(host) | ||
tag = f"{url.netloc}{url.path}" | ||
subprocess.call(["docker", "tag", image_name, tag]) | ||
subprocess.call(["docker", "push", tag]) | ||
push_image(local_tag=image_name, location=tag) | ||
status = True | ||
domain = "" | ||
port = "" | ||
connect_info = format_connection_info( | ||
protocol=protocol, hostname=domain, tcp_hostname=domain, tcp_port=port, | ||
) | ||
return status, domain, port, connect_info | ||
|
||
|
||
def cloud(challenge, host, protocol): | ||
name = challenge["name"] | ||
slug = slugify(name) | ||
This comment has been minimized.
Sorry, something went wrong.
pl4nty
Contributor
|
||
|
||
s = generate_session() | ||
# Detect whether we have the appropriate endpoints | ||
check = s.get(f"/api/v1/images", json=True) | ||
if check.ok is False: | ||
click.secho( | ||
f"Target instance does not have deployment endpoints", fg="red", | ||
) | ||
return False, domain, port, connect_info | ||
This comment has been minimized.
Sorry, something went wrong.
pl4nty
Contributor
|
||
|
||
# Try to find an appropriate image. | ||
images = s.get(f"/api/v1/images", json=True).json()["data"] | ||
image = None | ||
for i in images: | ||
if i["location"].endswith(f"/{slug}"): | ||
image = i | ||
break | ||
else: | ||
# Create the image if we did not find it. | ||
image = s.post(f"/api/v1/images", json={"name": slug}).json()["data"] | ||
|
||
# Build image | ||
image_name = build_image(challenge=challenge) | ||
location = image["location"] | ||
|
||
# TODO: Authenticate to Registry | ||
|
||
# Push image | ||
push_image(image_name, location) | ||
|
||
# Look for existing service | ||
services = s.get(f"/api/v1/services", json=True).json()["data"] | ||
service = None | ||
for srv in services: | ||
if srv["name"] == slug: | ||
service = srv | ||
# Update the service | ||
s.patch( | ||
f"/api/v1/services/{service['id']}", json={"image": location} | ||
).raise_for_status() | ||
service = s.get(f"/api/v1/services/{service['id']}", json=True).json()[ | ||
"data" | ||
] | ||
break | ||
else: | ||
# Could not find the service. Create it using our pushed image. | ||
# Deploy the image by creating service | ||
service = s.post( | ||
f"/api/v1/services", json={"name": slug, "image": location,} | ||
).json()["data"] | ||
|
||
# Get connection details | ||
service_id = service["id"] | ||
service = s.get(f"/api/v1/services/{service_id}", json=True).json()["data"] | ||
|
||
while service["hostname"] is None: | ||
click.secho( | ||
f"Waiting for challenge hostname", fg="yellow", | ||
) | ||
service = s.get(f"/api/v1/services/{service_id}", json=True).json()["data"] | ||
time.sleep(10) | ||
|
||
# Expose port if we are using tcp | ||
if protocol == "tcp": | ||
service = s.patch(f"/api/v1/services/{service['id']}", json={"expose": True}) | ||
service.raise_for_status() | ||
service = s.get(f"/api/v1/services/{service_id}", json=True).json()["data"] | ||
|
||
status = True | ||
domain = "" | ||
port = "" | ||
connect_info = format_connection_info( | ||
protocol=protocol, | ||
hostname=service["hostname"], | ||
tcp_hostname=service["tcp_hostname"], | ||
tcp_port=service["tcp_port"], | ||
) | ||
return status, domain, port, connect_info | ||
|
||
|
||
DEPLOY_HANDLERS = {"ssh": ssh, "registry": registry} | ||
DEPLOY_HANDLERS = {"ssh": ssh, "registry": registry, "cloud": cloud} |
@ColdHeat I can't have
bool(target_host) is False
without triggeringinput("Target host URI: ")
, so I have to useecho '' | ctf challenge deploy chal
in my deploy pipeline