Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend-python): add basic telestion library for python #423

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend-python/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.venv/
21 changes: 21 additions & 0 deletions backend-python/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 WüSpace e. V.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
8 changes: 8 additions & 0 deletions backend-python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Telestion Python Backend

This repository contains everything to build a working Telestion backend in Python.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This repository contains everything to build a working Telestion backend in Python.
This repository contains everything to build Telestion backend services in Python.

Additional examples help in setting everything up.

The Python backend is mainly added to allow for easier interfacing with many scientific libraries, such as numpy,
scipy, tensorflow or pytorch.
Creating static graphs with Matplotlib that can be rendered in the frontend could also be created.
Empty file.
2 changes: 2 additions & 0 deletions backend-python/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
nats-py~=2.7.2 # TODO: decide if we want to specify versions
pydantic~=2.6.4
Empty file.
Empty file.
Empty file.
110 changes: 110 additions & 0 deletions backend-python/src/telestion/backend/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import argparse
import json
import os
from pathlib import Path
from typing import Any, TypeVar

from pydantic import BaseModel, Field, ConfigDict


class TelestionConfig(BaseModel):
dev: bool = False
nats_url: str = Field(alias="NATS_URL")
nats_user: str | None = Field(alias="NATS_USER", default=None)
nats_password: str | None = Field(alias="NATS_PASSWORD", default=None)
config_file: Path | None = Field(alias="CONFIG_FILE", default=None)
config_key: str | None = Field(alias="CONFIG_KEY", default=None)
service_name: str = Field(alias="SERVICE_NAME")
data_dir: Path = Field(alias="DATA_DIR")

unparsed_cli: list[str] = Field(alias="_telestion_validator_unparsed_cli")

# To include all envs and config parts -> it is recommended to add a custom subtype
model_config = ConfigDict(
extra='allow'
)


# With this we allow users to extend TelestionConfig for finer control over custom config fields
_TelestionConfigT = TypeVar("_TelestionConfigT", bound=TelestionConfig)


def build_config(clazz: type[_TelestionConfigT] = None) -> _TelestionConfigT:
if clazz is None:
clazz = TelestionConfig
Comment on lines +32 to +34
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why default to None instead of TelestionConfig if you re-set it immediately afterwards?


cli_args, additional_args = _parse_cli()

def _from_env_or_cli(key: str):
return cli_args.get(key, os.environ.get(key, None))

config_p = _from_env_or_cli('CONFIG_FILE')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why call it config_p instead of config_file (or, if you must, config_path)?

config_key = _from_env_or_cli('CONFIG_KEY')

config_assembly: dict[str, Any] = dict()
if 'dev' in cli_args and cli_args['dev']:
# 1. Add default config
config_assembly.update(defaults())

if config_p is not None:
config_p = Path(config_p)
# 2. Insert config file
config_assembly.update(_parse_config_file(config_p, config_key))

# 3. Add Environment Variables
config_assembly.update(os.environ)

# 4. Add CLI args
config_assembly.update(cli_args)

# 5. Add args that cannot be parsed by the pipeline, i.e. service specific config
config_assembly['_telestion_validator_unparsed_cli'] = additional_args

return clazz(**config_assembly)


def defaults() -> dict[str, Any]:
return {
'NATS_URL': "nats://localhost:4222",
'SERVICE_NAME': f"dev-{os.getpid()}",
'DATA_DIR': Path("./data")
}


def _parse_cli() -> tuple[dict[str, Any], list[str]]:
description = "CLI Interface for the Telestion Services. This is one way to setup your Telestion application."
epilog = "For more information please visit https://github.com/wuespace/telestion or \
https://telestion.wuespace.de/"
parser = argparse.ArgumentParser(
description=description,
epilog=epilog,
prog="Telestion-CLI (Python)",
argument_default=argparse.SUPPRESS
)

parser.add_argument("--dev", action='store_true', help="If set, program will start in development mode")
parser.add_argument("--version", action='version', version="%(prog)s v1.0-alpha")

parser.add_argument("--NATS_URL", help="NATS url of the server the service can connect to")
parser.add_argument("--NATS_USER", help="NATS user name for the authentication with the server")
parser.add_argument("--NATS_PASSWORD", help="NATS password for the authentication with the server \
(Note: It is recommended to set this via the environment variables or the config!)")

parser.add_argument("--CONFIG_FILE", help="file path to the config of the service", type=Path)
parser.add_argument("--CONFIG_KEY", help="object key of a config file")

parser.add_argument("--SERVICE_NAME", help="name of the service also used in the nats service registration")
parser.add_argument("--DATA_DIR", help="path where the service can store persistent data", type=Path)

namespace, args = parser.parse_known_args()
return vars(namespace), args


def _parse_config_file(config_p: Path, key: str = None) -> dict[str, Any]:
with open(config_p, 'r') as config_f:
content = json.load(config_f)

if key is None:
return content

return content[key]
162 changes: 162 additions & 0 deletions backend-python/src/telestion/backend/lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import json
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Any

import nats
from nats.aio.client import Client as NatsClient, Msg as NatsMsg, DEFAULT_FLUSH_TIMEOUT # mostly for type hinting
from nats.aio.subscription import Subscription

from telestion.backend.config import TelestionConfig, build_config


