-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(backend-python): add basic telestion library for python #423
base: main
Are you sure you want to change the base?
Changes from all commits
fa72884
944d2d0
6ec272b
922acab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
.venv/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
MIT License | ||
|
||
Copyright (c) 2024 WüSpace e. V. | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy | ||
of this software and associated documentation files (the "Software"), to deal | ||
in the Software without restriction, including without limitation the rights | ||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
copies of the Software, and to permit persons to whom the Software is | ||
furnished to do so, subject to the following conditions: | ||
|
||
The above copyright notice and this permission notice shall be included in all | ||
copies or substantial portions of the Software. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
SOFTWARE. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Telestion Python Backend | ||
|
||
This repository contains everything to build a working Telestion backend in Python. | ||
Additional examples help in setting everything up. | ||
|
||
The Python backend is mainly added to allow for easier interfacing with many scientific libraries, such as numpy, | ||
scipy, tensorflow or pytorch. | ||
Creating static graphs with Matplotlib that can be rendered in the frontend could also be created. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
nats-py~=2.7.2 # TODO: decide if we want to specify versions | ||
pydantic~=2.6.4 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import argparse | ||
import json | ||
import os | ||
from pathlib import Path | ||
from typing import Any, TypeVar | ||
|
||
from pydantic import BaseModel, Field, ConfigDict | ||
|
||
|
||
class TelestionConfig(BaseModel): | ||
dev: bool = False | ||
nats_url: str = Field(alias="NATS_URL") | ||
nats_user: str | None = Field(alias="NATS_USER", default=None) | ||
nats_password: str | None = Field(alias="NATS_PASSWORD", default=None) | ||
config_file: Path | None = Field(alias="CONFIG_FILE", default=None) | ||
config_key: str | None = Field(alias="CONFIG_KEY", default=None) | ||
service_name: str = Field(alias="SERVICE_NAME") | ||
data_dir: Path = Field(alias="DATA_DIR") | ||
|
||
unparsed_cli: list[str] = Field(alias="_telestion_validator_unparsed_cli") | ||
|
||
# To include all envs and config parts -> it is recommended to add a custom subtype | ||
model_config = ConfigDict( | ||
extra='allow' | ||
) | ||
|
||
|
||
# With this we allow users to extend TelestionConfig for finer control over custom config fields | ||
_TelestionConfigT = TypeVar("_TelestionConfigT", bound=TelestionConfig) | ||
|
||
|
||
def build_config(clazz: type[_TelestionConfigT] = None) -> _TelestionConfigT: | ||
if clazz is None: | ||
clazz = TelestionConfig | ||
Comment on lines
+32
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why default to |
||
|
||
cli_args, additional_args = _parse_cli() | ||
|
||
def _from_env_or_cli(key: str): | ||
return cli_args.get(key, os.environ.get(key, None)) | ||
|
||
config_p = _from_env_or_cli('CONFIG_FILE') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why call it |
||
config_key = _from_env_or_cli('CONFIG_KEY') | ||
|
||
config_assembly: dict[str, Any] = dict() | ||
if 'dev' in cli_args and cli_args['dev']: | ||
# 1. Add default config | ||
config_assembly.update(defaults()) | ||
|
||
if config_p is not None: | ||
config_p = Path(config_p) | ||
# 2. Insert config file | ||
config_assembly.update(_parse_config_file(config_p, config_key)) | ||
|
||
# 3. Add Environment Variables | ||
config_assembly.update(os.environ) | ||
|
||
# 4. Add CLI args | ||
config_assembly.update(cli_args) | ||
|
||
# 5. Add args that cannot be parsed by the pipeline, i.e. service specific config | ||
config_assembly['_telestion_validator_unparsed_cli'] = additional_args | ||
|
||
return clazz(**config_assembly) | ||
|
||
|
||
def defaults() -> dict[str, Any]: | ||
return { | ||
'NATS_URL': "nats://localhost:4222", | ||
'SERVICE_NAME': f"dev-{os.getpid()}", | ||
'DATA_DIR': Path("./data") | ||
} | ||
|
||
|
||
def _parse_cli() -> tuple[dict[str, Any], list[str]]: | ||
description = "CLI Interface for the Telestion Services. This is one way to setup your Telestion application." | ||
epilog = "For more information please visit https://github.com/wuespace/telestion or \ | ||
https://telestion.wuespace.de/" | ||
parser = argparse.ArgumentParser( | ||
description=description, | ||
epilog=epilog, | ||
prog="Telestion-CLI (Python)", | ||
argument_default=argparse.SUPPRESS | ||
) | ||
|
||
parser.add_argument("--dev", action='store_true', help="If set, program will start in development mode") | ||
parser.add_argument("--version", action='version', version="%(prog)s v1.0-alpha") | ||
|
||
parser.add_argument("--NATS_URL", help="NATS url of the server the service can connect to") | ||
parser.add_argument("--NATS_USER", help="NATS user name for the authentication with the server") | ||
parser.add_argument("--NATS_PASSWORD", help="NATS password for the authentication with the server \ | ||
(Note: It is recommended to set this via the environment variables or the config!)") | ||
|
||
parser.add_argument("--CONFIG_FILE", help="file path to the config of the service", type=Path) | ||
parser.add_argument("--CONFIG_KEY", help="object key of a config file") | ||
|
||
parser.add_argument("--SERVICE_NAME", help="name of the service also used in the nats service registration") | ||
parser.add_argument("--DATA_DIR", help="path where the service can store persistent data", type=Path) | ||
|
||
namespace, args = parser.parse_known_args() | ||
return vars(namespace), args | ||
|
||
|
||
def _parse_config_file(config_p: Path, key: str = None) -> dict[str, Any]: | ||
with open(config_p, 'r') as config_f: | ||
content = json.load(config_f) | ||
|
||
if key is None: | ||
return content | ||
|
||
return content[key] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
import json | ||
from dataclasses import dataclass, replace | ||
from pathlib import Path | ||
from typing import Any | ||
|
||
import nats | ||
from nats.aio.client import Client as NatsClient, Msg as NatsMsg, DEFAULT_FLUSH_TIMEOUT # mostly for type hinting | ||
from nats.aio.subscription import Subscription | ||
|
||
from telestion.backend.config import TelestionConfig, build_config | ||
|
||
|
||
@dataclass | ||
class Service: | ||
"""Helper Class for starting NATS clients also exposing the parsed config.""" | ||
nc: NatsClient | None # None if Options.nats = False | ||
"""Configured and started NATS client for this service.""" | ||
data_dir: Path | ||
"""Directory where all data (temporary and persistent) should be stored.""" | ||
service_name: str | ||
"""Name of this service. Note that it is not necessarily unique!""" | ||
config: TelestionConfig | ||
"""TelestionConfig instance for this service """ | ||
|
||
# wrapper methods for NatsClient instance for convenience | ||
async def publish(self, **kwargs) -> None: | ||
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.publish""" | ||
await self.nc.publish(**kwargs) | ||
|
||
async def subscribe(self, **kwargs) -> Subscription: | ||
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.subscribe""" | ||
return await self.nc.subscribe(**kwargs) | ||
|
||
async def request(self, **kwargs) -> NatsMsg: | ||
"""Wrapper for Client.request(subject, payload, timeout, old_style, headers)""" | ||
return await self.nc.request(**kwargs) | ||
|
||
async def flush(self, timeout: int = DEFAULT_FLUSH_TIMEOUT) -> None: | ||
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.flush""" | ||
await self.nc.flush(timeout) | ||
|
||
async def drain(self) -> None: | ||
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.drain""" | ||
await self.nc.drain() | ||
|
||
async def close(self) -> None: | ||
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.close""" | ||
await self.nc.close() | ||
|
||
|
||
@dataclass | ||
class Options: | ||
"""Storing a custom configuration which overwrites the parsed config during startup of the Telestion service.""" | ||
nats: bool = True | ||
"""Whether a service should use nats. If set to False no nats client is set up during startup""" | ||
# (officially) we don't support int keys, btw... | ||
overwrite_args: dict[str, Any] | None = None | ||
"""Arguments overwriting the parsed config of a service.""" | ||
custom_nc: NatsClient | None = None | ||
"""Custom nats client. During startup no configuration takes place if present.""" | ||
|
||
def without_nats(self) -> 'Options': | ||
"""Returns a copy of this Options instance with nats switched off.""" | ||
return replace(self, nats=False, custom_nc=None) | ||
|
||
def with_overwrite_args(self, **kwargs) -> 'Options': | ||
"""Returns a copy of this Options instance with different custom arguments.""" | ||
return replace(self, overwrite_args=kwargs) | ||
|
||
def with_custom_nc(self, nats_client: NatsClient) -> 'Options': | ||
"""Returns a copy of this Options instance with a custom client.""" | ||
return replace(self, custom_nc=nats_client) | ||
|
||
|
||
async def setup_healthcheck(nc: NatsClient, service_name: str) -> NatsClient: | ||
"""Sets up __telestion__.health for a NatsClient. Returns NatsClient for fluent API.""" | ||
async def _respond_hc(msg): | ||
msg.respond( | ||
json_encode({ | ||
"errors": 0, | ||
"name": service_name | ||
}) | ||
) | ||
|
||
await nc.subscribe( | ||
'__telestion__.health', | ||
cb=_respond_hc | ||
) | ||
|
||
return nc | ||
|
||
|
||
async def start_service(opts: Options = None) -> Service: | ||
"""Creates a Service for the given Options and the parsed config and spins up a new NATS client if configured so.""" | ||
if opts is None: | ||
opts = Options() | ||
|
||
config = build_config() | ||
if opts.overwrite_args is not None: | ||
config = config.model_copy(update=opts.overwrite_args) | ||
|
||
service = Service(opts.custom_nc, config.data_dir, config.service_name, config) | ||
|
||
if not opts.nats or opts.custom_nc is not None: | ||
return service | ||
Comment on lines
+104
to
+105
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also custom NATS proxies / mocks can connect => this differentiation is unneeded. Quite the opposite: you still need the health check setup to enable testing of that feature. |
||
|
||
nc = await nats.connect(servers=_prepare_nats_url(config)) | ||
|
||
return replace(service, nc=await setup_healthcheck(nc, config.service_name)) | ||
|
||
|
||
# Macros | ||
def json_encode(msg: Any, encoding='utf-8', **dumps_kwargs) -> bytes: | ||
""" | ||
Helper function to encode messages to json. | ||
This convenience macro helps to reduce encoding/decoding boilerplate. | ||
For finer control implement this function by your own and customize to your needs. | ||
|
||
:param msg: message to encode | ||
:param encoding: encoding to use (default: utf-8) | ||
:param dumps_kwargs: additional arguments to pass to json.dumps | ||
:return: encoded json message as utf-8 bytes | ||
""" | ||
return json.dumps(msg, **dumps_kwargs).encode(encoding=encoding) | ||
|
||
|
||
def json_decode(msg: str | bytes | bytearray, encoding='utf-8', **loads_kwargs) -> Any: | ||
""" | ||
Helper function to decode messages from json into an object. | ||
This convenience macro helps to reduce encoding/decoding boilerplate. | ||
For finer control implement this function by your own and customize to your needs. | ||
|
||
:param msg: message to decode | ||
:param encoding: encoding used to encode the bytes | ||
:param loads_kwargs: | ||
:return: | ||
""" | ||
if not isinstance(msg, str): | ||
# ensure to support any encoding supported by python | ||
msg = msg.decode(encoding=encoding) | ||
|
||
return json.loads(msg, **loads_kwargs) | ||
|
||
|
||
def _prepare_nats_url(config: TelestionConfig) -> str: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Overall, I think we should look at pre-existing URL handling APIs (may that be in the Python core or a library), due to the numerous potential attack vectors currently present in the code. |
||
""" | ||
Helper function that creates the valid url for the NATS client. | ||
Because the Python interface does not support user authentication out of the box with a separate design this is done | ||
via the connecting url. | ||
|
||
:param config: parsed config from all sources | ||
:return: created url from parsed config url, user and password | ||
""" | ||
url = config.nats_url | ||
|
||
if config.nats_user is None or config.nats_password is None: | ||
return url | ||
|
||
if '://' in url: | ||
_, url = url.split('://', 1) | ||
|
||
return f"nats://{config.username}:{config.password}@{url}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Breaks if the username contains a colon (e.g., |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
import asyncio | ||
|
||
from telestion.backend.config import build_config, TelestionConfig | ||
from telestion.backend.lib import start_service, Options | ||
|
||
|
||
async def main(): | ||
import sys | ||
# Put dev server address here for testing | ||
sys.argv.extend(['--dev', '--NATS_URL', 'nats://172.21.73.221:4222']) | ||
_config = build_config(TelestionConfig) | ||
print(_config) | ||
service = await start_service(Options().with_overwrite_args(test="foo")) | ||
await service.nc.close() | ||
print(service.config) | ||
|
||
|
||
if __name__ == '__main__': | ||
asyncio.run(main()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.