From e163107ff77300f75fead1c9ccc6072e3b28e2c7 Mon Sep 17 00:00:00 2001 From: Andrei Damian Date: Wed, 7 Feb 2024 19:47:36 +0000 Subject: [PATCH] added pattern --- TODO.md | 2 + demo-ai-app/README.md | 3 +- demo-ai-app/serving_pattern.py | 86 +++++++++++++++++++++++++++++ demo-basic-fastapi/Dockerfile | 2 +- demo-basic-fastapi/src/app_utils.py | 17 ++++++ demo-basic-fastapi/src/engine.py | 27 +++++---- demo-basic-fastapi/src/main.py | 15 +++-- 7 files changed, 132 insertions(+), 20 deletions(-) create mode 100644 TODO.md create mode 100644 demo-ai-app/serving_pattern.py diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..f74051a --- /dev/null +++ b/TODO.md @@ -0,0 +1,2 @@ + - [ ] finish PostgreSQL manifest + - [ ] add on-the-fly connection to sql \ No newline at end of file diff --git a/demo-ai-app/README.md b/demo-ai-app/README.md index c3a08cb..d0f4673 100644 --- a/demo-ai-app/README.md +++ b/demo-ai-app/README.md @@ -7,7 +7,8 @@ Goal: Setup: - - Python app (1 replica) deployment downloads models - uses Redis to announce update + - Python app (1 replica) deployment downloads model + - uses Redis to announce update - Python app (4 replicas) sfs - loads models from PV and serves using GPU - uses Redis to get models diff --git a/demo-ai-app/serving_pattern.py b/demo-ai-app/serving_pattern.py new file mode 100644 index 0000000..b98cc6e --- /dev/null +++ b/demo-ai-app/serving_pattern.py @@ -0,0 +1,86 @@ +from time import sleep +from fastapi import FastAPI, APIRouter, Request, File, UploadFile +from threading import Thread +from pydantic import BaseModel + + +class ModelServer: + def __init__(self, **kwargs): + self.__done = False + return + + def checker_loop(self): + while not self.__done: + # check for models + sleep(1) + return + + def start(self): + self.__done = False + thread = Thread(target=self.checker_loop, daemon=True) + thread.start() + self.__thread = thread + return + + def shutdown(self): + self.__done = True + return + + + def 
predict(self, data): + result = None + return result + +engine = ModelServer() +app = FastAPI() +router = APIRouter() + +@app.on_event("startup") +async def on_startup(): + engine.start() + return + +@app.on_event("shutdown") +async def on_shutdown(): + engine.shutdown() + return + +# json request +class Item(BaseModel): + field1: str + field2: str + field3: str + +@app.post("/predict2") +async def predict2(item: Item): + return {"message": f"Received JSON body: {item}"} + + +# image request +@app.post("/predict3") +async def predict3(image: UploadFile = File(...)): + contents = await image.read() + # Process the image data + return {"message": "Image received"} + + +# universal request +@router.post("/predict") +async def predict_universal(request: Request): + content_type = request.headers.get("content-type") + + if content_type == "application/json": + # Handle JSON request + json_body = await request.json() + return {"message": f"Received JSON body: {json_body}"} + elif content_type == "image/jpeg" or content_type == "image/png": + # Handle image upload + image = await request.body() + # Process the image data + return {"message": "Image received"} + else: + # Assume it's a plain text request + text_body = await request.body() + return {"message": f"Received plain text body: {text_body.decode()}"} + +app.include_router(router) \ No newline at end of file diff --git a/demo-basic-fastapi/Dockerfile b/demo-basic-fastapi/Dockerfile index 6f570f8..003056d 100644 --- a/demo-basic-fastapi/Dockerfile +++ b/demo-basic-fastapi/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.9 WORKDIR /test_app -RUN pip install fastapi uvicorn redis numpy +RUN pip install fastapi uvicorn redis numpy pydantic COPY src/ . 
ENV PORT=5050 EXPOSE $PORT diff --git a/demo-basic-fastapi/src/app_utils.py b/demo-basic-fastapi/src/app_utils.py index 4a76932..0e078cd 100644 --- a/demo-basic-fastapi/src/app_utils.py +++ b/demo-basic-fastapi/src/app_utils.py @@ -1,7 +1,24 @@ import json import numpy as np +import pkg_resources + from datetime import datetime +def get_packages(monitored_packages=None): + packs = [x for x in pkg_resources.working_set] + maxlen = max([len(x.key) for x in packs]) + 1 + if isinstance(monitored_packages, list) and len(monitored_packages) > 0: + packs = [ + "{}{}".format(x.key + ' ' * (maxlen - len(x.key)), x.version) for x in packs + if x.key in monitored_packages + ] + else: + packs = [ + "{}{}".format(x.key + ' ' * (maxlen - len(x.key)), x.version) for x in packs + ] + packs = sorted(packs) + return packs + class NPJson(json.JSONEncoder): """ Used to help jsonify numpy arrays or lists that contain numpy data types. diff --git a/demo-basic-fastapi/src/engine.py b/demo-basic-fastapi/src/engine.py index cb03358..4b024f2 100644 --- a/demo-basic-fastapi/src/engine.py +++ b/demo-basic-fastapi/src/engine.py @@ -1,12 +1,13 @@ import os import redis -import json + + from uuid import uuid4 from datetime import datetime -from app_utils import safe_jsonify +from app_utils import safe_jsonify, get_packages -__VER__ = '0.3.2' +__VER__ = '0.3.3' class AppPaths: @@ -56,6 +57,8 @@ def __setup(self): self.__class__.__name__, __VER__, self.str_local_id, self.hostname )) + self.__packs = get_packages() + self.P("Packages:\n{}".format("\n".join(self.__packs))) dct_env = dict(os.environ) self.P("Environement:\n{}".format(safe_jsonify(dct_env, indent=2))) self.__maybe_setup_redis() @@ -94,8 +97,12 @@ def __maybe_setup_redis(self): return - def _pack_result(self, message): - return {"result": message} + def _pack_result(self, message, path=None, parameter=None): + return { + "result": message, + "path": path, + "parameter": parameter, + } def _inc_cluster_count(self): @@ -126,14 +133,14 @@ 
def process_data(self): return - def handle_request(self, path): + def handle_request(self, path, parameter=None): self.process_data() if path in self.__avail_paths: func_name = '_handle_' + self.__path_to_func[path] msg = getattr(self, func_name)(path=path) else: msg = self.__handle_generic(path) - result = self._pack_result(msg) + result = self._pack_result(msg, path=path, parameter=parameter) return result @@ -156,9 +163,9 @@ def _handle_stats(self, **kwargs): return dct_result - def __handle_generic(self, path, **kwargs): - msg = "Generic handler '{}', Local/Global: {}/{}, HOSTNAME: '{}', ID: '{}'".format( - path, self.__local_count, self.get_cluster_count(), + def __handle_generic(self, path, parameter=None, **kwargs): + msg = "Generic handler '{}', Param='{}', Local/Global: {}/{}, HOSTNAME: '{}', ID: '{}'".format( + path, parameter, self.__local_count, self.get_cluster_count(), self.hostname, self.str_local_id ) return msg diff --git a/demo-basic-fastapi/src/main.py b/demo-basic-fastapi/src/main.py index f3ad9ee..b2c6523 100644 --- a/demo-basic-fastapi/src/main.py +++ b/demo-basic-fastapi/src/main.py @@ -1,6 +1,6 @@ import os from uuid import uuid4 -from fastapi import FastAPI, APIRouter +from fastapi import FastAPI, APIRouter, Query from engine import AppHandler, AppPaths @@ -10,19 +10,18 @@ ROUTE1 = AppPaths.PATH_ROOT['PATH'] @router1.get(ROUTE1) -async def root(): - return engine.handle_request(ROUTE1) +async def root(parameter: str = Query(...)): + return engine.handle_request(ROUTE1, parameter=parameter) ROUTE2 = AppPaths.PATH_STAT['PATH'] @router1.get(ROUTE2) -async def stat(): - return engine.handle_request(ROUTE2) +async def stat(parameter: str = Query(...)): + return engine.handle_request(ROUTE2, parameter=parameter) # note: this is a catch-all route, so it should be the last route in the router @router1.get("/{full_path:path}", include_in_schema=False) -async def catch_all(full_path: str): - return engine.handle_request(full_path) +async def 
catch_all(full_path: str, parameter: str = Query(...)): + return engine.handle_request(full_path, parameter=parameter) - app.include_router(router1)