@dataclass
class Service:
"""Helper Class for starting NATS clients also exposing the parsed config."""
nc: NatsClient | None # None if Options.nats = False
"""Configured and started NATS client for this service."""
data_dir: Path
"""Directory where all data (temporary and persistent) should be stored."""
service_name: str
"""Name of this service. Note that it is not necessarily unique!"""
config: TelestionConfig
"""TelestionConfig instance for this service """

# wrapper methods for NatsClient instance for convenience
async def publish(self, **kwargs) -> None:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.publish"""
await self.nc.publish(**kwargs)

async def subscribe(self, **kwargs) -> Subscription:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.subscribe"""
return await self.nc.subscribe(**kwargs)

async def request(self, **kwargs) -> NatsMsg:
"""Wrapper for Client.request(subject, payload, timeout, old_style, headers)"""
return await self.nc.request(**kwargs)

async def flush(self, timeout: int = DEFAULT_FLUSH_TIMEOUT) -> None:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.flush"""
await self.nc.flush(timeout)

async def drain(self) -> None:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.drain"""
await self.nc.drain()

async def close(self) -> None:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.close"""
await self.nc.close()


@dataclass
class Options:
"""Storing a custom configuration which overwrites the parsed config during startup of the Telestion service."""
nats: bool = True
"""Whether a service should use nats. If set to False no nats client is set up during startup"""
# (officially) we don't support int keys, btw...
overwrite_args: dict[str, Any] | None = None
"""Arguments overwriting the parsed config of a service."""
custom_nc: NatsClient | None = None
"""Custom nats client. During startup no configuration takes place if present."""

def without_nats(self) -> 'Options':
"""Returns a copy of this Options instance with nats switched off."""
return replace(self, nats=False, custom_nc=None)

def with_overwrite_args(self, **kwargs) -> 'Options':
"""Returns a copy of this Options instance with different custom arguments."""
return replace(self, overwrite_args=kwargs)

def with_custom_nc(self, nats_client: NatsClient) -> 'Options':
"""Returns a copy of this Options instance with a custom client."""
return replace(self, custom_nc=nats_client)


async def setup_healthcheck(nc: NatsClient, service_name: str) -> NatsClient:
"""Sets up __telestion__.health for a NatsClient. Returns NatsClient for fluent API."""
async def _respond_hc(msg):
msg.respond(
json_encode({
"errors": 0,
"name": service_name
})
)

await nc.subscribe(
'__telestion__.health',
cb=_respond_hc
)

return nc


async def start_service(opts: Options = None) -> Service:
"""Creates a Service for the given Options and the parsed config and spins up a new NATS client if configured so."""
if opts is None:
opts = Options()

config = build_config()
if opts.overwrite_args is not None:
config = config.model_copy(update=opts.overwrite_args)

service = Service(opts.custom_nc, config.data_dir, config.service_name, config)

if not opts.nats or opts.custom_nc is not None:
return service
Comment on lines +104 to +105
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also custom NATS proxies / mocks can connect => this differentiation is unneeded. Quite the opposite: you still need the health check setup to enable testing of that feature.


nc = await nats.connect(servers=_prepare_nats_url(config))

return replace(service, nc=await setup_healthcheck(nc, config.service_name))


# Macros
def json_encode(msg: Any, encoding='utf-8', **dumps_kwargs) -> bytes:
"""
Helper function to encode messages to json.
This convenience macro helps to reduce encoding/decoding boilerplate.
For finer control implement this function by your own and customize to your needs.

:param msg: message to encode
:param encoding: encoding to use (default: utf-8)
:param dumps_kwargs: additional arguments to pass to json.dumps
:return: encoded json message as utf-8 bytes
"""
return json.dumps(msg, **dumps_kwargs).encode(encoding=encoding)


def json_decode(msg: str | bytes | bytearray, encoding='utf-8', **loads_kwargs) -> Any:
"""
Helper function to decode messages from json into an object.
This convenience macro helps to reduce encoding/decoding boilerplate.
For finer control implement this function by your own and customize to your needs.

:param msg: message to decode
:param encoding: encoding used to encode the bytes
:param loads_kwargs:
:return:
"""
if not isinstance(msg, str):
# ensure to support any encoding supported by python
msg = msg.decode(encoding=encoding)

return json.loads(msg, **loads_kwargs)


def _prepare_nats_url(config: TelestionConfig) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I think we should look at pre-existing URL handling APIs (may that be in the Python core or a library), due to the numerous potential attack vectors currently present in the code.

"""
Helper function that creates the valid url for the NATS client.
Because the Python interface does not support user authentication out of the box with a separate design this is done
via the connecting url.

:param config: parsed config from all sources
:return: created url from parsed config url, user and password
"""
url = config.nats_url

if config.nats_user is None or config.nats_password is None:
return url

if '://' in url:
_, url = url.split('://', 1)

return f"nats://{config.username}:{config.password}@{url}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaks if the username contains a colon (e.g., database:backup)

19 changes: 19 additions & 0 deletions backend-python/src/telestion/backend/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import asyncio

from telestion.backend.config import build_config, TelestionConfig
from telestion.backend.lib import start_service, Options


async def main():
import sys
# Put dev server address here for testing
sys.argv.extend(['--dev', '--NATS_URL', 'nats://172.21.73.221:4222'])
_config = build_config(TelestionConfig)
print(_config)
service = await start_service(Options().with_overwrite_args(test="foo"))
await service.nc.close()
print(service.config)


if __name__ == '__main__':
asyncio.run(main())
Loading